Please write a Flink demo that consumes from Kafka and writes to Hive
Below is example code for consuming Kafka with Flink and writing the data into Hive (based on the legacy FlinkKafkaConsumer and the Flink 1.13-era Table API):
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
public class FlinkKafkaToHiveDemo {

    public static void main(String[] args) throws Exception {
        // Parse command-line parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Set up the execution environment with exactly-once checkpointing
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        env.getConfig().setGlobalJobParameters(params);

        // Kafka connection properties
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", params.get("bootstrap.servers", "localhost:9092"));
        kafkaProps.setProperty("group.id", params.get("group.id", "my-flink-consumer-group"));

        // Create the Kafka consumer reading plain strings
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                params.get("topic", "my-kafka-topic"),
                new SimpleStringSchema(),
                kafkaProps);

        // Create the table environment (Blink planner, streaming mode)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register a HiveCatalog so that "my_hive_table" resolves against the Hive
        // metastore; the hive-site.xml directory is environment-specific
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default",
                params.get("hive.conf.dir", "/etc/hive/conf"));
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // Convert the Kafka stream into a Flink Table with a single column named "value"
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        Table kafkaTable = tableEnv.fromDataStream(kafkaStream, $("value"));

        // Write the table into Hive. executeInsert() submits the job itself, so a
        // separate env.execute() call is not needed; await() keeps the client attached
        kafkaTable.executeInsert("my_hive_table").await();
    }
    // Alternative sink: writes each row to Hive over JDBC.
    // It is not wired into the job above; see the usage sketch below.
    public static class HiveSink extends RichSinkFunction<Row> {
        private Connection conn;
        private PreparedStatement stmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Open the Hive JDBC connection and prepare the insert statement
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
            stmt = conn.prepareStatement("INSERT INTO my_hive_table VALUES(?)");
        }

        @Override
        public void invoke(Row row, SinkFunction.Context context) throws Exception {
            // Write one row to Hive
            stmt.setString(1, String.valueOf(row.getField(0)));
            stmt.executeUpdate();
        }

        @Override
        public void close() throws SQLException {
            // Close the statement before the connection
            if (stmt != null) {
                stmt.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}
```
The example above uses FlinkKafkaConsumer to turn a Kafka topic into a Flink Table and writes it to Hive with executeInsert(). It also defines a HiveSink as an alternative that inserts rows over JDBC. Note that for the job to run you need a reachable Hive metastore (and a running HiveServer2 for the JDBC variant), and the target table "my_hive_table" must already exist; a sketch for creating it follows.
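If the table does not exist yet, it can be created from the same TableEnvironment by switching to the Hive SQL dialect. This is a minimal sketch under the assumptions of the demo above (a single STRING column named `value`; the Parquet storage format is an arbitrary choice):
```java
import org.apache.flink.table.api.SqlDialect;

// Switch to the Hive dialect so the DDL is executed against the Hive metastore
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS my_hive_table (" +
        "  `value` STRING" +
        ") STORED AS parquet");
// Switch back to Flink's default dialect for the rest of the program
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
```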
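If you prefer the JDBC-based HiveSink over the Table API insert, it could be wired up roughly as follows (a sketch assuming the same kafkaConsumer as above; Types comes from org.apache.flink.api.common.typeinfo.Types):
```java
// Sketch: route the Kafka stream through the JDBC-based HiveSink instead of executeInsert()
DataStream<Row> rows = env.addSource(kafkaConsumer)
        .map(value -> Row.of(value))
        .returns(Types.ROW(Types.STRING));
rows.addSink(new HiveSink());
env.execute("Flink Kafka to Hive via JDBC sink");
```
Be aware that per-record JDBC inserts into Hive are very slow, so for real workloads the catalog-based executeInsert() path shown in the main demo is the better choice.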