使用Python发送kafka消息,如何使用java接受
时间: 2024-11-24 12:41:31 浏览: 5
Python通过kerberos安全认证操作kafka方式
在Python中,你可以通过`kafka-python`库来发送Kafka消息。首先,你需要安装这个库,可以使用pip命令:
```bash
pip install kafka-python
```
然后,发送消息的示例代码通常会类似这样:
```python
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092') # 连接至你的Kafka服务器
# 消息内容
message = "Hello, Kafka!"
# 发送主题和消息
future = producer.send('your-topic', key=None, value=message.encode('utf-8'))
# 等待消息确认
record_metadata = future.get()
```
在Java中,你可以使用`kafka-clients`库接收Kafka消息。首先,需要添加依赖到你的Maven项目中:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version</version>
</dependency>
```
接收消息的Java代码可能会像这样:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class KafkaConsumer {
private final Consumer<String, String> consumer;
public KafkaConsumer(String bootstrapServers) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "your-group-id"); // 消费者组ID
props.put("enable.auto.commit", "true");
this.consumer = new KafkaConsumer<>(props);
}
public void consume() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
```
启动消费线程并连接到Kafka即可。
阅读全文