springboot Kafka 生产者自动创建topic配置
时间: 2023-09-07 08:14:15 浏览: 592
在 Spring Boot 中,可以使用 `spring-kafka` 库来配置 Kafka 生产者自动创建 topic。
首先,在 `application.properties` 或 `application.yml` 配置文件中添加以下属性:
```
spring.kafka.producer.properties.topic.metadata.refresh.interval.ms=1000
spring.kafka.admin.properties.bootstrap.servers=<kafka-broker-host>:<kafka-broker-port>
spring.kafka.admin.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.admin.properties.sasl.mechanism=PLAIN
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
```
其中,`spring.kafka.admin.properties.bootstrap.servers` 是 Kafka 服务器的地址和端口,`spring.kafka.admin.properties.security.protocol`、`spring.kafka.admin.properties.sasl.mechanism` 和 `spring.kafka.admin.properties.sasl.jaas.config` 是 Kafka 安全认证相关的配置,需要根据实际情况进行修改。
接着,定义一个 Kafka 配置类,使用 `@Configuration` 注解标注,使用 `@EnableKafka` 注解开启 Kafka 相关功能,并在类中定义一个 `KafkaAdmin` Bean,用于管理 Kafka。
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.admin.properties.bootstrap.servers}")
private String bootstrapServers;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
configs.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
return new KafkaAdmin(configs);
}
}
```
最后,在生产者中使用 `KafkaTemplate` 发送消息时,如果发送的 topic 不存在,Kafka 将会自动创建该 topic。
```java
@Service
public class MyProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("my-topic", message);
}
}
```
在上面的例子中,如果 `my-topic` 这个 topic 不存在,Kafka 将自动创建它。
阅读全文