kafka在java代码中如何写
时间: 2024-11-10 09:26:45 浏览: 6
Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和发布/订阅消息系统。在Java中使用Kafka,首先需要添加依赖并创建配置。以下是一个简单的步骤概述:
1. **添加依赖**:
在Maven项目中,添加以下到`pom.xml`文件的`<dependencies>`部分:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your_kafka_version</version>
</dependency>
```
2. **配置生产者** (发送消息):
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "message-key", "message-value"));
producer.close();
```
3. **配置消费者** (接收消息):
```java
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "consumer-group"); // 消费组标识
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 每隔100ms拉取一次
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.close();
```
4. **高级特性** (如偏移量管理、事务等) 需要根据官方文档进一步学习。
阅读全文