main方法编写kafka收发消息
时间: 2023-10-28 07:10:33 浏览: 136
通过pykafka接收Kafka消息队列的方法
以下是一个简单的Java程序,用于在Kafka中发送和接收消息:
```java
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
public class KafkaDemo {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
String groupName = "my-group";
String message = "Hello, Kafka!";
int partition = 0;
String kafkaServer = "localhost:9092";
// 创建一个生产者对象
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", kafkaServer);
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// 发送一条消息
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, partition, "key", message);
producer.send(record);
// 创建一个消费者对象
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", kafkaServer);
consumerProps.put("group.id", groupName);
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 订阅主题
consumer.subscribe(Arrays.asList(topicName));
// 接收消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
}
```
这个程序中,首先创建了一个生产者对象,然后使用这个生产者对象发送一条消息。接下来创建了一个消费者对象,订阅指定的主题,然后在一个循环中不断接收消息。在接收到消息之后,程序会打印出消息的关键字、值、分区和偏移量。
请注意,这个程序只是一个简单的示例,实际的Kafka应用程序需要更加复杂和健壮。例如,需要考虑错误处理、分区分配、消息确认和重试等问题。
阅读全文