用java 模拟100个节点,向Kafka发送数据 ,并体现100个不同的consumers
时间: 2023-05-20 21:06:16 浏览: 138
可以使用Kafka的Java客户端API来实现这个功能。首先,需要创建一个Kafka生产者实例,然后使用该实例向Kafka发送数据。同时,需要创建100个Kafka消费者实例,每个消费者实例订阅不同的主题,以体现100个不同的consumers。
以下是Java代码示例:
// 创建Kafka生产者实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 向Kafka发送数据
for (int i = 0; i < 100; i++) {
String topic = "topic" + i;
String message = "message" + i;
producer.send(new ProducerRecord<>(topic, message));
}
// 创建100个Kafka消费者实例
List<Consumer<String, String>> consumers = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String topic = "topic" + i;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group" + i);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
consumers.add(consumer);
}
// 消费Kafka消息
while (true) {
for (Consumer<String, String> consumer : consumers) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}
注意:以上代码仅为示例,实际使用时需要根据具体情况进行修改和优化。