3. Kafka消费者订阅分区 在命令行窗口中启动Kafka后,手动创建主题“assgin tonic”分区就号*2。具体命令1下: .bin\windows kkafka-topics.bat --create--zo0keeper loca 1heet.210renlicatiofEactor 1 --partitions 2 --topic assign_topic 根据上面给出的主题,完成如下操作。 (1)编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题“assign topic”"。 (2)编写消费者程子1,阅主题的分区0,只消费分区0数据。 (3)编写消费者程了2,阅土题的分区1,只消费分区1数据。
时间: 2024-02-06 15:12:27 浏览: 169
springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区
好的,我会按照您的要求进行回答。
针对您的问题,我可以给出以下步骤:
1. 手动创建主题和分区
在命令行中执行以下命令:
```
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
```
这个命令会在本地的Kafka中创建一个名为"assign_topic"的主题,并且创建两个分区。
2. 编写生产者程序
使用Java语言编写一个Kafka生产者程序,代码如下:
```java
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
String topicName = "assign_topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName, UUID.randomUUID().toString()));
System.out.println("Message sent successfully");
producer.close();
}
}
```
这个程序会向"assign_topic"主题发送10条消息,每个消息的内容是一个UUID。
3. 编写消费者程序
使用Java语言编写两个Kafka消费者程序,一个消费分区0的数据,一个消费分区1的数据。代码如下:
消费者1:
```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample1 {
public static void main(String[] args) throws Exception {
String topicName = "assign_topic";
String groupName = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
records.forEach(record -> {
if (record.partition() == 0) {
System.out.println("Consumer 1: Partition " + record.partition() + ", Offset " + record.offset() + ", Message " + record.value());
}
});
consumer.commitSync();
}
}
}
```
这个程序会消费"assign_topic"主题的分区0中的数据,并且打印出每个消息的分区号、偏移量和内容。
消费者2:
```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample2 {
public static void main(String[] args) throws Exception {
String topicName = "assign_topic";
String groupName = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
records.forEach(record -> {
if (record.partition() == 1) {
System.out.println("Consumer 2: Partition " + record.partition() + ", Offset " + record.offset() + ", Message " + record.value());
}
});
consumer.commitSync();
}
}
}
```
这个程序会消费"assign_topic"主题的分区1中的数据,并且打印出每个消息的分区号、偏移量和内容。
注意:这两个消费者程序都使用了相同的消费者组ID,这意味着它们会互相竞争消费"assign_topic"主题的消息。如果您想让两个程序同时消费所有的消息,可以将它们的消费者组ID设置为不同的值。
以上就是答案,希望能帮到您!
阅读全文