Kafka Java 消息生产与消费实战教程

需积分: 1 1 下载量 76 浏览量 更新于2024-08-03 收藏 5KB TXT 举报
本资源提供了一个使用Java编写的Kafka生产消费程序的示例,旨在帮助理解Kafka在Java环境下的工作方式。Kafka是一个高吞吐量的消息系统,主要用Scala编写,其生产和消费机制与传统消息系统有所区别。 在开始之前,你需要创建一个Maven项目,并在项目的`pom.xml`文件中添加Kafka的相关依赖。这里引用的Kafka版本为0.8.0,对应的依赖如下: ```xml <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> </dependency> ``` 接下来,我们将详细讲解如何编写Java生产者和消费者代码。 1. Kafka生产者(Producer) 在Java中,生产者用于发送消息到Kafka的主题(Topic)。以下是一个简单的`KafkaProducer`类示例: ```java package cn.outofmemory.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducer { private final Producer<String, String> producer; public static final String TOPIC = "TEST-TOPIC"; public KafkaProducer() { Properties props = new Properties(); // 配置Kafka服务器地址 props.put("metadata.broker.list", "192.168.193.148:9092"); // 配置value和key的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); // 配置请求确认模式 props.put("request.required.acks", "1"); // 创建Producer实例 producer = new Producer<>(new ProducerConfig(props)); } // 发送消息到指定主题 public void sendMessage(String key, String message) { producer.send(new KeyedMessage<>(TOPIC, key, message)); } // 关闭生产者 public void close() { producer.close(); } } ``` 在这个示例中,我们创建了一个`KafkaProducer`类,初始化时设置Kafka服务器地址、序列化类以及请求确认模式。`sendMessage`方法用于发送消息,`close`方法用于关闭生产者连接。 2. Kafka消费者(Consumer) 消费者则从Kafka的主题中接收并处理消息。这里我们创建一个`KafkaConsumer`类: ```java package cn.outofmemory.kafka; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class KafkaConsumer { private final ConsumerConnector consumer; public KafkaConsumer(String zookeeper, String group) { Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); props.put("group.id", group); props.put("zookeeper.session.timeout.ms", "5000"); props.put("zookeeper.sync.time.ms", "2000"); props.put("auto.commit.interval.ms", "1000"); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); } // 获取消息流 public Map<String, List<KafkaStream<byte[], byte[]>>> getMsgStreams(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(KafkaProducer.TOPIC, numThreads); Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap); return streams; } // 开始消费 public void startConsuming(List<KafkaStream<byte[], byte[]>> streams) { for (final KafkaStream<byte[], byte[]> stream : streams) { new Thread(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println(new String(it.next().message())); } } }).start(); } } // 关闭消费者 public void shutdown() { consumer.shutdown(); } } ``` `KafkaConsumer`类创建了消费者实例,通过`getMsgStreams`获取消息流,然后在`startConsuming`中启动多线程进行消息消费。 在实际应用中,你可能需要根据自己的需求对这些示例代码进行调整,例如设置不同的序列化类、调整消费者分组策略、处理错误等。同时,确保Kafka服务器已正确安装和配置,Zookeeper也在运行,以便生产者和消费者能够正常通信。