读取student表的数据内容,将其转为JSON格式,发送给Kafka;再从Kafka中获取到JSON格式数据,打印出来;实现过程
时间: 2024-02-24 11:55:42 浏览: 221
好的,下面我会给出一个简单的Java示例程序,演示如何实现读取student表数据、将其转为JSON格式、发送到Kafka、从Kafka中获取JSON格式数据并打印出来的过程。
首先,需要引入以下依赖:
```xml
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
```
接下来是代码实现:
```java
import java.sql.*;
import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
public class KafkaProducerExample {
private final static String TOPIC = "my-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private static Connection conn = null;
private static Statement stmt = null;
private static ResultSet rs = null;
public static void main(String[] args) throws Exception {
// 读取student表中的数据
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password");
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT * FROM student");
// 将结果集转为JSON格式
ObjectMapper objectMapper = new ObjectMapper();
while (rs.next()) {
Student student = new Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"));
String json = objectMapper.writeValueAsString(student);
// 发送JSON格式数据到Kafka
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, json);
producer.send(record);
System.out.println("Sent message successfully: " + json);
Thread.sleep(1000);
}
producer.close();
// 从Kafka中获取JSON格式数据并打印出来
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
private static class Student {
private int id;
private String name;
private int age;
public Student(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
}
```
上面的代码中,我们使用了Jackson库来将ResultSet对象转为JSON格式字符串。然后,使用KafkaProducer类将JSON格式数据发送到Kafka中。接下来,使用KafkaConsumer类从Kafka中读取JSON格式数据,并将其打印出来。
需要注意的是,本示例程序中使用了MySQL作为数据库,需要先在本地安装MySQL,并创建名为"test"的数据库,并在其中创建名为"student"的表。同时,需要在本地启动Kafka服务,创建名为"my-topic"的主题。
希望这个示例程序可以帮助到你,如果有任何问题,欢迎随时向我提问。
阅读全文