springboot整合kafka代码
时间: 2023-06-03 09:06:53 浏览: 34
SpringBoot可以通过使用KafkaTemplate类来整合Kafka,提供给消息生产者和消费者使用。我们可以使用@EnableKafka注解来激活Kafka相关的Bean,并使用@KafkaListener注解来监听Kafka消息。
相关问题
springboot整合kafka带鉴权代码
非常感谢您的问题,以下是 Spring Boot 整合 Kafka 带鉴权的代码:
@Configuration
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.username}")
private String username;
@Value("${kafka.password}")
private String password;
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
请注意,此代码需要您配置 `bootstrap-servers`、`username` 和 `password` 三个变量以配置连接 Kafak 的相关参数。
展示一下springboot整合kafka示例代码
以下是一个简单的Spring Boot应用程序,演示了如何使用Kafka生产者和消费者。
1. 添加Kafka依赖
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
2. 配置Kafka
```yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
3. 编写Kafka生产者
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "test-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
this.kafkaTemplate.send(TOPIC, message);
}
}
```
4. 编写Kafka消费者
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
```
5. 测试
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
@Autowired
private KafkaProducer kafkaProducer;
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
kafkaProducer.sendMessage("Hello, Kafka!");
}
}
```
在运行应用程序后,将打印出接收到的消息。
相关推荐













