Assignment: count the occurrences of every word in the file timemachine.txt and write the results into a wordCount table in MySQL. The wordCount table has the columns id, word, count, where id is the primary key, word is the word, and count is its number of occurrences. Tasks:
- Create a spark database in MySQL, and create the wordCount table inside it
- Insert one marker row into wordCount (as shown in the figure below)
- Write and run code that stores the word counts in the database
- Query the number of rows in wordCount from the mysql command line
- Verify from the mysql command line whether any illegal words are present (select * from wordCount where word like '%\'%')
- Using pyspark, connect to the database and answer the following:
  - How many words does timemachine.txt contain in total (see page 106 of the textbook)
  - How many distinct words appear in timemachine.txt
  - The 10 most frequent words in timemachine.txt
  - The words that appear more than 200 times in timemachine.txt
  - The average number of occurrences per word in timemachine.txt
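For the first two setup steps, the database and table can be created along these lines (the column types and the marker row's values are assumptions; match them to the assignment's figure):

```sql
-- Create the database and the wordCount table (types are assumptions)
CREATE DATABASE IF NOT EXISTS spark;
USE spark;
CREATE TABLE IF NOT EXISTS wordCount (
    id INT AUTO_INCREMENT PRIMARY KEY,
    word VARCHAR(100),
    count INT
);
-- Insert one marker row (placeholder values; use those shown in the figure)
INSERT INTO wordCount (word, count) VALUES ('__marker__', 0);
```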
Posted: 2024-03-23 08:37:50
Here is a reference implementation:
```python
from pyspark.sql import SparkSession
import re
# Create the SparkSession
spark = SparkSession.builder.appName("Word Count").getOrCreate()

# Read the file as an RDD of lines
lines = spark.sparkContext.textFile("timemachine.txt")

# Strip every non-alphanumeric character (including apostrophes) and lowercase
def clean_word(word):
    return re.sub(r'[\W_]+', '', word).lower()

# Split each line on spaces, clean the tokens, and drop empty strings
# (tokens that were pure punctuation would otherwise become "")
words = lines.flatMap(lambda line: line.split(" ")).map(clean_word).filter(lambda w: w != "")

# Count the occurrences of each word; countByValue() returns a dict on the driver
word_counts = words.countByValue()
# Insert all (word, count) pairs into the wordCount table; id is auto-increment.
# A single connection with executemany() is much faster than reconnecting per word.
import pymysql
conn = pymysql.connect(host='localhost', user='root', password='123456', database='spark')
cursor = conn.cursor()
sql = "INSERT INTO wordCount (word, count) VALUES (%s, %s)"
cursor.executemany(sql, list(word_counts.items()))
conn.commit()
cursor.close()
conn.close()
# Open one connection for all of the verification queries below
import pymysql
conn = pymysql.connect(host='localhost', user='root', password='123456', database='spark')
cursor = conn.cursor()

# Number of rows in the wordCount table
cursor.execute("SELECT COUNT(*) FROM wordCount")
print("Rows in wordCount:", cursor.fetchone()[0])

# Number of distinct words in timemachine.txt
cursor.execute("SELECT COUNT(DISTINCT word) FROM wordCount")
print("Distinct words in timemachine.txt:", cursor.fetchone()[0])

# Total number of word occurrences in timemachine.txt
cursor.execute("SELECT SUM(count) FROM wordCount")
print("Total word occurrences in timemachine.txt:", cursor.fetchone()[0])

# The 10 most frequent words
cursor.execute("SELECT word, count FROM wordCount ORDER BY count DESC LIMIT 10")
print("Top 10 most frequent words:")
for word, cnt in cursor.fetchall():
    print(word, cnt)

# Words that appear more than 200 times
cursor.execute("SELECT word, count FROM wordCount WHERE count > 200")
print("Words appearing more than 200 times:")
for word, cnt in cursor.fetchall():
    print(word, cnt)

# Average number of occurrences per distinct word
cursor.execute("SELECT AVG(count) FROM wordCount")
print("Average occurrences per word in timemachine.txt:", cursor.fetchone()[0])

cursor.close()
conn.close()
```
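The cleaning and counting logic can be sanity-checked locally without Spark or MySQL. Here is a minimal sketch using collections.Counter on a couple of sample lines (the sample text is made up for illustration):

```python
import re
from collections import Counter

def clean_word(word):
    # Same cleaning rule as in the Spark job: strip non-alphanumerics, lowercase
    return re.sub(r'[\W_]+', '', word).lower()

# Made-up sample lines standing in for the contents of timemachine.txt
sample_lines = ["The Time Traveller (for so it will be convenient",
                "to speak of him) was expounding a recondite matter to us."]

words = [clean_word(w) for line in sample_lines for w in line.split(" ")]
words = [w for w in words if w != ""]  # drop tokens that were pure punctuation

counts = Counter(words)
print(counts["to"])            # "to" occurs twice in the sample
print(clean_word("don't"))     # apostrophes are stripped, so no %'% rows appear
```

Note that because clean_word removes apostrophes, the illegal-word check from the assignment (word like '%\'%') should return zero rows.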
Note that the spark database and the wordCount table must be created in MySQL ahead of time, with the marker row inserted, for the program to run correctly.
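The assignment asks for the final queries to be run through pyspark rather than pymysql. A sketch of that approach reads the table over JDBC; the connector jar path, driver class, and port are assumptions about your environment:

```python
from pyspark.sql import SparkSession

# The MySQL Connector/J jar must be on Spark's classpath; the path is an assumption
spark = SparkSession.builder \
    .appName("Word Count Queries") \
    .config("spark.jars", "/usr/local/spark/jars/mysql-connector-java-8.0.33.jar") \
    .getOrCreate()

# Load the wordCount table as a DataFrame over JDBC
df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "wordCount") \
    .option("user", "root") \
    .option("password", "123456") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

df.createOrReplaceTempView("wordCount")
# Backticks are needed because `count` collides with the COUNT() function name
spark.sql("SELECT SUM(`count`) AS total FROM wordCount").show()        # total occurrences
spark.sql("SELECT COUNT(*) AS distinct_words FROM wordCount").show()   # distinct words
spark.sql("SELECT word, `count` FROM wordCount ORDER BY `count` DESC LIMIT 10").show()
spark.sql("SELECT word, `count` FROM wordCount WHERE `count` > 200").show()
spark.sql("SELECT AVG(`count`) AS avg_count FROM wordCount").show()
```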