2.编写一个python服务器程序。该程序在9999端口上监听,当建立连接后,每隔10s读取iris.txt文件,将三行记录发送到连接上;写一个spark streaming程序,该程序利用套接字,主动向前一个服务器程序建立连接;当连接建立后,从连接上读取记录,并显示在控制台上,然后将记录写入到mysql数据库中。
时间: 2023-06-17 10:08:52 浏览: 163
以下是 Python 服务器程序的代码:
```python
import socket
import time
# 创建套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定端口
server_socket.bind(('localhost', 9999))
# 监听连接
server_socket.listen(1)
while True:
# 接受连接
client_socket, addr = server_socket.accept()
print(f"Connection from {addr} has been established.")
while True:
# 读取 iris.txt 文件
with open('iris.txt', 'r') as f:
lines = f.readlines()[:3]
# 将三行记录发送到连接上
for line in lines:
client_socket.send(line.encode())
# 等待 10 秒
time.sleep(10)
# 关闭连接
client_socket.close()
server_socket.close()
```
以下是 Spark Streaming 程序的代码:
```python
from pyspark.sql import SparkSession
import mysql.connector
import socket
# 创建 SparkSession
spark = SparkSession.builder.appName("SocketStream").getOrCreate()
# 创建 StreamingContext
ssc = StreamingContext(spark.sparkContext, 1)
# 连接服务器
socket_stream = ssc.socketTextStream("localhost", 9999)
# 显示记录
socket_stream.pprint()
# 将记录写入 MySQL 数据库
def write_to_mysql(rdd):
if not rdd.isEmpty():
# 连接数据库
cnx = mysql.connector.connect(user='user', password='password',
host='localhost', database='database')
cursor = cnx.cursor()
# 插入数据
for row in rdd.collect():
query = f"INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, species) VALUES ({row})"
cursor.execute(query)
# 提交更改
cnx.commit()
# 关闭连接
cursor.close()
cnx.close()
# 处理记录
socket_stream.foreachRDD(write_to_mysql)
# 启动 StreamingContext
ssc.start()
ssc.awaitTermination()
```
阅读全文