帮我写一份消费kafka通过jdbc写入hive的java样例代码
时间: 2024-11-20 15:43:22 浏览: 8
Apache Flink 1.17 完整版资料包,含最新JDBC、Hive、Kafka连接器,加速数据驱动决策
当然可以。以下是一个简单的Java示例,展示了如何使用Kafka生产消息,并通过JDBC将数据插入到Hive表中。这个例子假设你已经有了Kafka消费者配置、Hive连接信息以及必要的依赖库。
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.hive.jdbc.HiveDriver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Properties;
public class KafkaToHiveConsumer {
private static final String KAFKA_TOPIC = "your_topic";
private static final String HIVE_DB_URL = "jdbc:hive2://localhost:10000/default";
private static final String HIVE_TABLE = "your_table";
public static void main(String[] args) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "test_group");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(KAFKA_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
// 消费Kafka消息并处理
processMessage(message);
// 使用JDBC将数据写入Hive
writeToHive(message);
}
}
}
private static void processMessage(String message) {
// 这里可以根据消息内容做进一步处理,例如解析JSON等
// ...
}
private static void writeToHive(String message) {
try {
Class.forName(HiveDriver.class.getName());
Connection connection = DriverManager.getConnection(HIVE_DB_URL);
String sql = "INSERT INTO " + HIVE_TABLE + " VALUES (?, ?)";
PreparedStatement statement = connection.prepareStatement(sql);
statement.setString(1, message); // 将Kafka消息作为值插入
statement.setString(2, "some_timestamp"); // 添加时间戳或其他额外字段
statement.executeUpdate();
statement.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
阅读全文