开发一个flink消费kafka并将数据存储到hbase代码
时间: 2024-05-12 19:19:40 浏览: 116
以下是一个示例代码,演示如何使用Flink从Kafka消费数据,并将其存储到HBase中。
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Properties;
public class KafkaToHBase {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka消费者属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromEarliest();
// 读取Kafka数据流
DataStream<String> kafkaStream = env.addSource(consumer);
// 将数据写入HBase
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort", "2181");
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("my-table"));
kafkaStream.map(str -> {
String[] parts = str.split(",");
Put put = new Put(Bytes.toBytes(parts[0]));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(parts[1]));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes(parts[2]));
return put;
}).addSink(new HBaseSink(table));
// 执行任务
env.execute("Kafka to HBase");
}
private static class HBaseSink extends RichSinkFunction<Put> {
private final Table table;
public HBaseSink(Table table) {
this.table = table;
}
@Override
public void invoke(Put put, Context context) throws Exception {
table.put(put);
}
@Override
public void close() throws Exception {
super.close();
table.close();
}
}
}
```
注意:上述代码中的HBase连接配置和表名需要根据实际情况进行修改。
阅读全文