Developing Flink code that consumes from Kafka and stores the data in HBase
Below is example code that uses Apache Flink to consume data from Kafka and store it in HBase:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Properties;
public class KafkaToHBase {

    public static void main(String[] args) throws Exception {
        // Read job parameters from the command line
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        props.setProperty("group.id", parameterTool.get("group.id"));

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.get("topic"),
                new SimpleStringSchema(),
                props);

        // Start consuming from the latest offsets
        kafkaConsumer.setStartFromLatest();

        // Read data from Kafka
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // Convert each CSV record ("id,name,age") into an HBase Put and write it to HBase
        kafkaStream.map(new MapFunction<String, Put>() {
            @Override
            public Put map(String value) throws Exception {
                // Parse the record
                String[] fields = value.split(",");
                // Use the first field as the row key
                Put put = new Put(Bytes.toBytes(fields[0]));
                // Add the columns under the "cf" column family
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(fields[0]));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(fields[2]));
                return put;
            }
        }).addSink(new HBaseSink(parameterTool.get("hbase.table")));

        // Execute the job
        env.execute("KafkaToHBase");
    }

    /**
     * HBase sink: opens one connection per parallel instance and writes one Put per record.
     */
    public static class HBaseSink extends RichSinkFunction<Put> {

        private final String tableName;
        private transient Connection connection;
        private transient Table table;

        public HBaseSink(String tableName) {
            this.tableName = tableName;
        }

        // Flink's Configuration is fully qualified here to avoid clashing with Hadoop's
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
            // Create the HBase connection
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", "localhost"); // HBase ZooKeeper quorum
            config.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
            // Client write buffer size in bytes; note it only affects buffered writes
            // (e.g. BufferedMutator), not the per-record table.put() used below
            config.set("hbase.client.write.buffer", "2097152");
            connection = ConnectionFactory.createConnection(config);
            // Get the table handle
            table = connection.getTable(TableName.valueOf(tableName));
        }

        @Override
        public void invoke(Put put, Context context) throws Exception {
            // Write one record to HBase
            table.put(put);
        }

        @Override
        public void close() throws Exception {
            // Close the table handle and the connection
            if (table != null) {
                table.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```
In this example, we read records from Kafka, convert each record into an HBase `Put` with a `MapFunction`, and write it out through a custom `HBaseSink`. Inside `HBaseSink`, the HBase Java API opens the connection and table handle in `open()`, the `invoke()` method writes each incoming record with a single `table.put` call, and `close()` releases the table handle and the connection. Two follow-up sketches below cover batching the writes and creating the target table.
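The sink above issues one RPC per record, which can limit throughput under load. Below is a minimal sketch of a buffered variant using HBase's `BufferedMutator`; the class name `BufferedHBaseSink` and the 2 MB buffer size are illustrative assumptions, not part of the original example:
```java
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;

/** Sketch: buffered variant of HBaseSink (assumed name, not from the original). */
public class BufferedHBaseSink extends RichSinkFunction<Put> {

    private final String tableName;
    private transient Connection connection;
    private transient BufferedMutator mutator;

    public BufferedHBaseSink(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(config);
        // Flush automatically once roughly 2 MB of mutations have accumulated (illustrative value)
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName))
                .writeBufferSize(2 * 1024 * 1024);
        mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void invoke(Put put, Context context) throws Exception {
        // Queued client-side; flushed when the write buffer fills or on close()
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        if (mutator != null) {
            mutator.close(); // flushes any remaining buffered mutations
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```
`BufferedMutator.mutate()` only queues the `Put` client-side, so a failure can lose buffered records; with Flink checkpointing enabled you would also flush from a checkpoint hook, which this sketch omits.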
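The job also assumes the target table and its `cf` column family already exist. If they do not, a one-time setup along these lines creates them; this is a sketch using the HBase 2.x `Admin` API, and the table name `my_table` is a placeholder:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateHBaseTable {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(config);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("my_table"); // placeholder name
            if (!admin.tableExists(tableName)) {
                // Create the table with a single "cf" column family
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}
```
Equivalently, `create 'my_table', 'cf'` in the HBase shell performs the same setup.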