编程要求 打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,执行程序,读取套接字流数据,按空格进行分词,完成词频统计。补充代码,将词频统计的输出内容存储到 /data/workspace/myshixun/project/step1/result 文件中。 代码文件目录: /data/workspace/myshixun/project/step1/work.py 套接字流相关信息: 地址:localhost 端口:8888 输入数据: 程序启动后(5s),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 ssc.awaitTermination(timeout=60) 的 timeout 指定时间。 It is believed that the computer is bringing the world into a brand new era. At the time the computer was invented, scientists, marveling at its calculating speed, felt that they had created a miracle. Nowadays, the function of the computer is no longer confined to calculation; It permeates peoples daily lives and has become an inseparable part of human society. 输入内容后,注意按回车。 检查点存放本地目录:/root/mylog/ 请在程序运行完成后再点击评测,否则会影响评测结果。 小贴士: pprint() 方法中可以设置数据输出显示的数量。
时间: 2024-02-21 14:59:27 浏览: 212
C#套接字编程共13页.pdf.zip
以下是代码实现:
```python
import socket
import time
import os
from collections import Counter
from pprint import pprint
# 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定地址和端口
sock.bind(('localhost', 8888))
# 监听
sock.listen(1)
# 接受连接
print('等待连接...')
conn, addr = sock.accept()
print('已连接:', addr)
# 接收数据
data = ''
while True:
buf = conn.recv(1024).decode()
if not buf:
break
data += buf
# 分词统计
words = data.split()
word_count = Counter(words)
# 输出结果
pprint(word_count.most_common(10))
# 存储结果
result_dir = '/data/workspace/myshixun/project/step1/result'
if not os.path.exists(result_dir):
os.makedirs(result_dir)
with open(os.path.join(result_dir, 'result.txt'), 'w') as f:
for word, count in word_count.items():
f.write(f'{word}: {count}\n')
# 关闭连接
conn.close()
# 停止Spark StreamingContext
ssc.stop()
```
请注意,以上代码是在 Spark StreamingContext 环境中执行的,需要将其放在一个 `if __name__ == '__main__':` 语句中,并且创建一个 StreamingContext 对象。同时,需要在代码中加入以下内容以启动 Spark:
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建SparkContext
sc = SparkContext(appName='WordCount')
# 创建StreamingContext
ssc = StreamingContext(sc, batchDuration=1)
# ... 以上代码 ...
# 启动Spark StreamingContext
ssc.start()
ssc.awaitTermination(timeout=60)
```
另外,这段代码需要在 Spark 环境中执行,而不是在普通的 Python 环境中执行。你需要在服务器上安装 Spark 并以集群模式启动,然后在 Spark 集群中提交这段代码。
阅读全文