Kafka消费者和生产者实例的使用步骤
时间: 2023-08-23 21:13:03 浏览: 197
Java实现Kafka生产者消费者代码实例
5星 · 资源好评率100%
使用Kafka消费者和生产者实例的步骤如下:
1. 首先,在项目的配置文件中引入Kafka客户端的依赖。可以在pom.xml文件中添加以下依赖项:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
```
2. 创建Kafka生产者实例。可以使用以下代码创建一个KafkaProducer的实例:
```java
@Configuration
public class Config {
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean(destroyMethod = "close")
public KafkaProducer<String, String> kafkaProducer() {
Properties props = new Properties();
// 设置Kafka服务器地址
props.put("bootstrap.servers", bootstrapServers);
// 设置数据key的序列化处理类
props.put("key.serializer", StringSerializer.class.getName());
// 设置数据value的序列化处理类
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
}
```
3. 创建Kafka消费者实例。可以使用以下代码创建一个KafkaConsumer的实例:
```java
@Configuration
public class Config {
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
Properties props = new Properties();
// 设置Kafka服务器地址
props.put("bootstrap.servers", bootstrapServers);
// 设置消费者组ID
props.put("group.id", "my-consumer-group");
// 设置数据key的反序列化处理类
props.put("key.deserializer", StringDeserializer.class.getName());
// 设置数据value的反序列化处理类
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
}
```
4. 使用Kafka生产者发送消息。可以使用以下代码将消息发送到Kafka集群:
```java
@Autowired
private KafkaProducer<String, String> kafkaProducer;
public void sendMessage(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
kafkaProducer.send(record);
}
```
5. 使用Kafka消费者接收消息。可以使用以下代码从Kafka集群中消费消息:
```java
@Autowired
private KafkaConsumer<String, String> kafkaConsumer;
public void consumeMessages(String topic) {
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理接收到的消息
System.out.println("Received message: " + record.value());
}
}
}
```
请注意,以上代码只是示例,实际使用时需要根据具体的业务需求进行适当的修改和扩展。
#### 引用[.reference_title]
- *1* *3* [Java实现Kafka生产者和消费者的示例](https://blog.csdn.net/heihaozi/article/details/111042472)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^koosearch_v1,239^v3^insert_chatgpt"}} ] [.reference_item]
- *2* [Kafka系列之:Kafka生产者和消费者](https://blog.csdn.net/zhengzaifeidelushang/article/details/129284470)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^koosearch_v1,239^v3^insert_chatgpt"}} ] [.reference_item]
[ .reference_list ]
阅读全文