kafka读写操作代码
时间: 2024-03-28 22:34:18 浏览: 106
在springboot中对kafka进行读写的示例代码
Kafka是一个分布式流处理平台,它提供了高吞吐量、可持久化的消息传递系统。下面是Kafka读写操作的代码示例:
1. 生产者(Producer)代码示例:
```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置Kafka生产者的属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
// 创建Kafka生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到指定主题
String topic = "my-topic";
String key = "key1";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Error sending message: " + exception.getMessage());
} else {
System.out.println("Message sent successfully! Topic: " + metadata.topic() +
", Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
}
}
});
// 关闭Kafka生产者
producer.close();
}
}
```
2. 消费者(Consumer)代码示例:
```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置Kafka消费者的属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器
props.put("group.id", "my-group"); // 消费者组ID
// 创建Kafka消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
String topic = "my-topic";
consumer.subscribe(Collections.singletonList(topic));
// 持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: Key = " + record.key() +
", Value = " + record.value() +
", Topic = " + record.topic() +
", Partition = " + record.partition() +
", Offset = " + record.offset());
}
}
// 关闭Kafka消费者
consumer.close();
}
}
```
以上是Java语言的示例代码,你可以根据自己的需求进行修改和扩展。如果你使用其他编程语言,可以参考对应语言的Kafka客户端库文档进行开发。
阅读全文