用Java读取kafka数据写入Oracle数据库中
时间: 2024-05-15 09:12:26 浏览: 114
首先,你需要使用Kafka的Java API来消费Kafka中的数据。以下是一个简单的示例代码,用于消费Kafka中的数据:
```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//在这里你可以将数据写入Oracle数据库
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
```
接下来,你需要使用Java中的JDBC API将数据写入Oracle数据库。以下是一个简单的示例代码,用于将数据插入Oracle数据库:
```java
import java.sql.*;
public class OracleDatabaseExample {
public static void main(String[] args) throws Exception {
String url = "jdbc:oracle:thin:@localhost:1521:xe";
String user = "username";
String password = "password";
Connection con = DriverManager.getConnection(url, user, password);
Statement stmt = con.createStatement();
String sql = "INSERT INTO my_table (key, value) VALUES ('key1', 'value1')";
stmt.executeUpdate(sql);
stmt.close();
con.close();
}
}
```
将这两个示例代码结合起来,你可以消费Kafka中的数据并将其写入Oracle数据库中。注意,你需要根据你的具体情况修改代码中的连接字符串、用户名、密码、表名和字段名等。
阅读全文