java 使用kafka
时间: 2023-11-08 09:51:05 浏览: 117
Java使用kafka发送和生产消息的示例
Java使用Kafka进行消息发送和生产的示例代码如下:
```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
// 1. 创建生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 3. 发送1-100数字到Kafka的test主题中
for (int i = 1; i <= 100; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
// 阻塞程序往下执行,直到Kafka生产数据成功
RecordMetadata recordMetadata = future.get();
System.out.println("生产到" + recordMetadata.topic() + "成功!");
}
// 4. 关闭生产者连接
producer.close();
}
}
```
以上是一个简单的Java程序,使用Kafka的Java客户端库发送1到100的数字消息到名为"test"的Kafka主题中。程序首先创建一个生产者配置对象,设置了Kafka集群地址、消息确认方式以及序列化器等属性。然后,创建生产者对象,并使用`send`方法发送消息到Kafka主题。程序会阻塞直到消息发送成功,然后打印出发送成功的信息。最后,关闭生产者连接。
请注意,为了运行以上代码,你需要在项目中引入Kafka的Java客户端库。
阅读全文