Kafka实现聊天功能代码
时间: 2023-10-13 21:14:47 浏览: 105
这里提供一个简单的Kafka聊天室示例代码,使用Java编写:
1. 首先需要引入Kafka的Java客户端库,可以使用Maven进行引入:
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
```
2. 创建Kafka生产者和消费者:
```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Minimal console chat room built on a single Kafka topic.
 *
 * <p>Every line typed on standard input is published to {@link #TOPIC_NAME}; every record
 * received from that topic is printed to standard output. One thread sends and one thread
 * receives, so the consumer is only ever polled from a single thread.
 *
 * <p>The chat room stops when standard input reaches end-of-file: the producer is closed
 * (which flushes pending sends) and the consumer is closed by the receiving thread after its
 * current poll returns.
 */
public class KafkaChatRoom {
    private final static String TOPIC_NAME = "chat_room";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    /** Upper bound on how long one poll blocks; also bounds the shutdown latency of the receiver. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    /**
     * Creates the Kafka producer and consumer for this client and starts the chat room.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Create the Kafka producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        // Create the Kafka consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Kafka delivers each record to only ONE consumer within a consumer group. A chat room
        // needs every client to see every message, so each client joins its own unique group.
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "chat_room_consumer-" + UUID.randomUUID());
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        // Start the Kafka chat room
        new KafkaChatRoom(producer, consumer).start();
    }

    private final Producer<String, String> producer;
    private final Consumer<String, String> consumer;
    /** Cleared when either worker thread finishes so that the other one stops as well. */
    private volatile boolean running = true;

    /**
     * @param producer producer used to publish typed messages to the chat topic
     * @param consumer consumer already subscribed to the chat topic
     */
    public KafkaChatRoom(Producer<String, String> producer, Consumer<String, String> consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    /**
     * Starts the sending and receiving threads and returns immediately.
     */
    public void start() {
        // Thread that sends messages
        new Thread(this::sendLoop).start();
        // Thread that receives messages
        new Thread(this::receiveLoop).start();
    }

    /** Publishes each line read from standard input until end-of-file, then closes the producer. */
    private void sendLoop() {
        // System.in is intentionally left open: closing the Scanner would close it for the whole JVM.
        Scanner scanner = new Scanner(System.in);
        try {
            // hasNextLine() returns false at end-of-file instead of letting nextLine() throw.
            while (running && scanner.hasNextLine()) {
                String message = scanner.nextLine();
                // send() is asynchronous; the callback surfaces failures that would otherwise be lost.
                producer.send(new ProducerRecord<>(TOPIC_NAME, message), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception);
                    }
                });
            }
        } finally {
            running = false;
            producer.close();
        }
    }

    /** Prints every received record value until the chat room is stopped, then closes the consumer. */
    private void receiveLoop() {
        try {
            while (running) {
                // poll(long) is deprecated in kafka-clients 2.0.0; poll(Duration) is its replacement.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                records.forEach(record -> System.out.println(record.value()));
            }
        } finally {
            running = false;
            // Closed on the polling thread because KafkaConsumer is not safe for concurrent use.
            consumer.close();
        }
    }
}
```
3. 启动Kafka聊天室:
```
/**
 * Entry point (abridged): the producer/consumer setup is elided here and is the same as in
 * the full listing.
 */
public static void main(String[] args) {
    // Create the Kafka producer and consumer
    // ...
    // Build the chat room from the two clients, then start it
    KafkaChatRoom chatRoom = new KafkaChatRoom(producer, consumer);
    chatRoom.start();
}
```
4. 运行聊天室后,可以在控制台输入消息进行聊天。注意:只有当每个客户端使用不同的消费者组ID(group.id)时,消息才会广播给所有连接到聊天室的客户端;如果多个客户端共用同一个group.id,每条消息只会被其中一个客户端收到。
阅读全文