用java 多线程技术 向Kafka发送数据 ,并3秒后停止
时间: 2023-05-22 22:00:38 浏览: 105
多线程发送数据
4星 · 用户满意度95%
可以使用 Kafka 的 Java 客户端库来实现向 Kafka 发送数据的功能。具体步骤如下:
1. 创建 Kafka 生产者对象,指定 Kafka 集群的地址和端口号。
2. 创建一个线程池,用于执行发送数据的任务。
3. 在线程池中创建一个任务,该任务的功能是向 Kafka 发送数据。
4. 在任务中使用生产者对象发送数据,并设置发送的消息内容。
5. 等待 3 秒钟后,停止发送数据的任务。
以下是示例代码:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class KafkaProducerDemo {
public static void main(String[] args) {
// Kafka 集群的地址和端口号
String bootstrapServers = "localhost:9092";
// Kafka 主题名称
String topicName = "test-topic";
// 创建 Kafka 生产者配置对象
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 创建一个线程池,用于执行发送数据的任务
ExecutorService executor = Executors.newSingleThreadExecutor();
// 在线程池中创建一个任务,该任务的功能是向 Kafka 发送数据
executor.submit(() -> {
try {
// 发送数据
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
producer.send(record);
System.out.println("Sent message: " + message);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 等待 3 秒钟后,停止发送数据的任务
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
}
}
```
这段代码使用了 Kafka 的 Java 客户端库,创建了一个 Kafka 生产者对象,并使用线程池来执行发送数据的任务。任务中使用生产者对象发送数据,并设置发送的消息内容。等待 3 秒钟后,停止发送数据的任务。
阅读全文