请用python实现一个socks5的代理服务器,支持用户认证和域名白名单。
时间: 2024-03-25 20:37:10 浏览: 277
好的,基于 Python 的 PySocks 库,可以实现一个简单的 SOCKS5 代理服务器,并支持用户认证和域名白名单。
以下是代码实现:
```python
import socket
import select
import struct
import sys
import argparse
import socks # 需要安装 PySocks 库,可以通过 pip 安装
# 定义常量
SOCKS_VERSION = 5
USER_AUTHENTICATION = 2
NO_ACCEPTABLE_METHODS = 0xFF
SUCCESS = 0
# 定义命令行参数
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="127.0.0.1", help="代理服务器 IP 地址,默认为 127.0.0.1")
parser.add_argument("--port", type=int, default=1080, help="代理服务器端口号,默认为 1080")
parser.add_argument("--username", type=str, default=None, help="代理服务器用户名,默认为 None")
parser.add_argument("--password", type=str, default=None, help="代理服务器密码,默认为 None")
parser.add_argument("--whitelist", type=str, default=None, help="代理服务器域名白名单,多个域名用逗号分隔,默认为 None")
args = parser.parse_args()
# 解析域名白名单
whitelist = []
if args.whitelist is not None:
whitelist = args.whitelist.split(",")
# 创建代理服务器 Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((args.ip, args.port))
server_socket.listen(5)
print(f"代理服务器已启动,监听地址为 {args.ip}:{args.port},用户名为 {args.username},密码为 {args.password},域名白名单为 {whitelist}")
inputs = [server_socket]
# 处理 SOCKS5 协议的请求
def handle_socks5_request(client_socket):
# 接收客户端发来的协议版本和支持的身份验证方式
version, nmethods = struct.unpack("!BB", client_socket.recv(2))
# 校验协议版本是否为 SOCKS5
if version != SOCKS_VERSION:
print("无效的协议版本")
client_socket.close()
return
# 接收客户端发来的身份验证方式
methods = client_socket.recv(nmethods)
# 判断客户端是否支持用户名密码验证方式
if USER_AUTHENTICATION not in methods:
print("不支持用户名密码身份验证方式")
client_socket.sendall(struct.pack("!BB", SOCKS_VERSION, NO_ACCEPTABLE_METHODS))
client_socket.close()
return
# 向客户端发送支持的身份验证方式
client_socket.sendall(struct.pack("!BB", SOCKS_VERSION, USER_AUTHENTICATION))
# 接收客户端发来的用户名和密码
version, username_length = struct.unpack("!BB", client_socket.recv(2))
username = client_socket.recv(username_length).decode("utf-8")
password_length = struct.unpack("!B", client_socket.recv(1))[0]
password = client_socket.recv(password_length).decode("utf-8")
# 校验用户名和密码是否正确
if args.username is not None and args.password is not None:
if username != args.username or password != args.password:
print("用户名或密码错误")
client_socket.sendall(struct.pack("!BB", SOCKS_VERSION, NO_ACCEPTABLE_METHODS))
client_socket.close()
return
# 向客户端发送身份验证通过的消息
client_socket.sendall(struct.pack("!BB", SOCKS_VERSION, SUCCESS))
# 接收客户端发来的请求
version, cmd, _, address_type = struct.unpack("!BBBB", client_socket.recv(4))
if address_type == 1: # IPv4
address = socket.inet_ntoa(client_socket.recv(4))
elif address_type == 3: # 域名
address_length = struct.unpack("!B", client_socket.recv(1))[0]
address = client_socket.recv(address_length).decode("utf-8")
elif address_type == 4: # IPv6
address = socket.inet_ntop(socket.AF_INET6, client_socket.recv(16))
else:
print("不支持的地址类型")
client_socket.close()
return
port = struct.unpack("!H", client_socket.recv(2))[0]
print(f"收到请求:{address}:{port}")
# 校验域名是否在白名单中
if address in whitelist:
print(f"请求的域名 {address} 在白名单中,允许访问")
else:
print(f"请求的域名 {address} 不在白名单中,禁止访问")
client_socket.close()
return
# 连接远程服务器,并将请求转发过去
remote_socket = socks.socksocket()
remote_socket.set_proxy(socks.SOCKS5, args.ip, args.port)
remote_socket.connect((address, port))
client_socket.sendall(struct.pack("!BBBB", SOCKS_VERSION, SUCCESS, 0, address_type))
if address_type == 1:
client_socket.sendall(socket.inet_aton(address))
elif address_type == 3:
client_socket.sendall(struct.pack("!B", len(address)) + address.encode("utf-8"))
elif address_type == 4:
client_socket.sendall(socket.inet_pton(socket.AF_INET6, address))
client_socket.sendall(struct.pack("!H", port))
inputs.append(client_socket)
inputs.append(remote_socket)
print(f"请求 {address}:{port} 转发成功")
while True:
readable, _, _ = select.select(inputs, [], [])
for sock in readable:
if sock is server_socket:
# 有新的客户端连接进来
client_socket, client_address = sock.accept()
inputs.append(client_socket)
print(f"客户端 {client_address} 已连接")
else:
# 处理客户端发来的请求
try:
handle_socks5_request(sock)
except Exception as e:
print(f"请求处理失败:{e}")
sock.close()
inputs.remove(sock)
```
通过命令行参数,可以设置代理服务器的 IP 地址、端口号、用户名、密码和域名白名单。启动代理服务器后,可以使用浏览器或其他支持 SOCKS5 协议的应用程序进行访问,代理服务器会根据域名白名单进行筛选,只允许访问指定的域名。
阅读全文