编写一个python服务器程序。该程序在9999端口上监听,当建立连接后,每隔10s读取iris.txt文件,将三行记录发送到连接上;写一个spark streaming程序,该程序利用套接字,主动向前一个服务器程序建立连接;当连接建立后,从连接上读取记录,并显示在控制台上,然后将记录写入到mysql数据库中。
时间: 2023-06-19 07:08:42 浏览: 102
下面是一个简单的Python服务器程序,用于监听9999端口,并每隔10秒读取iris.txt文件并将其中三行记录发送到连接上:
```python
import socket
import time
HOST = 'localhost'
PORT = 9999
def send_records(conn):
with open('iris.txt', 'r') as f:
for i in range(3):
line = f.readline()
conn.sendall(line.encode())
f.close()
def main():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
print('Listening on port', PORT)
while True:
conn, addr = s.accept()
print('Connected by', addr)
while True:
send_records(conn)
time.sleep(10)
if __name__ == '__main__':
main()
```
下面是一个Spark Streaming程序,用于连接上述Python服务器程序,并将记录显示在控制台上并写入MySQL数据库:
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import mysql.connector
import socket
HOST = 'localhost'
PORT = 9999
def insert_into_mysql(records):
cnx = mysql.connector.connect(user='root', password='password', host='localhost', database='test')
cursor = cnx.cursor()
for r in records:
query = "INSERT INTO iris (sepal_length, sepal_width, petal_length, petal_width, species) VALUES (%s, %s, %s, %s, %s)"
data = (float(r[0]), float(r[1]), float(r[2]), float(r[3]), r[4])
cursor.execute(query, data)
cnx.commit()
cursor.close()
cnx.close()
def process_records(records):
for r in records:
print(r)
insert_into_mysql(records)
def main():
sc = SparkContext(appName='IrisStreaming')
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream(HOST, PORT)
records = lines.map(lambda x: x.split(','))
records.foreachRDD(lambda rdd: rdd.foreachPartition(lambda records: process_records(records)))
ssc.start()
ssc.awaitTermination()
if __name__ == '__main__':
main()
```
请注意,这只是一个简单的示例程序,应根据具体要求进行更改和优化。
阅读全文