Python 实现多人聊天室功能的代码
时间: 2023-08-06 13:03:55 浏览: 174
以下是一个基于 Python 的多人聊天室示例代码,包括用户名和密码验证、私聊、禁言和踢出用户、历史记录、表情符号和图片发送等功能。
服务器端代码:
```python
import socket
import threading
import os
import pickle
class User:
    """One chat participant: login credentials plus per-connection state."""

    def __init__(self, username, password):
        """Store the credentials and start with no connection attached.

        Args:
            username: Name the user logs in with.
            password: Password compared against the one sent at login.
        """
        self.username, self.password = username, password
        # Connection details; both stay None until code elsewhere assigns them.
        self.socket = self.address = None
        # Mute flag, off for a freshly created user.
        self.muted = False
class ChatRoom:
    """Holds the list of users and a bounded history of chat messages.

    Every delivery method accepts the message as either ``str`` or ``bytes``.
    Text is encoded before it is handed to ``socket.send``, which only accepts
    bytes-like data. Users whose ``socket`` attribute is ``None`` are treated
    as not connected and are skipped instead of raising ``AttributeError``.
    """

    # Maximum number of messages kept by save_history().
    HISTORY_LIMIT = 100

    def __init__(self):
        self.users = []
        self.history = []

    @staticmethod
    def _to_bytes(message):
        """Return *message* as bytes, encoding it first when it is a str."""
        return message.encode() if isinstance(message, str) else message

    def add_user(self, user):
        """Append *user* to the user list."""
        self.users.append(user)

    def remove_user(self, user):
        """Remove *user* from the user list.

        Raises:
            ValueError: If *user* is not in the list.
        """
        self.users.remove(user)

    def broadcast(self, message, sender=None):
        """Send *message* to every connected, non-muted user except *sender*.

        Args:
            message: Text or bytes to deliver.
            sender: The user to leave out, given either as the user object or
                as a username string. ``None`` leaves nobody out.
        """
        payload = self._to_bytes(message)
        for user in self.users:
            if user.socket is None:
                continue
            # NOTE(review): muted users are skipped as *recipients* here, as
            # in the original code. Confirm this is the intended meaning of
            # "mute" (it silences what they hear, not what they say).
            if user.muted:
                continue
            if sender is not None and (user == sender or user.username == sender):
                continue
            user.socket.send(payload)

    def send_to_user(self, message, username):
        """Send *message* to the first connected user named *username*.

        Returns:
            True if the message was handed to a socket, False when no
            connected user has that name.
        """
        payload = self._to_bytes(message)
        for user in self.users:
            if user.username == username and user.socket is not None:
                user.socket.send(payload)
                return True
        return False

    def save_history(self, message):
        """Append *message* to the history, keeping only the newest entries."""
        self.history.append(message)
        if len(self.history) > self.HISTORY_LIMIT:
            self.history = self.history[-self.HISTORY_LIMIT:]

    def send_history(self, user):
        """Send every stored history message to *user*, oldest first.

        Does nothing when *user* has no socket attached.
        """
        if user.socket is None:
            return
        for message in self.history:
            user.socket.send(self._to_bytes(message))
def _find_user(username):
    """Return the first user in chat_room.users named *username*, or None."""
    for user in chat_room.users:
        if user.username == username:
            return user
    return None


def _authenticate(message, client_socket, address):
    """Process one message from a client that has not logged in yet.

    Returns the matching user object on a successful ``login <user> <pass>``
    request, otherwise tells the client what went wrong and returns None.
    Nothing is remembered from a failed attempt, so a wrong password can never
    leave the connection in a logged-in state.
    """
    if not message.startswith('login '):
        client_socket.send(b'Please log in first.\n')
        return None
    credentials = message[6:].split()
    if len(credentials) == 2:
        username, password = credentials
        for user in chat_room.users:
            if user.username == username and user.password == password:
                # The user is already in chat_room.users (that is where it was
                # just found), so it is not appended again; attaching the
                # socket is what marks it as connected.
                user.socket = client_socket
                user.address = address
                # Direct acknowledgement so the client always gets a reply,
                # whatever the broadcast below delivers to it.
                client_socket.send(f'Welcome, {username}.\n'.encode())
                chat_room.broadcast(
                    f'{username} has joined the chat room.\n'.encode(),
                    sender=user)
                chat_room.send_history(user)
                return user
    client_socket.send(b'Invalid username or password.\n')
    return None


def _set_muted(admin_name, target, muted, client_socket):
    """Set the mute flag of user *target* and announce the change."""
    user = _find_user(target)
    if user is None:
        client_socket.send(f'User {target} not found.\n'.encode())
        return
    user.muted = muted
    verb = 'muted' if muted else 'unmuted'
    if user.socket is not None:
        chat_room.send_to_user(
            f'You have been {verb} by {admin_name}.\n'.encode(), target)
    chat_room.broadcast(f'{target} has been {verb} by {admin_name}.\n'.encode())


def _kick(target, client_socket):
    """Disconnect user *target* and announce it to the room."""
    user = _find_user(target)
    if user is None or user.socket is None:
        client_socket.send(f'User {target} not found.\n'.encode())
        return
    # Detach the socket first so the kicked client's own handler thread does
    # not also announce that the user "has left".
    kicked_socket, user.socket = user.socket, None
    user.address = None
    try:
        kicked_socket.send(b'You have been kicked by the admin.\n')
        # shutdown() wakes up a recv() blocked in the other handler thread;
        # close() alone is not guaranteed to do that.
        kicked_socket.shutdown(socket.SHUT_RDWR)
    except OSError:
        pass  # the peer is already gone; nothing left to notify
    kicked_socket.close()
    chat_room.broadcast(f'{target} has been kicked by the admin.\n'.encode())


def _dispatch(user, message, client_socket):
    """Execute one message from a logged-in user.

    Returns False when the client asked to log out, True otherwise.
    """
    username = user.username
    # Match on the first word so ordinary chat text that merely starts with a
    # command name (e.g. "kickball") is not treated as a command.
    command, _, argument = message.partition(' ')
    argument = argument.strip()

    if command == 'logout':
        client_socket.send(b'Bye.\n')
        return False

    if command in ('mute', 'unmute', 'kick'):
        if username != 'admin':
            client_socket.send(b'Permission denied.\n')
        elif command == 'kick':
            _kick(argument, client_socket)
        else:
            _set_muted(username, argument, command == 'mute', client_socket)
        return True

    if command == 'history' and not argument:
        chat_room.send_history(user)
        return True

    if user.muted:
        # Muting must stop the user from talking, otherwise it has no effect.
        client_socket.send(b'You are muted.\n')
        return True

    if command == 'sendto':
        target, _, text = argument.partition(' ')
        recipient = _find_user(target) if target else None
        if recipient is None or recipient.socket is None:
            client_socket.send(f'User {target} not found.\n'.encode())
        else:
            chat_room.send_to_user(
                f'(private) {username}: {text.strip()}\n'.encode(), target)
        return True

    line = f'{username}: {message}\n'.encode()
    chat_room.broadcast(line, sender=user)
    chat_room.save_history(line)
    return True


def handle_client(client_socket, address):
    """Serve one client connection until it logs out or disconnects.

    The client must first send ``login <username> <password>``. After that
    each received chunk is handled as one command (``logout``, ``mute``,
    ``unmute``, ``kick``, ``history``, ``sendto``) or as a chat line that is
    broadcast and stored in the history. Everything sent through chat_room is
    encoded to bytes here.

    NOTE(review): each recv() result is treated as exactly one message; the
    protocol has no framing, so messages sent back-to-back may arrive merged.

    Args:
        client_socket: Connected socket for this client.
        address: Peer address as returned by ``accept()``.
    """
    current_user = None
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            message = data.decode(errors='replace').strip()
            if not message:
                continue
            if current_user is None:
                current_user = _authenticate(message, client_socket, address)
            elif not _dispatch(current_user, message, client_socket):
                break
    except Exception as e:
        # Thread boundary: report the problem and end this connection only.
        print(f'Error: {e}')
    finally:
        try:
            # Only announce the departure if this connection still owns the
            # user (it may have been kicked or replaced by a newer login).
            if current_user is not None and current_user.socket is client_socket:
                current_user.socket = None
                current_user.address = None
                chat_room.broadcast(
                    f'{current_user.username} has left the chat room.\n'.encode())
        except Exception as e:
            print(f'Error: {e}')
        finally:
            client_socket.close()
# Shared room state used by every client-handler thread.
chat_room = ChatRoom()

# Load the saved user list, if one exists.
# SECURITY NOTE: pickle.load can run arbitrary code while unpickling, so
# users.pickle must only ever come from a trusted source.
if os.path.exists('users.pickle'):
    with open('users.pickle', 'rb') as f:
        chat_room.users = pickle.load(f)

# The with-block guarantees the listening socket is closed on exit.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
    # Allow an immediate restart without "Address already in use".
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(('localhost', 8888))
    server_socket.listen(5)
    try:
        while True:
            client_socket, address = server_socket.accept()
            # Daemon threads do not keep the process alive once the accept
            # loop has stopped.
            thread = threading.Thread(
                target=handle_client,
                args=(client_socket, address),
                daemon=True,
            )
            thread.start()
    except KeyboardInterrupt:
        print('Server shutting down.')
```
客户端代码:
```python
import socket
import threading
import getpass
import os
import time
import random
def receive():
    """Print everything the server sends until the connection ends.

    Runs as the body of the receiver thread. The loop stops when the server
    closes the connection (empty read) or when the socket fails or is closed
    locally, instead of ending the thread with an unhandled traceback.
    """
    while True:
        try:
            data = client_socket.recv(1024)
        except OSError:
            break
        if not data:
            break
        # errors='replace' keeps a multi-byte character that was split across
        # two reads from raising UnicodeDecodeError.
        message = data.decode(errors='replace')
        print(message, end='')
        if message.startswith('(private) '):
            # Second space-separated word is "<sender>:"; drop the colon.
            sender = message.split(' ')[1][:-1]
            print(f'Send a message to {sender}:')
def send():
    """Forward each line typed on standard input to the server, forever."""
    # input() never returns None, so this two-argument iter() keeps calling
    # it and never stops on its own.
    for typed_line in iter(input, None):
        client_socket.send(typed_line.encode())
def login():
    """Prompt for credentials until the server accepts them.

    Sends ``login <username> <password>`` and reads one reply. The loop
    repeats while the reply is the server's "invalid" message.

    Raises:
        ConnectionError: If the server closes the connection before replying.
            An empty read used to be reported as a successful login.
    """
    while True:
        username = input('Username: ')
        password = getpass.getpass('Password: ')
        # The request is space-separated, so empty fields or embedded
        # whitespace would produce a request the server cannot parse.
        if not username or not password or len(f'{username} {password}'.split()) != 2:
            print('Username and password must be non-empty and contain no spaces.')
            continue
        message = f'login {username} {password}'
        client_socket.send(message.encode())
        response = client_socket.recv(1024)
        if not response:
            raise ConnectionError('Server closed the connection during login.')
        if response == b'Invalid username or password.\n':
            print('Invalid username or password.')
        else:
            print('Logged in successfully.')
            break
def menu():
    """Show the numbered menu repeatedly and send the chosen command.

    Choices 1-5 prompt for their arguments and send the resulting line to the
    server. Choice 6 sends ``logout`` and returns. Any other input simply
    shows the menu again.
    """
    menu_lines = (
        'Menu:',
        '1. Send message',
        '2. Private message',
        '3. Mute user',
        '4. Unmute user',
        '5. Kick user',
        '6. Logout',
    )
    # Choices that take a single target and map directly to a server command.
    target_commands = {'3': 'mute', '4': 'unmute', '5': 'kick'}

    while True:
        for line in menu_lines:
            print(line)
        choice = input('Choice: ')

        if choice == '6':
            client_socket.send(b'logout')
            return

        if choice == '1':
            outgoing = input('Message: ')
        elif choice == '2':
            target = input('Target: ')
            text = input('Message: ')
            outgoing = f'sendto {target} {text}'
        elif choice in target_commands:
            target = input('Target: ')
            outgoing = f'{target_commands[choice]} {target}'
        else:
            continue
        client_socket.send(outgoing.encode())
def show_history():
    """Ask the server for the history and print one reply, line by line."""
    client_socket.send(b'history')
    reply = client_socket.recv(1024).decode()
    entries = reply.split('\n')
    # The piece after the final newline is discarded.
    entries.pop()
    for entry in entries:
        print(entry)
def show_emoticons():
    """List the emoticons three per row and return the one the user picks.

    Keeps prompting until the input is a number between 1 and the number of
    emoticons.
    """
    emoticons = ['(smile)', '(laugh)', '(wink)', '(sad)', '(angry)', '(shock)', '(love)', '(kiss)', '(heart)']
    for number, emoticon in enumerate(emoticons, start=1):
        print(f'{number}. {emoticon}', end=' ')
        # End the row after every third entry.
        if number % 3 == 0:
            print()
    print()

    choice = input('Choice: ')
    while not (choice.isdigit() and 1 <= int(choice) <= len(emoticons)):
        print('Invalid choice.')
        choice = input('Choice: ')
    return emoticons[int(choice) - 1]
def show_images():
    """List the image files three per row and return the chosen file's bytes.

    Keeps prompting until the input is a valid number and that file can be
    read. A missing or unreadable file is reported and the prompt repeats,
    instead of crashing the client.

    Returns:
        The raw contents of the selected image file.
    """
    images = ['dog.jpg', 'cat.jpg', 'flower.jpg', 'sunset.jpg', 'waterfall.jpg']
    for number, image in enumerate(images, start=1):
        print(f'{number}. {image}', end=' ')
        if number % 3 == 0:
            print()
    print()
    while True:
        choice = input('Choice: ')
        if not (choice.isdigit() and int(choice) in range(1, len(images) + 1)):
            print('Invalid choice.')
            continue
        path = images[int(choice) - 1]
        try:
            with open(path, 'rb') as f:
                return f.read()
        except OSError as error:
            # File names are relative to the current working directory.
            print(f'Cannot read {path}: {error}')
# Client entry point: connect, log in, then run the command loop.
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    client_socket.connect(('localhost', 8888))
    login()

    # Daemon thread: it must not keep the process alive after "exit".
    thread_receive = threading.Thread(target=receive, daemon=True)
    thread_receive.start()

    # Only this loop reads the keyboard. A second thread calling input() at
    # the same time would compete for each typed line, so plain chat lines
    # are sent from here instead of from a separate sender thread.
    while True:
        menu_choice = input('Enter "menu" to show menu: ')
        if menu_choice == 'menu':
            menu()
        elif menu_choice == 'history':
            show_history()
        elif menu_choice == 'emoticons':
            message = show_emoticons()
            client_socket.send(message.encode())
        elif menu_choice == 'image':
            data = show_images()
            # NOTE(review): getpass.getuser() is the local OS account name,
            # not the chat username. The raw image bytes follow the
            # announcement with no framing; confirm the server expects that.
            message = f'{getpass.getuser()} sent an image: {len(data)} bytes\n'
            client_socket.send(message.encode())
            client_socket.send(data)
        elif menu_choice == 'exit':
            client_socket.send(b'logout')
            break
        elif menu_choice:
            # Anything that is not a local command is a chat line.
            client_socket.send(menu_choice.encode())
finally:
    client_socket.close()
```
注意:以上代码示例仅用于演示多人聊天室的基本原理和部分功能的实现,实际应用中还需要考虑更多的安全性、可扩展性等问题。
阅读全文