Please help me write a Flink job that writes Kafka data into HBase 2.2.3, using Scala 2.12 and Flink 1.14.0.
The code is as follows; the HBase write is implemented with a custom `RichSinkFunction` built directly on the HBase client API:
```
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

case class KafkaData(key: String, value: String)

// Sink that writes each record as a Put into the "cf:value" column of the
// given table; one HBase connection per parallel subtask, closed on shutdown.
class HBaseSink(tableName: String) extends RichSinkFunction[KafkaData] {
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    connection = ConnectionFactory.createConnection(conf)
    table = connection.getTable(TableName.valueOf(tableName))
  }

  override def invoke(data: KafkaData, context: SinkFunction.Context): Unit = {
    val put = new Put(Bytes.toBytes(data.key))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),
      data.value.getBytes(StandardCharsets.UTF_8))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

object FlinkKafkaHbase {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val kafkaProperties = new java.util.Properties()
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProperties.setProperty("group.id", "flink_group")

    val kafkaConsumer =
      new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaProperties)
    val kafkaStream: DataStream[String] = env.addSource(kafkaConsumer)

    // Parse each "key,value" message into the case class; the split limit of 2
    // keeps commas inside the value intact
    val kafkaData: DataStream[KafkaData] = kafkaStream.map { s =>
      val fields = s.split(",", 2)
      KafkaData(fields(0), fields(1))
    }

    // Write the parsed records into HBase
    kafkaData.addSink(new HBaseSink("kafka_table"))

    env.execute("Flink Kafka HBase Demo")
  }
}
```
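The job needs the Flink Scala DataStream API, the Kafka connector, and the HBase client on the classpath. A minimal build.sbt sketch, assuming sbt (mark the Flink artifacts as provided if you submit to a cluster that already ships them):
```
// build.sbt — a minimal sketch for the versions in the question
ThisBuild / scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  // Flink Scala DataStream API and the Kafka connector
  "org.apache.flink" %% "flink-streaming-scala" % "1.14.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.14.0",
  // HBase client for the custom sink
  "org.apache.hbase" % "hbase-client" % "2.2.3"
)
```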
Note: before running this code, Kafka and HBase must already be deployed and configured. In particular, create a table named "kafka_table" in HBase with a column family named "cf". The "value" column qualifier does not need to be declared in advance, since HBase creates columns dynamically on write.
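If you would rather create the table from code than from the HBase shell, here is a one-off setup sketch using the HBase 2.x Admin API; the quorum address is an assumption matching the job above:
```
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateKafkaTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost") // assumed, same as the job
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("kafka_table")
    // Create the table with the single column family "cf" if it is missing
    if (!admin.tableExists(table)) {
      val descriptor = TableDescriptorBuilder
        .newBuilder(table)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
        .build()
      admin.createTable(descriptor)
    }
    admin.close()
    connection.close()
  }
}
```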