代码:# 定义parse_news_file函数 def parse_news_file(file_path): # 读取文本文件内容 #text_file = open(file_path, 'r', encoding='utf-8') text_rdd = sc.textFile(file_path) text = ''.join(text_rdd.collect()) # 分解文件路径 parts = file_path.split('/') # 获取类别和文件名 category = parts[-2] filename = parts[-1] print(filename) # 对文本内容进行分词和过滤停用词 seg_list = jieba.cut(text) filtered_list = [word for word in seg_list if word not in stopwords] # 计算tf-idf特征 hashingTF = HashingTF() tf = hashingTF.transform(filtered_list) idf = IDF() idfModel = idf.fit(tf) tfidf = idfModel.transform(tf) # 返回LabeledPoint对象 return LabeledPoint(category, tfidf) # 获取或创建全局的SparkContext sc = SparkContext.getOrCreate() # 读取数据集,调用parse_news_file函数处理每个文件,使用LabeledPoint定义文本的类别和向量 data = sc.wholeTextFiles('hdfs://spark01:9000/project/data//').map(lambda x: parse_news_file(x[0])) print("hello",data.count())报错Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.给出具体的改正措施
时间: 2024-01-08 15:02:37 浏览: 98
这个错误提示是因为在转换过程中尝试使用SparkContext,而SparkContext只能在驱动程序中使用,不能在工作节点上使用。为了解决这个问题,您可以将SparkContext作为参数传递给parse_news_file函数,并在函数内部使用传递的SparkContext。具体实现如下:
```
# 定义parse_news_file函数
def parse_news_file(file_path, sc):
# 读取文本文件内容
text_rdd = sc.textFile(file_path)
text = ''.join(text_rdd.collect())
# 分解文件路径
parts = file_path.split('/')
# 获取类别和文件名
category = parts[-2]
filename = parts[-1]
print(filename)
# 对文本内容进行分词和过滤停用词
seg_list = jieba.cut(text)
filtered_list = [word for word in seg_list if word not in stopwords]
# 计算tf-idf特征
hashingTF = HashingTF()
tf = hashingTF.transform(filtered_list)
idf = IDF()
idfModel = idf.fit(tf)
tfidf = idfModel.transform(tf)
# 返回LabeledPoint对象
return LabeledPoint(category, tfidf)
# 获取或创建全局的SparkContext
sc = SparkContext.getOrCreate()
# 读取数据集,调用parse_news_file函数处理每个文件,使用LabeledPoint定义文本的类别和向量
data = sc.wholeTextFiles('hdfs://spark01:9000/project/data//').map(lambda x: parse_news_file(x[0], sc))
print("hello",data.count())
```
在这个版本的代码中,我们将SparkContext作为参数传递给parse_news_file函数,并将其用于textFile方法中。这样就可以避免在转换过程中使用SparkContext而导致的错误。
阅读全文