Flink 写入 HBase 的 Sink(Java 代码)
时间: 2023-04-02 13:01:11 浏览: 128
可以通过以下代码实现Flink与HBase的Sink:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.flink.streaming.connectors.hadoop.FsStateBackend;
import org.apache.flink.streaming.connectors.hadoop.HadoopFileSystem;
import org.apache.flink.streaming.connectors.hadoop.HadoopOutputFormat;
import org.apache.flink.streaming.connectors.hadoop.HadoopSink;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.util.Properties;
public class FlinkHBaseSink {
public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set up the HBase configuration
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.master", "localhost:60000");
// create a connection to the HBase database
Connection connection = ConnectionFactory.createConnection(config);
// create a table instance
Table table = connection.getTable(TableName.valueOf("mytable"));
// create a stream from Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
// map the stream to a HBase Put object
DataStream<Put> hbaseStream = stream.map(new MapFunction<String, Put>() {
@Override
public Put map(String value) throws Exception {
String[] parts = value.split(",");
Put put = new Put(Bytes.toBytes(parts[]));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(parts[1]));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes(parts[2]));
return put;
}
});
// write the stream to HBase
hbaseStream.addSink(new HBaseSinkFunction(table));
// execute program
env.execute("Flink HBase Sink");
}
public static class HBaseSinkFunction extends org.apache.flink.streaming.api.functions.sink.RichSinkFunction<Put> {
private Table table;
public HBaseSinkFunction(Table table) {
this.table = table;
}
@Override
public void invoke(Put value, Context context) throws Exception {
table.put(value);
}
@Override
public void close() throws IOException {
table.close();
}
}
}
```
阅读全文