不使用 KafkaListener 注解,实现对应功能
时间: 2024-05-08 16:17:07 浏览: 105
可以使用 KafkaConsumer API 来实现 Kafka 消费者的功能。以下是一个示例代码,可以实现从 Kafka 主题中消费消息:
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class MyKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka 服务器地址
props.put("group.id", "my-group"); // 消费者组 ID
props.put("key.deserializer", StringDeserializer.class.getName()); // 键反序列化器
props.put("value.deserializer", StringDeserializer.class.getName()); // 值反序列化器
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic")); // 订阅主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 拉取消息
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value()); // 处理消息
}
}
consumer.close(); // 关闭消费者
}
}
```
这个示例代码创建了一个 Kafka 消费者,使用订阅的方式从主题中拉取消息,并对每条消息进行处理。可以根据需要对代码进行修改,以满足具体的业务需求。
阅读全文