Using Spring Boot 2.6.8: create consumers in the style of the KafkaListener annotation, with a different consumer class for each topic in a configured topic list
Time: 2024-05-12 10:19:53 Views: 87
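1. Create the consumer interface
```java
// Contract implemented by every topic-specific consumer.
public interface Consumer<T> {
    // Handle one message payload taken from the topic.
    void consume(T message);
}
```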
2. Create the different consumer classes
```java
// Each class lives in its own file. @Topic (step 5) is meta-annotated with
// @Component, so it both registers the bean and names the topics it handles.
@Topic("topicA")
public class TopicAConsumer implements Consumer<String> {
    @Override
    public void consume(String message) {
        System.out.println("TopicAConsumer consume: " + message);
    }
}

@Topic("topicB")
public class TopicBConsumer implements Consumer<String> {
    @Override
    public void consume(String message) {
        System.out.println("TopicBConsumer consume: " + message);
    }
}

@Topic("topicC")
public class TopicCConsumer implements Consumer<Integer> {
    @Override
    public void consume(Integer message) {
        System.out.println("TopicCConsumer consume: " + message);
    }
}
```
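3. Create the consumer configuration class
```java
// Spring types used below: AnnotationUtils (org.springframework.core.annotation), AopUtils
// (org.springframework.aop.support), and the container classes from org.springframework.kafka.
@Configuration
public class ConsumerConfig implements SmartLifecycle {
    // topic name -> consumer bean annotated with @Topic
    private final Map<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
    private final List<MessageListenerContainer> containers = new ArrayList<>();
    @Autowired
    private List<Consumer<?>> allConsumers;
    @Autowired // auto-configured by Spring Boot
    private ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory;

    @PostConstruct
    public void init() {
        for (Consumer<?> consumer : allConsumers) {
            // Resolve the user class so @Topic is found even on a proxied bean
            Topic topic = AnnotationUtils.findAnnotation(AopUtils.getTargetClass(consumer), Topic.class);
            if (topic != null) {
                for (String t : topic.value()) {
                    consumers.put(t, consumer);
                }
            }
        }
    }

    @Bean // the ConsumerFactory interface from step 4, not Spring Kafka's
    public ConsumerFactory consumerFactory() {
        return new DefaultConsumerFactory(consumers);
    }

    // One listener container per topic; assumes spring.kafka.consumer.group-id is set.
    @Override
    @SuppressWarnings("unchecked")
    public void start() {
        consumers.forEach((topic, consumer) -> {
            MessageListenerContainer container = containerFactory.createContainer(topic);
            container.getContainerProperties().setMessageListener(
                    (MessageListener<Object, Object>) rec -> ((Consumer<Object>) consumer).consume(rec.value()));
            container.start();
            containers.add(container);
        });
    }

    @Override
    public void stop() {
        containers.forEach(MessageListenerContainer::stop);
        containers.clear();
    }

    @Override
    public boolean isRunning() {
        return !containers.isEmpty();
    }
}
```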
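The annotation lookup done in `init()` can be tried out without Spring or a broker. The following is a minimal sketch under that assumption: it redeclares `Topic` and `Consumer` locally, and `TopicRegistrySketch`, `buildRegistry`, `OrderConsumer`, and `UnannotatedConsumer` are hypothetical names used only for illustration. It relies on plain reflection (`Class.getAnnotation`), which is enough here because no proxies are involved.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-ins for the @Topic annotation and Consumer interface.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME retention is what makes the reflective lookup possible
@interface Topic {
    String[] value();
}

interface Consumer<T> {
    void consume(T message);
}

// Hypothetical consumer that handles two topics.
@Topic({"orders", "refunds"})
class OrderConsumer implements Consumer<String> {
    @Override
    public void consume(String message) {
        System.out.println("OrderConsumer consume: " + message);
    }
}

// Hypothetical consumer without @Topic; the registry should ignore it.
class UnannotatedConsumer implements Consumer<String> {
    @Override
    public void consume(String message) {
        System.out.println("UnannotatedConsumer consume: " + message);
    }
}

class TopicRegistrySketch {
    // Map every topic named in @Topic to the consumer that declares it.
    static Map<String, Consumer<?>> buildRegistry(List<Consumer<?>> all) {
        Map<String, Consumer<?>> registry = new HashMap<>();
        for (Consumer<?> consumer : all) {
            Topic topic = consumer.getClass().getAnnotation(Topic.class);
            if (topic == null) {
                continue; // consumers without @Topic are skipped
            }
            for (String name : topic.value()) {
                registry.put(name, consumer);
            }
        }
        return registry;
    }
}
```

Passing one `OrderConsumer` and one `UnannotatedConsumer` to `buildRegistry` should yield a map with the keys `orders` and `refunds`, both pointing at the same `OrderConsumer` instance.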
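4. Create the consumer factory class
```java
// Looks up the consumer registered for a topic (interface and class in separate files).
// Not to be confused with org.springframework.kafka.core.ConsumerFactory.
public interface ConsumerFactory {
    Consumer<?> getConsumer(String topic);
}

public class DefaultConsumerFactory implements ConsumerFactory {
    private final Map<String, Consumer<?>> consumers;

    public DefaultConsumerFactory(Map<String, Consumer<?>> consumers) {
        this.consumers = consumers;
    }

    // Returns null when no consumer is registered for the topic.
    @Override
    public Consumer<?> getConsumer(String topic) {
        return consumers.get(topic);
    }
}
```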
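Two behaviours of the map-based lookup are easy to miss: `getConsumer` returns `null` for an unknown topic, and a plain `put` lets a second consumer silently replace the first one registered for the same topic. The sketch below shows one possible stricter variant. `StrictTopicRegistry`, `register`, and `find` are hypothetical names and not part of the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical registry that refuses duplicate topics and never returns null.
class StrictTopicRegistry<H> {
    private final Map<String, H> handlers = new LinkedHashMap<>();

    // Reject a second handler for the same topic instead of overwriting the first.
    void register(String topic, H handler) {
        Objects.requireNonNull(topic, "topic");
        Objects.requireNonNull(handler, "handler");
        H previous = handlers.putIfAbsent(topic, handler);
        if (previous != null) {
            throw new IllegalStateException("Topic '" + topic + "' already has a handler");
        }
    }

    // Return an empty Optional instead of null when no handler is registered.
    Optional<H> find(String topic) {
        return Optional.ofNullable(handlers.get(topic));
    }
}
```

Failing at registration time surfaces a misconfigured topic list at startup, before any message has been consumed.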
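5. Create the annotation
```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;

// Marks a consumer class and lists the topics it handles.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component // meta-annotation: classes carrying @Topic are found by component scanning
public @interface Topic {
    String[] value();
}
```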
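The values given to `@Topic` are compile-time constants, while the question asks for topics taken from a configured list. As a rough sketch of the first half of that, the helper below reads a comma-separated list from a property. The key `app.kafka.topics` and the class `TopicListConfig` are hypothetical and chosen for illustration; neither is a Spring Boot or Spring Kafka property or API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

class TopicListConfig {
    // Hypothetical property key holding a comma-separated topic list.
    static final String KEY = "app.kafka.topics";

    // Parse the list, trimming whitespace and dropping blanks and duplicates
    // while keeping the order in which the topics were written.
    static List<String> topics(Properties props) {
        String raw = props.getProperty(KEY, "");
        Set<String> names = new LinkedHashSet<>();
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return new ArrayList<>(names);
    }
}
```

The resulting names could then be compared with the topics collected from `@Topic`, so that only the configured topics get a listener.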
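6. Create the test classes
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Thin wrapper around KafkaTemplate (each class lives in its own file).
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}

// Sends one message per topic at startup. Assumes the configured value
// serializer and deserializer (for example Spring Kafka's JsonSerializer and
// JsonDeserializer) can carry both String and Integer payloads.
@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaProducer kafkaProducer;

    @Override
    public void run(String... args) throws Exception {
        kafkaProducer.send("topicA", "hello topicA");
        kafkaProducer.send("topicB", "hello topicB");
        kafkaProducer.send("topicC", 123);
    }
}
```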
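The test sends a `String` to two topics and an `Integer` to the third, so each payload has to match the type parameter of the consumer that receives it. Because generics are erased at runtime, a mismatch would only show up as a `ClassCastException` inside `consume`. One way to guard against that is to keep the expected class next to the handler, as in the sketch below. `PayloadHandler` and `TypedDispatch` are hypothetical names, with `PayloadHandler` standing in for `Consumer<T>`.

```java
// Hypothetical callback type, standing in for Consumer<T>.
interface PayloadHandler<T> {
    void handle(T payload);
}

// Pairs a handler with the payload class it expects.
class TypedDispatch<T> {
    private final Class<T> type;
    private final PayloadHandler<T> handler;

    TypedDispatch(Class<T> type, PayloadHandler<T> handler) {
        this.type = type;
        this.handler = handler;
    }

    // Deliver the payload only if it is an instance of the expected class.
    // Returns false for a mismatched or null payload instead of failing later.
    boolean dispatch(Object payload) {
        if (!type.isInstance(payload)) {
            return false;
        }
        handler.handle(type.cast(payload));
        return true;
    }
}
```

With a `TypedDispatch<Integer>`, the value `123` is delivered, while the string `"123"` is rejected and the caller can decide whether to log it or skip it.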
7. Test results
```
TopicAConsumer consume: hello topicA
TopicBConsumer consume: hello topicB
TopicCConsumer consume: 123
```