示范kafka生产者代码
时间: 2023-10-03 14:05:46 浏览: 37
好的,下面是示范的kafka生产者代码:
```
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# Asynchronous producer
future = producer.send('test-topic', b'Hello, World!')
```
请注意,这是一个简单示例代码,您需要根据实际情况进行修改和调整。如果您需要更多的帮助和指导,请告诉我。
相关问题
Springboot Kafka生产者代码封装
在Spring Boot中封装Kafka生产者代码可以通过创建一个KafkaProducerService类来实现。
首先,需要在项目中引入Kafka客户端依赖,如前面所述。
然后,创建一个KafkaProducerService类,示例如下:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
```
上述代码中,KafkaProducerService类使用了@Autowired注解来自动注入KafkaTemplate实例。KafkaTemplate是Spring Kafka提供的用于发送消息的模板类。
sendMessage方法用于发送消息到指定的topic。在方法中调用kafkaTemplate.send方法来发送消息。
接下来,可以在其他地方使用KafkaProducerService来发送消息,示例如下:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@Autowired
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@PostMapping("/send")
public void sendMessage(@RequestBody String message) {
kafkaProducerService.sendMessage("my-topic", message);
}
}
```
上述代码中,KafkaController类使用@Autowired注解来自动注入KafkaProducerService实例。
在sendMessage方法中调用kafkaProducerService.sendMessage方法来发送消息到指定的topic。
这样就完成了Spring Boot中Kafka生产者代码的封装。通过调用KafkaProducerService的sendMessage方法,可以很方便地发送消息到Kafka。
kafka生产者消费者代码
Kafka的生产者和消费者代码可以通过使用Kafka提供的Java客户端API来实现。以下是一个简单的示例代码:
1. 生产者代码示例:
```
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置Kafka生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到指定的主题
String topic = "hello";
String message = "hello kafka";
producer.send(new ProducerRecord<>(topic, message));
// 关闭生产者连接
producer.close();
}
}
```
2. 消费者代码示例:
```
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置Kafka消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅指定的主题
String topic = "hello";
consumer.subscribe(Collections.singletonList(topic));
// 持续消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("消费消息: " + record.value());
}
}
}
}
```
这些示例代码演示了如何使用Java编写Kafka生产者和消费者。生产者代码创建一个KafkaProducer对象,并使用send方法发送消息到指定的主题。消费者代码创建一个KafkaConsumer对象,并订阅指定的主题,然后使用poll方法不断地消费消息。请根据实际需求修改主题名称和Kafka服务器地址等配置信息。<span class="em">1</span><span class="em">2</span><span class="em">3</span>
#### 引用[.reference_title]
- *1* *2* *3* [Kafka06:【案例】Java操作Kafka:Java代码实现生产者代码、Java代码实现消费者代码、消费者代码扩展、...](https://blog.csdn.net/weixin_40612128/article/details/123455387)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 100%"]
[ .reference_list ]