利用部署好的HDFS、Spark及HBase集群,将课程资料中“实训”->“交通大数据”目录中的“卡口表.csv”先上传至HDFS文件系统中。使用Scala、Python或者Java任一一种语言编写程序,将csv文件中的数据写入到HBase的卡口表中。
时间: 2023-05-28 13:01:09 浏览: 150
Scala版本的代码如下:
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
object CSVtoHBase {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CSVtoHBase")
.master("local[*]")
.getOrCreate()
// Define HBase configuration
val conf: Configuration = HBaseConfiguration.create()
conf.addResource(new Path("/usr/local/hadoop-2.8.5/etc/hadoop/core-site.xml"))
conf.addResource(new Path("/usr/local/hbase-2.2.4/conf/hbase-site.xml"))
val connection = ConnectionFactory.createConnection(conf)
val table: Table = connection.getTable(TableName.valueOf("traffic_data:station_vehicle"))
// Read CSV file from HDFS
val csvPath = "hdfs://localhost:9000/user/hadoop/实训/交通大数据/卡口表.csv"
val csvDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(csvPath)
// Write data to HBase
val putList = csvDF.rdd.map(row => {
val put = new Put(Bytes.toBytes(row.getAs[String]("id")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("station_id"), Bytes.toBytes(row.getAs[String]("站点编号")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("direction"), Bytes.toBytes(row.getAs[String]("方向")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("vehicle_type"), Bytes.toBytes(row.getAs[String]("车型")))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("vehicle_count"), Bytes.toBytes(row.getAs[String]("车辆数")))
put
}).collect().toList
table.put(putList)
// Close connection
table.close()
connection.close()
}
}
```
说明:
1. 首先需要创建一个SparkSession对象。
2. 然后定义HBase的配置信息。
3. 读取HDFS中的CSV文件为DataFrame。
4. 将DataFrame的每一行数据转换成Put对象,并收集到列表中。
5. 将Put列表批量写入HBase中。
6. 关闭表和连接。