nanka消费者手动提交 生成一个data jso文件,内容如下: ("name" *nony,"ge":21", hobbie" : (basetal" "nnia'"), ("ame":"Lisa","age":"20", hobbils”: ["ing,"dance"1) 根据上面给出的datajson文件,执行如下操作。 (1 )编写生产者程序,将JSON文件数据发送给Kafka。 (2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。 。Vnflon冶曲丰江阎公厅
时间: 2023-07-05 17:26:54 浏览: 165
很抱歉,我之前的回答有些错误。以下是正确的回答:
根据你提供的数据JSON文件,我们可以编写如下的Java代码来实现生产者和消费者:
生产者代码:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
public class ProducerExample {
public static void main(String[] args) throws Exception {
// 读取JSON文件
BufferedReader reader = new BufferedReader(new FileReader("data.json"));
StringBuilder builder = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
builder.append(line);
}
reader.close();
String jsonStr = builder.toString();
// 将JSON字符串转换为JSONObject对象
JSONObject jsonObject = new JSONObject(jsonStr);
// 设置Kafka生产者的配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 将JSON对象发送到Kafka
ProducerRecord<String, String> record = new ProducerRecord<>("test", jsonObject.toString());
producer.send(record);
producer.close();
}
}
```
消费者代码:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
// 设置Kafka消费者的配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅"test"主题
consumer.subscribe(Collections.singletonList("test"));
// 消费数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 将JSON字符串转换为JSONObject对象
JSONObject jsonObject = new JSONObject(record.value());
// 处理消息
System.out.println(jsonObject);
// 手动提交偏移量
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
}
}
}
}
```
这里假设Kafka集群运行在本地,主题名为"test",消费者组ID为"test-group"。生产者将读取"data.json"文件中的数据,将其转换为JSON字符串后发送到Kafka。消费者将订阅"test"主题,从中读取JSON格式数据,并手动提交偏移量。
希望以上代码对你有所帮助!
阅读全文