请在这段代码的基础上,向一个ip和端口组成列表发送SYN包,一段时间内检测哪些ip和端口没有返回
时间: 2024-03-13 15:42:58 浏览: 155
masscan:TCP端口扫描程序,异步发出SYN数据包,在5分钟内扫描整个Internet
好的,可以使用 `socket` 模块创建原始套接字,构造 `SYN` 数据包,向指定的 IP 和端口列表发送 `SYN` 包,然后通过 `select.select` 函数等待一段时间,检测哪些 IP 和端口没有返回。具体实现方式可以参考如下代码片段:
```python
import select
import socket
import struct
import time
# 构造 IP 报文头部
def make_ip_header(src_ip, dest_ip):
ip_version = 4
ip_header_len = 5
ip_tos = 0
ip_total_len = 0
ip_id = 0
ip_frag_off = 0
ip_ttl = 255
ip_proto = socket.IPPROTO_TCP
ip_checksum = 0
src_ip_bytes = socket.inet_aton(src_ip)
dest_ip_bytes = socket.inet_aton(dest_ip)
ip_header = struct.pack('!BBHHHBBH4s4s',
(ip_version << 4) | ip_header_len,
ip_tos,
ip_total_len,
ip_id,
ip_frag_off,
ip_ttl,
ip_proto,
ip_checksum,
src_ip_bytes,
dest_ip_bytes)
return ip_header
# 构造 TCP 报文头部
def make_tcp_header(src_port, dest_port, seq_num, ack_num, flags):
tcp_header_len = 5
tcp_reserved = 0
tcp_data_offset = (tcp_header_len << 4) | tcp_reserved
tcp_window_size = socket.htons(8192)
tcp_checksum = 0
tcp_urgent_ptr = 0
tcp_header = struct.pack('!HHLLBBHHH',
src_port,
dest_port,
seq_num,
ack_num,
tcp_data_offset,
flags,
tcp_window_size,
tcp_checksum,
tcp_urgent_ptr)
return tcp_header
# 计算校验和
def checksum(data):
if len(data) % 2 != 0:
data += b'\0'
res = sum(struct.unpack('!{}H'.format(len(data) // 2), data))
res = (res >> 16) + (res & 0xffff)
res += (res >> 16)
return ~res & 0xffff
# 构造 SYN 数据包
def make_syn_packet(src_ip, src_port, dest_ip, dest_port, seq_num):
ip_header = make_ip_header(src_ip, dest_ip)
tcp_header = make_tcp_header(src_port, dest_port, seq_num, 0, 2)
pseudo_header = struct.pack('!4s4sBBH',
socket.inet_aton(src_ip),
socket.inet_aton(dest_ip),
0,
socket.IPPROTO_TCP,
len(tcp_header))
tcp_checksum = checksum(pseudo_header + tcp_header)
syn_packet = ip_header + tcp_header[:16] + struct.pack('!H', tcp_checksum) + tcp_header[18:]
return syn_packet
# 设置要扫描的目标 IP 和端口列表
target_list = [('127.0.0.1', 80), ('127.0.0.1', 443)]
# 创建原始套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
# 设置源 IP 和端口号
src_ip = '127.0.0.1'
src_port = 12345
# 发送 SYN 包
for dest_ip, dest_port in target_list:
seq_num = int(time.time() * 1000) & 0xffffffff
syn_packet = make_syn_packet(src_ip, src_port, dest_ip, dest_port, seq_num)
sock.sendto(syn_packet, (dest_ip, dest_port))
# 等待一段时间,检查哪些 IP 和端口没有返回 SYN+ACK 包
timeout = 5
start_time = time.time()
inputs = [sock]
unreachable_list = []
while True:
if time.time() - start_time > timeout:
break
readable, _, _ = select.select(inputs, [], [], timeout)
for r in readable:
if r is sock:
data, addr = sock.recvfrom(65535)
ip_header_len = (data[0] & 0xf) * 4
tcp_header_len = (data[ip_header_len + 12] >> 4) * 4
src_ip = socket.inet_ntoa(data[12:16])
src_port = struct.unpack('!H', data[ip_header_len:ip_header_len+2])[0]
dest_ip = socket.inet_ntoa(data[16:20])
dest_port = struct.unpack('!H', data[ip_header_len+2:ip_header_len+4])[0]
flags = data[ip_header_len+tcp_header_len+13]
if (src_ip, src_port) in target_list and flags == 18:
print('{}:{} is reachable'.format(src_ip, src_port))
elif (src_ip, src_port) in target_list:
unreachable_list.append((src_ip, src_port))
# 输出未返回 SYN+ACK 包的 IP 和端口
for ip, port in unreachable_list:
print('{}:{} is unreachable'.format(ip, port))
```
在上述代码中,我们首先定义了一些函数来构造 IP、TCP 报文头部,计算校验和,以及构造 `SYN` 数据包。然后,我们设置了要扫描的目标 IP 和端口列表,并通过原始套接字向列表中的每个 IP 和端口发送 `SYN` 包。接着,我们使用 `select.select` 函数等待一段时间,检测哪些 IP 和端口没有返回 `SYN+ACK` 包。最后,我们输出未返回 `SYN+ACK` 包的 IP 和端口。
阅读全文