Give me a Spark program that consumes Kafka data and writes it to HBase
Example code: consuming Kafka data with Spark Streaming and writing it to HBase
Here is a basic Spark program that consumes Kafka data and writes it to HBase:
```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object KafkaToHBase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaToHBase")
    val ssc = new StreamingContext(conf, Seconds(5))
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Kafka 0.8 direct stream: each record arrives as a (key, value) pair
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("my-topic")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // The message value is expected to be a JSON object such as
        // {"id": "1", "name": "alice", "age": 30}
        val df = spark.read.json(rdd.map(_._2))

        // Create the HBase connection on the executors, once per partition:
        // Connection is not serializable, so it cannot be created on the driver
        // and captured in the closure.
        df.select("id", "name", "age").rdd.foreachPartition { rows =>
          val hbaseConf = HBaseConfiguration.create()
          val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
          val table = connection.getTable(TableName.valueOf("my-table"))
          rows.foreach { row =>
            // "id" becomes the row key; "name" and "age" go into column family "cf"
            val put = new Put(Bytes.toBytes(row.getAs[String]("id")))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"),
              Bytes.toBytes(row.getAs[String]("name")))
            // spark.read.json infers whole numbers as Long, not Int
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"),
              Bytes.toBytes(row.getAs[Long]("age")))
            table.put(put)
          }
          table.close()
          connection.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
This program assumes the messages in the Kafka topic are JSON, reads each micro-batch into a Spark DataFrame, and writes every row to an HBase table. In this example the table has a single column family named "cf" with two columns, "name" and "age", and the "id" field is used as the row key. Replace "my-topic" and "my-table" with your own Kafka topic and HBase table names; the dependency sketch below lists the libraries the code relies on.
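To build the example you need Spark Streaming, Spark SQL, the old Kafka 0.8 direct-stream connector (the source of the `org.apache.spark.streaming.kafka.KafkaUtils` import above), and the HBase client on the classpath. The sbt snippet below is a minimal sketch; the Spark, Scala, and HBase versions shown are assumptions and should be matched to your cluster. The target table can be created beforehand in the HBase shell with `create 'my-table', 'cf'`.
```scala
// build.sbt -- minimal dependency sketch; versions are assumptions, adjust to your cluster
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark core libraries are usually "provided" because spark-submit supplies them
  "org.apache.spark" %% "spark-streaming"           % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql"                 % "2.2.0" % "provided",
  // old direct-stream connector matching the org.apache.spark.streaming.kafka import;
  // it must be packaged into the application jar
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "org.apache.hbase" %  "hbase-client"              % "1.2.6"
)
```
Note that this connector targets Kafka 0.8-style brokers; newer Spark and Kafka versions use the `spark-streaming-kafka-0-10` or Structured Streaming APIs instead, which would change the code above.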