Kafka input channel and topic wildcard yml configuration example in Java
Posted: 2023-07-05 07:15:24
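The following Java configuration examples show how to set up Kafka producer and consumer channels and how to subscribe to topics by wildcard with Spring Kafka: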
1. Producer-side channel configuration example
```java
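import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    // Broker address list, read from the application configuration
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}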
```
2. Consumer-side input channel configuration example
```java
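import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka // enables detection of @KafkaListener methods
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.auto-commit-interval-ms}")
    private int autoCommitIntervalMs;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory that @KafkaListener methods use by default
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}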
```
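Both configuration classes read their settings through `@Value` placeholders, so the application's yml file has to define those keys. A minimal sketch of such a file: the key names are taken from the placeholders above, while every value (broker address, group name, interval) is an assumption chosen for illustration.

```yaml
kafka:
  bootstrap-servers: localhost:9092   # assumed local broker address
  consumer:
    group-id: demo-group              # hypothetical consumer group name
    auto-offset-reset: earliest       # "latest" and "none" are the other valid values
    enable-auto-commit: true
    auto-commit-interval-ms: 1000     # assumed commit interval in milliseconds
```

These are custom keys bound by the classes above, not Spring Boot's own `spring.kafka.*` properties.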
3. Wildcard (pattern) topic subscription example for consumers
```java
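import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestTopicListeners { // hypothetical holder class for the listener methods
    // topicPattern takes a regular expression; "test.*" matches topics starting with "test"
    @KafkaListener(topicPattern = "test.*", groupId = "group1")
    public void listenGroup1(String message) {
        System.out.println("Group1 received message from a topic matching test.*: " + message);
    }

    // ".*test" matches topics ending with "test"
    @KafkaListener(topicPattern = ".*test", groupId = "group2")
    public void listenGroup2(String message) {
        System.out.println("Group2 received message from a topic matching .*test: " + message);
    }

    // ".*test.*" matches topics containing "test"
    @KafkaListener(topicPattern = ".*test.*", groupId = "group3")
    public void listenGroup3(String message) {
        System.out.println("Group3 received message from a topic matching .*test.*: " + message);
    }
}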
```
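The three listeners above are intended to subscribe to topics whose names start with test, end with test, and contain test, respectively. Note that the `topics` attribute of `@KafkaListener` takes literal topic names and does not expand `*` as a wildcard; pattern subscription goes through the `topicPattern` attribute, whose value is a regular expression (for example `test.*` rather than `test*`).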
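Because a topic pattern is a Java regular expression, one quick way to see which topic names it would select is to try it with `java.util.regex.Pattern`. The following is a minimal sketch in plain Java, not part of the Spring configuration: the class name `TopicPatternCheck`, the helper `matching`, and the sample topic names are all made up for illustration, and it assumes the pattern must match the whole topic name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {

    // Returns the topic names that the regular expression matches in full.
    public static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, used only to exercise the patterns
        List<String> topics = List.of(
                "test", "test-orders", "orders-test", "my-test-topic", "tes", "orders");

        System.out.println(matching("test.*", topics));   // starts with "test"
        System.out.println(matching(".*test", topics));   // ends with "test"
        System.out.println(matching(".*test.*", topics)); // contains "test"

        // A glob-style "test*" read as a regex means "tes" followed by zero or more "t"
        System.out.println(matching("test*", topics));
    }
}
```

Run against the sample names, the glob-style `test*` selects only `test` and `tes`, which is why the regex form `test.*` is needed to cover every topic that starts with test.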