那我想让两个消费组消费 创建两个分区可以吗
时间: 2023-10-05 13:04:26 浏览: 46
是的,你可以创建两个分区,并让两个消费组分别消费这两个分区。这样,每个消费组将独立地消费自己被分配的分区的数据。
以下是在Java中创建两个分区并让两个消费组消费的示例代码:
1. 创建一个名为"topic-name"的主题,并设置两个分区。
```java
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(adminProps);
NewTopic topic = new NewTopic("topic-name", 2, (short) 1);
adminClient.createTopics(Collections.singleton(topic));
```
2. 创建两个不同的消费者组,并分别订阅该主题。
```java
Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092");
props1.put("group.id", "consumer-group1");
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props1);
consumer1.subscribe(Arrays.asList("topic-name"));
Properties props2 = new Properties();
props2.put("bootstrap.servers", "localhost:9092");
props2.put("group.id", "consumer-group2");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props2);
consumer2.subscribe(Arrays.asList("topic-name"));
```
3. 在两个消费者中分别处理分配到的分区的数据。
```java
while (true) {
ConsumerRecords<String, String> records1 = consumer1.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records1) {
// 处理消费者组1接收到的记录
System.out.println("Consumer 1 received message: " + record.value());
}
ConsumerRecords<String, String> records2 = consumer2.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records2) {
// 处理消费者组2接收到的记录
System.out.println("Consumer 2 received message: " + record.value());
}
}
```
通过以上代码,你可以创建两个分区并让两个消费组分别消费这两个分区的数据。每个消费者组将独立地处理自己所分配到的分区的消息。
阅读全文