java kafka消费指定字符集
时间: 2023-06-28 11:08:41 浏览: 165
xinzhuxiansheng#blog-notes#kafka 指定分区、指定偏移量消费1
在Java中,Kafka消费者默认使用UTF-8字符集来处理消息。如果你想要使用其他字符集来消费消息,可以在消费者配置中设置相应的字符集。
以下是一个简单的示例代码,演示如何使用GBK字符集来消费Kafka消息:
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 设置字符集
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer.encoding", "GBK");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
consumer.commitSync();
}
}
}
```
在上面的代码中,我们将`value.deserializer.encoding`设置为`GBK`,这样消费者就可以使用GBK字符集来处理消息。
阅读全文