java读取kafka数据到mysql 详细代码
时间: 2023-08-11 16:04:16 浏览: 69
以下是一个简单的Java程序,用于从Kafka读取数据并将其写入MySQL数据库。您需要将程序中的配置参数替换为您自己的实际值,并根据需要进行修改。
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaToMysql {
public static void main(String[] args) throws SQLException {
// Kafka consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// MySQL connection configuration
String url = "jdbc:mysql://localhost:3306/test";
String user = "root";
String password = "root";
Connection conn = DriverManager.getConnection(url, user, password);
String insertSql = "INSERT INTO test_table (id, data) VALUES (?, ?)";
PreparedStatement ps = conn.prepareStatement(insertSql);
// Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// Insert Kafka record into MySQL
ps.setString(1, record.key());
ps.setString(2, record.value());
ps.executeUpdate();
}
// Commit offsets to Kafka
consumer.commitSync();
}
}
}
```
该程序使用Kafka Consumer API从`test-topic`主题中读取数据,并使用JDBC API将数据插入名为`test_table`的MySQL表中。程序还为每个Kafka分区使用`commitSync()`方法手动提交偏移量。
请注意,这只是一个简单的示例程序,并且您需要根据自己的实际需求进行修改。例如,您可能需要处理异常情况或优化插入性能。