pyside6多线程 tcp
时间: 2024-12-29 07:25:54 浏览: 6
### PySide6 多线程 TCP 编程示例
#### 创建多线程环境下的 TCP 客户端和服务端
为了在 PySide6 中实现多线程的 TCP 编程,可以利用 `QThread` 类来管理后台线程中的操作。下面是一个简单的例子展示如何设置一个多线程的服务端和客户端。
#### 服务端代码示例:
```python
import sys
from PySide6.QtCore import QThread, Signal, Slot
from PySide6.QtWidgets import QApplication, QMainWindow, QTextEdit, QVBoxLayout, QWidget
from PyQt6.QtNetwork import QTcpServer, QTcpSocket, QHostAddress
class TcpServer(QTcpServer):
    """TCP server that tracks its client sockets and reports received text.

    Signals:
        message_received(str): emitted with the decoded payload every time a
            client socket delivers a non-empty chunk of data.
    """

    message_received = Signal(str)

    def __init__(self):
        super().__init__()
        # Sockets of the currently connected clients.
        self.clients = []

    # NOTE: this is an override of a QTcpServer virtual, not a slot, so it
    # must not be decorated with an (argument-less) @Slot().
    def incomingConnection(self, socketDescriptor: int):
        """Wrap a newly accepted connection in a QTcpSocket and track it.

        Args:
            socketDescriptor: native descriptor of the accepted connection.
        """
        # Parent the socket to the server so Qt owns its lifetime.
        client_socket = QTcpSocket(self)
        if not client_socket.setSocketDescriptor(socketDescriptor):
            # The descriptor could not be adopted; discard the socket.
            client_socket.deleteLater()
            return
        self.clients.append(client_socket)
        client_socket.readyRead.connect(
            lambda: self.read_client_data(client_socket)
        )
        client_socket.disconnected.connect(
            lambda: self.remove_client(client_socket)
        )

    def read_client_data(self, client_socket):
        """Read everything available on *client_socket* and publish it.

        Args:
            client_socket: the socket whose ``readyRead`` signal fired.
        """
        # errors="replace" keeps a split or invalid multi-byte sequence from
        # raising UnicodeDecodeError inside the slot.
        data = client_socket.readAll().data().decode("utf-8", errors="replace")
        if data:
            print(f"Received {data}")
            self.message_received.emit(data)

    def remove_client(self, client_socket):
        """Forget a disconnected client and schedule its socket for deletion.

        Args:
            client_socket: the socket whose ``disconnected`` signal fired.
        """
        if client_socket in self.clients:
            self.clients.remove(client_socket)
            # Only delete sockets we were still tracking, so a repeated call
            # never touches an already-deleted object.
            client_socket.deleteLater()
class MainWindow(QMainWindow):
    """Main window that displays the text received by the background server.

    The window owns a ``ServerThread`` and appends every message the thread
    reports to a read-only ``QTextEdit``.
    """

    # Declared by the example; nothing in this window emits or connects it.
    send_message = Signal(str)

    def __init__(self):
        super().__init__()
        central_widget = QWidget()
        layout = QVBoxLayout()
        self.text_edit = QTextEdit(readOnly=True)
        layout.addWidget(self.text_edit)
        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

        # Keep the thread as an attribute so it can be stopped on close.
        self.server_thread = ServerThread(parent=self)
        # Show incoming messages in the window.
        self.server_thread.message_received.connect(self.text_edit.append)
        self.server_thread.start()

    def closeEvent(self, event):
        """Stop the server thread before the window is closed.

        Args:
            event: the close event supplied by Qt.
        """
        self.server_thread.quit()
        self.server_thread.wait()
        super().closeEvent(event)


class ServerThread(QThread):
    """Worker thread that hosts a ``TcpServer`` listening on port 12345.

    Signals:
        message_received(str): relays ``TcpServer.message_received``.
    """

    message_received = Signal(str)

    def run(self):
        """Create the server, start listening, and run the event loop."""
        tcp_server = TcpServer()
        # Relay the server's signal through the thread object so that users
        # of the thread can connect to it.
        tcp_server.message_received.connect(self.message_received)
        if not tcp_server.listen(QHostAddress.Any, 12345):
            print("Unable to start the server!")
            return
        print("Listening...")
        # Without an event loop run() would return immediately, the thread
        # would finish, and the server would be destroyed before accepting
        # any connection.
        self.exec()
        tcp_server.close()
if __name__ == "__main__":
    # Script entry point: build the application, show the main window and
    # hand control to the Qt event loop until it exits.
    app = QApplication(sys.argv)

    window = MainWindow()
    window.show()

    exit_code = app.exec()
    sys.exit(exit_code)
```
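此部分展示了如何构建一个基本的服务端应用程序:在单独的工作线程中启动服务器并监听来自客户端的消息。需要注意两点:其一,工作线程必须运行自己的事件循环(在 `run()` 中调用 `self.exec()`),否则 `run()` 返回后线程立即结束,服务器对象随之销毁,无法接受任何连接;其二,PySide6 与 PyQt6 不能在同一程序中混用,`QTcpServer`、`QTcpSocket`、`QHostAddress` 应从 `PySide6.QtNetwork` 导入。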
#### 客户端代码示例:
```python
import sys
from PySide6.QtCore import QTimer, QObject, Signal, Slot
from PySide6.QtWidgets import QApplication, QLineEdit, QPushButton, QVBoxLayout, QWidget
from PyQt6.QtNetwork import QTcpSocket, QHostAddress
class Client(QObject):
    """Signal-based wrapper around a ``QTcpSocket``.

    Signals:
        connected(bool): ``True`` when the socket connects, ``False`` when
            the socket reports an error.
        received_message(str): one received line, without its line ending.
    """

    connected = Signal(bool)
    received_message = Signal(str)

    def __init__(self, parent=None):
        super().__init__(parent=parent)
        # Parent the socket to this object so Qt owns its lifetime.
        self.socket = QTcpSocket(self)
        self.socket.connected.connect(self._on_connected)
        self.socket.errorOccurred.connect(self._on_error)
        self.socket.readyRead.connect(self._read_data)

    def _on_connected(self):
        """Report a successful connection."""
        self.connected.emit(True)

    def _on_error(self, err):
        """Print the socket error and report the connection as failed."""
        print(err)
        self.connected.emit(False)

    def _read_data(self):
        """Emit ``received_message`` once per complete line in the buffer."""
        while self.socket.canReadLine():
            raw_line = self.socket.readLine().data()
            # errors="replace" keeps invalid bytes from raising inside the
            # slot; rstrip also drops the "\r" of a CRLF line ending.
            line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
            self.received_message.emit(line)

    def connect_to_host(self, address="127.0.0.1", port=12345):
        """Start connecting to *address*:*port*."""
        self.socket.connectToHost(address, port)

    def disconnect_from_host(self):
        """Start closing the connection."""
        self.socket.disconnectFromHost()

    def write(self, msg):
        """Send *msg* UTF-8 encoded; no line terminator is appended.

        Args:
            msg: the text to send.
        """
        self.socket.write(msg.encode('utf-8'))
        self.socket.flush()
class MainWidget(QWidget):
    """Small form with a line edit and a "Send" button backed by a Client.

    Clicking the button sends the current text through the client and clears
    the input; every message the client receives is printed.
    """

    def __init__(self):
        super().__init__()

        self.line_edit = QLineEdit()
        send_button = QPushButton("Send")

        box = QVBoxLayout()
        for child in (self.line_edit, send_button):
            box.addWidget(child)

        self.client = Client()

        def send_current_text():
            # Send what the user typed, then empty the input field.
            self.client.write(self.line_edit.text())
            self.line_edit.clear()

        send_button.clicked.connect(send_current_text)
        self.client.received_message.connect(print)
        self.client.connect_to_host()

        self.setLayout(box)
if __name__ == '__main__':
    # Script entry point for the client example. Passing sys.argv lets Qt
    # process its own command-line options.
    app = QApplication(sys.argv)
    widget = MainWidget()
    widget.setWindowTitle("TCP Client")
    widget.resize(300, 100)
    widget.show()
    # app.exec() starts the event loop by itself; no zero-delay timer is
    # needed to kick it off.
    exit_code = app.exec()
    sys.exit(exit_code)
```
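这段程序实现了客户端的功能。`QTcpSocket` 基于信号与槽异步工作,因此无需额外线程,在主线程中即可向服务器发送消息并接收响应数据,而不会阻塞界面。注意 `_read_data` 按行读取,只有收到以换行符结尾的完整行时才会发出 `received_message`;另外 `QTcpSocket`、`QHostAddress` 同样应从 `PySide6.QtNetwork` 导入,而不是 `PyQt6.QtNetwork`。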
阅读全文