Kafka Java 消息生产与消费实战教程
需积分: 1 76 浏览量
更新于2024-08-03
收藏 5KB TXT 举报
本资源提供了一个使用Java编写的Kafka生产消费程序的示例,旨在帮助理解Kafka在Java环境下的工作方式。Kafka是一个高吞吐量的消息系统,主要用Scala编写,其生产和消费机制与传统消息系统有所区别。
在开始之前,你需要创建一个Maven项目,并在项目的`pom.xml`文件中添加Kafka的相关依赖。这里引用的Kafka版本为0.8.0,对应的依赖如下:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>
```
接下来,我们将详细讲解如何编写Java生产者和消费者代码。
1. Kafka生产者(Producer)
在Java中,生产者用于发送消息到Kafka的主题(Topic)。以下是一个简单的`KafkaProducer`类示例:
```java
package cn.outofmemory.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducer {
private final Producer<String, String> producer;
public static final String TOPIC = "TEST-TOPIC";
public KafkaProducer() {
Properties props = new Properties();
// 配置Kafka服务器地址
props.put("metadata.broker.list", "192.168.193.148:9092");
// 配置value和key的序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 配置请求确认模式
props.put("request.required.acks", "1");
// 创建Producer实例
producer = new Producer<>(new ProducerConfig(props));
}
// 发送消息到指定主题
public void sendMessage(String key, String message) {
producer.send(new KeyedMessage<>(TOPIC, key, message));
}
// 关闭生产者
public void close() {
producer.close();
}
}
```
在这个示例中,我们创建了一个`KafkaProducer`类,初始化时设置Kafka服务器地址、序列化类以及请求确认模式。`sendMessage`方法用于发送消息,`close`方法用于关闭生产者连接。
2. Kafka消费者(Consumer)
消费者则从Kafka的主题中接收并处理消息。这里我们创建一个`KafkaConsumer`类:
```java
package cn.outofmemory.kafka;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer {
private final ConsumerConnector consumer;
public KafkaConsumer(String zookeeper, String group) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "5000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "1000");
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
}
// 获取消息流
public Map<String, List<KafkaStream<byte[], byte[]>>> getMsgStreams(int numThreads) {
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(KafkaProducer.TOPIC, numThreads);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
return streams;
}
// 开始消费
public void startConsuming(List<KafkaStream<byte[], byte[]>> streams) {
for (final KafkaStream<byte[], byte[]> stream : streams) {
new Thread(new Runnable() {
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
}).start();
}
}
// 关闭消费者
public void shutdown() {
consumer.shutdown();
}
}
```
`KafkaConsumer`类创建了消费者实例,通过`getMsgStreams`获取消息流,然后在`startConsuming`中启动多线程进行消息消费。
在实际应用中,你可能需要根据自己的需求对这些示例代码进行调整,例如设置不同的序列化类、调整消费者分组策略、处理错误等。同时,确保Kafka服务器已正确安装和配置,Zookeeper也在运行,以便生产者和消费者能够正常通信。
点击了解资源详情
点击了解资源详情
点击了解资源详情
131 浏览量
2019-03-14 上传
2018-01-24 上传
2018-06-20 上传
2021-09-29 上传
2020-08-18 上传
smartsmile2012
- 粉丝: 866
- 资源: 83
最新资源
- 深入浅出:自定义 Grunt 任务的实践指南
- 网络物理突变工具的多点路径规划实现与分析
- multifeed: 实现多作者间的超核心共享与同步技术
- C++商品交易系统实习项目详细要求
- macOS系统Python模块whl包安装教程
- 掌握fullstackJS:构建React框架与快速开发应用
- React-Purify: 实现React组件纯净方法的工具介绍
- deck.js:构建现代HTML演示的JavaScript库
- nunn:现代C++17实现的机器学习库开源项目
- Python安装包 Acquisition-4.12-cp35-cp35m-win_amd64.whl.zip 使用说明
- Amaranthus-tuberculatus基因组分析脚本集
- Ubuntu 12.04下Realtek RTL8821AE驱动的向后移植指南
- 掌握Jest环境下的最新jsdom功能
- CAGI Toolkit:开源Asterisk PBX的AGI应用开发
- MyDropDemo: 体验QGraphicsView的拖放功能
- 远程FPGA平台上的Quartus II17.1 LCD色块闪烁现象解析