如何设置 KafkaListener 的 properties,使每次最多拉取 10 条消息
时间: 2024-05-06 15:15:26 浏览: 122
Kafka消费者探秘:深入消息拉取机制
在 Spring Boot 中使用 KafkaListener,你可以通过在 KafkaListener 注解中设置 containerFactory 属性,来指定 KafkaListenerContainerFactory 实例,然后在 KafkaListenerContainerFactory 实例中设置 ConsumerFactory 的属性,以控制消费者的行为。具体来说,你可以在 ConsumerFactory 的配置中设置 max.poll.records 属性(对应常量 ConsumerConfig.MAX_POLL_RECORDS_CONFIG),来指定每次 poll 拉取的最大消息数。注意这是上限:单次拉取返回的消息数可能少于该值。
以下是一个示例代码:
```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
 * Kafka consumer configuration for a batch {@code @KafkaListener} that receives at most
 * {@link #MAX_POLL_RECORDS} records per poll.
 *
 * <p>The per-poll limit is a Kafka consumer property ({@code max.poll.records}), so it is set on
 * the {@link ConsumerFactory}; the listener container factory only controls how the polled
 * records are delivered and acknowledged.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

    /** Upper bound on the number of records returned by a single consumer poll. */
    private static final int MAX_POLL_RECORDS = 10;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Builds the consumer factory with String key/value deserializers and the per-poll limit.
     *
     * @return a consumer factory whose consumers fetch at most {@link #MAX_POLL_RECORDS} records
     *     per poll
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // This is the setting that actually limits each poll to 10 records.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Builds the listener container factory referenced by the {@code @KafkaListener} below.
     *
     * @return a single-threaded, batch-mode container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        // Deliver each poll result to the listener as one List instead of record by record.
        factory.setBatchListener(true);
        // The filter strategy is a property of the factory, not of ContainerProperties.
        // Returning true DISCARDS the record, so records containing "error" are dropped.
        factory.setRecordFilterStrategy(consumerRecord -> {
            System.out.println("Received message: " + consumerRecord);
            String value = consumerRecord.value();
            // Null values (e.g. tombstones) are kept rather than triggering an NPE.
            return value != null && value.toLowerCase(Locale.ROOT).contains("error");
        });

        ContainerProperties containerProps = factory.getContainerProperties();
        // Each property is set exactly once; setPollTimeout takes a primitive long (milliseconds).
        containerProps.setPollTimeout(5000L);
        containerProps.setIdleBetweenPolls(100L);
        containerProps.setIdleEventInterval(60000L);
        // BATCH lets the container commit offsets itself after the listener returns; the MANUAL
        // modes would require the listener to accept and call an Acknowledgment.
        containerProps.setAckMode(ContainerProperties.AckMode.BATCH);
        containerProps.setSyncCommits(true);
        containerProps.setLogContainerConfig(true);
        containerProps.setMissingTopicsFatal(false);
        // The monitor interval is an int expressed in seconds, not milliseconds.
        containerProps.setMonitorInterval(5);
        containerProps.setNoPollThreshold(5);
        containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Partitions revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
            }
        });
        return factory;
    }

    /**
     * Receives one batch of message values from {@code my-topic}.
     *
     * @param messages the values of the records from one poll that passed the record filter; at
     *     most {@link #MAX_POLL_RECORDS} entries
     */
    @KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<String> messages) {
        System.out.println("Received " + messages.size() + " messages:");
        messages.forEach(System.out::println);
    }
}
```
要让每次拉取的最大消息数为 10,需要在 consumerFactory 的配置中将 max.poll.records(即 ConsumerConfig.MAX_POLL_RECORDS_CONFIG)设置为 10;该值是上限,单次拉取到的消息数可能少于 10。示例中同时还设置了一些其他的属性,来控制消费者的行为,这些属性可以根据你的需求进行调整。
阅读全文