Spark实战:从核心到SQL、Streaming的深度探索

1星 需积分: 9 106 下载量 110 浏览量 更新于2024-07-20 收藏 2.95MB PDF 举报
"Spark in Action 是一份专注于Spark学习和实战的资源,涵盖了SparkCore、SparkSQL、SparkStreaming等核心模块,旨在分享Spark的使用心得和项目经验。文档深入介绍了Spark平台的发展历程,从其在伯克利大学的研究背景到成为Apache顶级项目的过程,强调了Spark在数据科学领域的学术渊源和技术创新。" Spark是一种分布式计算框架,以其高效、易用和多模态处理能力而闻名。在SparkCore中,SparkContext是启动Spark应用的关键,它是连接Spark集群和用户代码的桥梁,负责资源调度和任务管理。RDD(Resilient Distributed Datasets)是Spark的基本数据抽象,它代表了一组不可变、分区的记录集合,具有容错性,可以在集群中并行处理。 Spark的combineByKey操作用于对键值对数据集进行聚合,它可以自定义组合规则,允许用户聚合分区内和跨区的数据。结合这个操作,可以实现诸如求和、平均值等统计计算。在介绍中提到的PageRank算法是网络分析中的一个重要应用,Spark可以通过RDD的转换和行动操作高效地实现PageRank的迭代计算。 SparkSQL扩展了Spark的功能,引入DataFrame,它提供了一种结构化的数据处理方式,支持SQL查询和DataFrame API。DataFrame可以与多种数据源交互,包括HDFS、Cassandra、HBase等,这些数据源在SparkSQL中被统称为DataSources。ExternalDataSources允许Spark访问外部存储系统中的数据,增强了Spark的灵活性和可扩展性。此外,SparkSQL的性能调优和Catalyst优化器也是关键话题,Catalyst是一个基于规则的查询优化框架,能显著提升查询性能。 SparkStreaming是Spark处理实时数据流的模块,它将数据流分解成微小的批处理作业,利用SparkCore的并行处理能力实现低延迟的流处理。Spark的运维部分则关注如何部署和管理Spark集群,确保系统的稳定运行。 Spark in Action提供了全面的Spark学习路径,从基础概念到高级特性,再到实战经验,对于希望深入理解和应用Spark的开发者来说是一份宝贵的资料。通过这份资源,读者不仅可以掌握Spark的核心技术,还能了解到如何在实际项目中有效地使用Spark解决大数据问题。

print("开始执行推荐算法....") #spark.sql(etl_sql).write.jdbc(mysql_url, 'task888', 'overwrite', prop) # 获取:用户ID、房源ID、评分 etl_rdd = spark.sql(etl_sql).select('user_id', 'phone_id', 'action_core').rdd rdd = etl_rdd.map(lambda x: Row(user_id=x[0], book_id=x[1], action_core=x[2])).map(lambda x: (x[2], x[1], x[0])) # 5.训练模型 model = ALS.train(rdd, 10, 10, 0.01) # 7.调用模型 products_for_users_list = model.recommendProductsForUsers(10).collect() # 8.打开文件,将推荐的结果保存到data目录下 out = open(r'data_etl/recommend_info.csv', 'w', newline='', encoding='utf-8') # 9.设置写入模式 csv_write = csv.writer(out, dialect='excel') # 10.设置用户csv文件头行 user_head = ['user_id', 'phone_id', 'score'] # 12.写入头行 csv_write.writerow(user_head) # 13.循环推荐数据 for i in products_for_users_list: for value in i[1]: rating = [value[0], value[1], value[2]] # 写入数据 csv_write.writerow(rating) print("推荐算法执行结束,开始加工和变换推荐结果....") # 14.读取推荐的结果 recommend_df = spark \ .read \ .format('com.databricks.spark.csv') \ .options(header='true', inferschema='true', ending='utf-8') \ .load("data_etl/recommend_info.csv") # 注册临时表 recommend_df.createOrReplaceTempView("recommend") # 构造 spark执行的sql recommend_sql = ''' SELECT a.user_id, a.phone_id, bid,phone_name, phone_brand, phone_price, phone_memory ,phone_screen_size,ROUND(score,1) score FROM recommend a,phone b WHERE a.phone_id=b.phone_id ''' # 执行spark sql语句,得到dataframe recommend_df = spark.sql(recommend_sql) # 将推荐的结果写入mysql recommend_df.write.jdbc(mysql_url, 'recommend', 'overwrite', prop) 解释一下这段代码

2023-06-08 上传