How can I use Flink to consume JSON data from Kafka and write it to HBase?
1. First, create a Flink job and add the Kafka and HBase connector dependencies (flink-connector-kafka, plus flink-hbase or the flink-connector-hbase artifact matching your Flink version), along with a JSON library such as org.json, which the snippets below use.
2. Create a Kafka source to consume the JSON data:
```java
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-group");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);
```
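Note: `FlinkKafkaConsumer` is deprecated in recent Flink releases (1.14+) in favour of the `KafkaSource` builder. If you are on a newer version, a rough equivalent of the source above (same placeholder broker, topic, and group id; imports omitted as in the other snippets) might be:
```java
// Sketch for Flink 1.14+, assuming the flink-connector-kafka dependency is on the classpath.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("my-topic")
        .setGroupId("flink-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> jsonStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```
The rest of the pipeline is unchanged; `jsonStream` simply replaces `env.addSource(kafkaConsumer)` in the next step.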
3. Parse the JSON and turn each record into an HBase `Put` object:
```java
DataStream<Put> hbasePuts = env.addSource(kafkaConsumer)
    .flatMap(new FlatMapFunction<String, Put>() {
        @Override
        public void flatMap(String json, Collector<Put> collector) throws Exception {
            JSONObject jsonObject = new JSONObject(json);
            String rowKey = jsonObject.getString("rowKey");
            String columnFamily = jsonObject.getString("columnFamily");
            String column = jsonObject.getString("column");
            String cellValue = jsonObject.getString("value");
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(cellValue));
            collector.collect(put);
        }
    });
```
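One caveat with the `flatMap` above: `new JSONObject(...)` (org.json) throws on malformed input, which would fail the whole job. A slightly more defensive variant, assuming each record is a flat JSON object like `{"rowKey":"r1","columnFamily":"cf","column":"name","value":"Alice"}`, could look like this:
```java
// Same mapping as above, but malformed records are skipped instead of failing the job.
DataStream<Put> hbasePuts = env.addSource(kafkaConsumer)
    .flatMap(new FlatMapFunction<String, Put>() {
        @Override
        public void flatMap(String json, Collector<Put> out) {
            try {
                JSONObject obj = new JSONObject(json);
                Put put = new Put(Bytes.toBytes(obj.getString("rowKey")));
                put.addColumn(
                        Bytes.toBytes(obj.getString("columnFamily")),
                        Bytes.toBytes(obj.getString("column")),
                        Bytes.toBytes(obj.getString("value")));
                out.collect(put);
            } catch (JSONException e) {
                // Drop the bad record (or route it to a side output / dead-letter topic).
            }
        }
    });
```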
4. Write the `Put` objects to HBase:
```java
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.zookeeper.quorum", "localhost");
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
hbasePuts.addSink(new HBaseSink<>(hbaseConfig, TableName.valueOf("my-table"), new HBasePutMapper()));
```
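A note on the sink: `HBaseSink` and `HBasePutMapper` are not part of Flink's core DataStream API, so whether they are available depends on the HBase connector (or helper classes) you bring in. If your connector version does not ship such a sink, a common fallback is a small custom `RichSinkFunction` that writes the `Put`s with the plain HBase client. Below is a minimal sketch under those assumptions (the class name `HBasePutSink` is made up here; same placeholder ZooKeeper settings and table name; no batching, retry, or checkpoint integration):
```java
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;

// Minimal custom HBase sink: one Put per invoke(), buffered by HBase's BufferedMutator.
public class HBasePutSink extends RichSinkFunction<Put> {

    private transient Connection connection;
    private transient BufferedMutator mutator;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "localhost");
        hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(hbaseConfig);
        mutator = connection.getBufferedMutator(TableName.valueOf("my-table"));
    }

    @Override
    public void invoke(Put put, Context context) throws Exception {
        mutator.mutate(put); // buffered; flushed when the internal buffer fills up or on close()
    }

    @Override
    public void close() throws Exception {
        if (mutator != null) {
            mutator.flush();
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```
It would then be wired in with `hbasePuts.addSink(new HBasePutSink());` instead of the connector-specific line above.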
Complete code example:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.json.JSONObject;

import java.util.Properties;

public class KafkaToHBaseJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer reading raw JSON strings from "my-topic"
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink-group");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);

        // Parse each JSON record into an HBase Put
        DataStream<Put> hbasePuts = env.addSource(kafkaConsumer)
                .flatMap(new FlatMapFunction<String, Put>() {
                    @Override
                    public void flatMap(String json, Collector<Put> collector) throws Exception {
                        JSONObject jsonObject = new JSONObject(json);
                        String rowKey = jsonObject.getString("rowKey");
                        String columnFamily = jsonObject.getString("columnFamily");
                        String column = jsonObject.getString("column");
                        String cellValue = jsonObject.getString("value");
                        Put put = new Put(Bytes.toBytes(rowKey));
                        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(cellValue));
                        collector.collect(put);
                    }
                });

        // Write the Puts to HBase. HBaseSink / HBasePutMapper must be provided by
        // the HBase connector (or helper classes) you use.
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "localhost");
        hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        hbasePuts.addSink(new HBaseSink<>(hbaseConfig, TableName.valueOf("my-table"), new HBasePutMapper()));

        env.execute("Kafka to HBase Job");
    }
}
```