python发送websocket
时间: 2024-08-14 22:02:56 浏览: 67
python实现websocket的客户端压力测试
在Python中,你可以使用`websockets`库来创建WebSocket服务器和客户端。WebSocket是一种在单个TCP连接上提供全双工通信的协议,适用于需要实时双向通信的应用场景,比如聊天应用、在线游戏等。
### 发送WebSocket消息(服务器端示例):
**使用 `websockets` 库创建服务器:**
```python
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
await websocket.send(message)
async def main():
async with websockets.serve(echo, "localhost", 8765):
print("WebSocket server is running on http://localhost:8765")
await asyncio.Future() # 阻塞运行,直到收到关闭信号
if __name__ == "__main__":
asyncio.run(main())
```
在这个例子中,`echo` 函数是一个异步协程,当接收到客户端的消息时,会立即原样转发回去。
### 发送WebSocket消息(客户端示例):
**连接到服务器:**
```python
import asyncio
import websockets
async def send_message():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
greeting = "Hello, Server!"
await websocket.send(greeting)
received = await websocket.recv()
print(f"Received: {received}")
if __name__ == "__main__":
asyncio.run(send_message())
```
这里,客户端通过`websockets.connect`打开一个到服务器的连接,然后发送一条消息并等待接收响应。
### 相关问题 --
1. 如何处理WebSocket连接异常?
2. 如何在Python中解析和处理WebSocket事件?
3. 如何在Python中实现安全的WebSocket通信,例如加密?
阅读全文