java 监听kafka 当kafka入了新数据 就将获取到新数据做成list
时间: 2024-03-12 16:49:41 浏览: 18
要实现在Java中监听Kafka并将新数据保存到List中,可以对上面示例代码进行修改。以下是一个基本的示例代码:
```java
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
String groupId = "my-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
List<String> dataList = new ArrayList<>();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
dataList.add(record.value());
}
}
} finally {
consumer.close();
}
}
}
```
在此示例中,我们创建了一个List对象`dataList`,并在`for`循环中将每个记录的值添加到其中。因为我们在一个无限循环中,所以每次收到新记录时,列表都会不断增长。如果您希望在处理新数据时采取其他操作,您可以在`for`循环中添加其他逻辑。例如,您可以将数据写入数据库或将其发送到另一个系统。