# 在Spark-shell中 读取上传到HDFS上的数据 val input = sc.textFile("/Tipdm/Spark/Data/online_retail.txt") # 去除csv数据文件的第一行 val cutinput = input.mapPartitionsWithIndex((ix, it) => { if (ix == 0) it.drop(1) it }) cutinput.collect val re = cutinput.map(line => {val data = line.split(",");(data(0), data(1).toDouble)}).reduceByKey(_+_) val re_sort = re.filter(x=> !(x._1=="")).sortBy(x=>x._2,false) re_sort.take(50)
时间: 2024-03-31 09:37:16 浏览: 101
这是一个Spark-shell的代码段,用于读取上传到HDFS上的数据,并对数据进行处理和分析。具体来说,代码中首先使用sc.textFile()函数读取HDFS上的数据文件,然后使用mapPartitionsWithIndex()函数去除csv数据文件的第一行。接下来,使用map()函数将数据文件中的每一行转换成键值对的形式,其中键为数据文件中的第一个字段,值为数据文件中的第二个字段转换成double类型的结果。然后,使用reduceByKey()函数对数据进行聚合操作,将相同键的值相加。最后,使用filter()函数去除键为空的数据,使用sortBy()函数将数据按照值从大到小排序,最后使用take()函数取出前50个结果。
相关问题
Spark - shell中 读取上传到HDFS上的数据
Spark Shell 是Apache Spark的一个交互式命令行环境,它允许用户直接运行Spark作业并查看结果。如果你想从Hadoop分布式文件系统 (HDFS) 上读取数据,Spark Shell 提供了简单易用的操作。
首先,你需要通过 `spark.read` API 加载 HDFS 文件。如果你的数据是以文本格式存在,你可以使用 `textFile()` 或 `csv()` 函数;如果数据存储为其他二进制格式,如 Parquet、ORC 等,可以使用相应的加载函数。
例如,如果你的数据是一个CSV文件:
```scala
val data = spark.read.text("hdfs://<your-hdfs-uri>/<your-file>")
```
这里的 `<your-hdfs-uri>` 是HDFS地址,`<your-file>` 是你要读取的具体文件名。如果你想要读取Parquet文件,可以这样做:
```scala
val parquetData = spark.read.parquet("hdfs://<your-hdfs-uri>/<your-parquet-file>")
```
读取完成后,你可以对数据进行各种操作,比如转换、过滤、聚合等,然后保存回HDFS或者进一步分析。
代码:# 定义parse_news_file函数 def parse_news_file(file_path): # 读取文本文件内容 #text_file = open(file_path, 'r', encoding='utf-8') text_rdd = sc.textFile(file_path) text = ''.join(text_rdd.collect()) # 分解文件路径 parts = file_path.split('/') # 获取类别和文件名 category = parts[-2] filename = parts[-1] print(filename) # 对文本内容进行分词和过滤停用词 seg_list = jieba.cut(text) filtered_list = [word for word in seg_list if word not in stopwords] # 计算tf-idf特征 hashingTF = HashingTF() tf = hashingTF.transform(filtered_list) idf = IDF() idfModel = idf.fit(tf) tfidf = idfModel.transform(tf) # 返回LabeledPoint对象 return LabeledPoint(category, tfidf) # 获取或创建全局的SparkContext sc = SparkContext.getOrCreate() # 读取数据集,调用parse_news_file函数处理每个文件,使用LabeledPoint定义文本的类别和向量 data = sc.wholeTextFiles('hdfs://spark01:9000/project/data/*/*').map(lambda x: parse_news_file(x[0])) print("hello",data.count())报错Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
这个错误提示说明在你的代码中使用了SparkContext,而SparkContext只能在Driver程序中使用,不能在Worker节点中使用。你可以尝试将SparkContext的创建和数据读取操作放在Driver程序中,然后使用Spark RDD的转换操作在Worker节点中进行处理。具体来说,你可以将data = sc.wholeTextFiles('hdfs://spark01:9000/project/data/*/*').map(lambda x: parse_news_file(x[0]))这一行代码放到Driver程序中。
阅读全文