java kafka
时间: 2023-08-27 18:13:56 浏览: 120
Java Kafka面试专题
Java Kafka是一个用于构建实时数据流应用程序和数据管道的开源分布式流处理平台。它使用高吞吐量、低延迟的方式来处理大规模的数据流。Kafka可以被用于构建实时流数据管道,以及处理实时数据流应用程序的管理和处理。
举个例子,可以使用拦截器来实现按量付费的功能。可以编写一个实现了ProducerInterceptor接口的拦截器类,在发送消息的时候触发onSend方法,在这个方法中可以实现扣费的逻辑。然后通过配置,在Kafka生产者中使用这个拦截器类来实现按量付费的功能。具体的代码示例如下:
```java
public class ChargingInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("1分钱1条消息,不管那么多反正先扣钱");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("消息被服务端接收啦");
}
@Override
public void close() {
System.out.println("生产者关闭了");
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("configure...");
}
}
```
另外,在Kafka中,消息有一个key和一个value,可以通过指定发送的主题、消息的key和消息的value来发送消息。具体的示例代码如下:
```java
Properties prop = new Properties();
prop.put("bootstrap.servers", "192.168.221.131:9092");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("group.id", "con-1");
prop.put("auto.offset.reset", "latest");
prop.put("auto.commit.intervals.ms","true");
prop.put("auto.commit.interval.ms","1000")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
ArrayList<String> topics = new ArrayList<>();
topics.add("hello");
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
for(ConsumerRecord<String,String> consumerRecord : poll){
System.out.println(consumerRecord);
}
}
```
此外,Kafka引入了Consumer Offset的概念来标记消费者组在一个分区即将消费的下一条记录。Consumer Offset的信息会保存在一个特殊的topic中,叫做__consumer_offsets。这个topic默认会创建50个分区。这样,当一个消费者组在消费一半时重启了,就可以通过Consumer Offset来继续上一次的位置读取消息。
以上就是Java Kafka的一些基本原理和使用方法的介绍。希望对你有帮助。<span class="em">1</span><span class="em">2</span><span class="em">3</span>
#### 引用[.reference_title]
- *1* *3* [JAVA知识体系之分布式篇(四)——Kafka](https://blog.csdn.net/qsmiley10/article/details/115000474)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v92^chatsearchT0_1"}}] [.reference_item style="max-width: 50%"]
- *2* [通过java方式使用Kafka](https://blog.csdn.net/libaowen609/article/details/126529414)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v92^chatsearchT0_1"}}] [.reference_item style="max-width: 50%"]
[ .reference_list ]
阅读全文