ip = struct.unpack('4s', data[16:20])
时间: 2024-01-17 16:06:07 浏览: 110
这行代码用于将字节流中从第16个字节到第20个字节(不包括第20个字节)的4个字节数据解析成一个IP地址。具体地,使用Python标准库中的struct模块的unpack函数,将字节流按照指定的格式进行解包。其中,'4s'表示将4个字节解析成一个字符串。解析后得到的是一个bytes类型的对象,需要使用socket库中的inet_ntoa函数将其转换成IP地址的字符串形式。
相关问题
编写程序,监听本机收到的所有网络数据,解析每个数据的IP、TCP、UDP头部信息,同时记录不同IP地址发来数据得次数。60秒后停止监听,输出不同IP地址发来数据的次数。 扩展:对实验代码进行精简或增加功能。
这是一个比较复杂的任务,需要使用网络编程和数据解析相关的知识,下面给出一个简单的实现思路:
1. 使用Python的socket库创建一个TCP套接字,并绑定到本机的IP地址和端口号。
2. 使用socket的listen方法开始监听网络数据,并使用select模块实现非阻塞IO,以便同时处理多个连接。
3. 对于每个接收到的数据包,使用Python的struct库解析其IP、TCP、UDP头部信息,并记录下源IP地址。
4. 使用Python的time库定时器功能,实现60秒后停止监听,并输出不同IP地址发来数据的次数。
下面是一个简单的示例代码,其中省略了一些数据解析和计数的细节:
```python
import socket
import select
import struct
import time
IP_HDR_LEN = 20
TCP_HDR_LEN = 20
UDP_HDR_LEN = 8
def parse_packet(data):
ip_header = data[:IP_HDR_LEN]
iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
src_addr = socket.inet_ntoa(iph[8])
protocol = iph[6]
if protocol == 6: # TCP
tcp_header = data[IP_HDR_LEN:IP_HDR_LEN+TCP_HDR_LEN]
# parse TCP header
elif protocol == 17: # UDP
udp_header = data[IP_HDR_LEN:IP_HDR_LEN+UDP_HDR_LEN]
# parse UDP header
return src_addr
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
sock.bind(('0.0.0.0', 0))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
sock.setblocking(False)
inputs = [sock]
start_time = time.time()
ip_count = {}
while True:
if time.time() - start_time >= 60:
break
readable, _, _ = select.select(inputs, [], [], 0.1)
for conn in readable:
data = conn.recv(65535)
if not data:
inputs.remove(conn)
continue
src_addr = parse_packet(data)
if src_addr in ip_count:
ip_count[src_addr] += 1
else:
ip_count[src_addr] = 1
for ip, count in ip_count.items():
print(f'{ip}: {count}')
if __name__ == '__main__':
main()
```
这个示例代码可以监听TCP数据包,如果需要监听UDP数据包,只需要将socket的第三个参数改为socket.IPPROTO_UDP即可。同时,也可以根据需要对数据解析和计数等细节进行优化和修改。
请在这段代码的基础上,向一个ip和端口组成列表发送SYN包,一段时间内检测哪些ip和端口没有返回
好的,可以使用 `socket` 模块创建原始套接字,构造 `SYN` 数据包,向指定的 IP 和端口列表发送 `SYN` 包,然后通过 `select.select` 函数等待一段时间,检测哪些 IP 和端口没有返回。具体实现方式可以参考如下代码片段:
```python
import select
import socket
import struct
import time
# 构造 IP 报文头部
def make_ip_header(src_ip, dest_ip):
ip_version = 4
ip_header_len = 5
ip_tos = 0
ip_total_len = 0
ip_id = 0
ip_frag_off = 0
ip_ttl = 255
ip_proto = socket.IPPROTO_TCP
ip_checksum = 0
src_ip_bytes = socket.inet_aton(src_ip)
dest_ip_bytes = socket.inet_aton(dest_ip)
ip_header = struct.pack('!BBHHHBBH4s4s',
(ip_version << 4) | ip_header_len,
ip_tos,
ip_total_len,
ip_id,
ip_frag_off,
ip_ttl,
ip_proto,
ip_checksum,
src_ip_bytes,
dest_ip_bytes)
return ip_header
# 构造 TCP 报文头部
def make_tcp_header(src_port, dest_port, seq_num, ack_num, flags):
tcp_header_len = 5
tcp_reserved = 0
tcp_data_offset = (tcp_header_len << 4) | tcp_reserved
tcp_window_size = socket.htons(8192)
tcp_checksum = 0
tcp_urgent_ptr = 0
tcp_header = struct.pack('!HHLLBBHHH',
src_port,
dest_port,
seq_num,
ack_num,
tcp_data_offset,
flags,
tcp_window_size,
tcp_checksum,
tcp_urgent_ptr)
return tcp_header
# 计算校验和
def checksum(data):
if len(data) % 2 != 0:
data += b'\0'
res = sum(struct.unpack('!{}H'.format(len(data) // 2), data))
res = (res >> 16) + (res & 0xffff)
res += (res >> 16)
return ~res & 0xffff
# 构造 SYN 数据包
def make_syn_packet(src_ip, src_port, dest_ip, dest_port, seq_num):
ip_header = make_ip_header(src_ip, dest_ip)
tcp_header = make_tcp_header(src_port, dest_port, seq_num, 0, 2)
pseudo_header = struct.pack('!4s4sBBH',
socket.inet_aton(src_ip),
socket.inet_aton(dest_ip),
0,
socket.IPPROTO_TCP,
len(tcp_header))
tcp_checksum = checksum(pseudo_header + tcp_header)
syn_packet = ip_header + tcp_header[:16] + struct.pack('!H', tcp_checksum) + tcp_header[18:]
return syn_packet
# 设置要扫描的目标 IP 和端口列表
target_list = [('127.0.0.1', 80), ('127.0.0.1', 443)]
# 创建原始套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
# 设置源 IP 和端口号
src_ip = '127.0.0.1'
src_port = 12345
# 发送 SYN 包
for dest_ip, dest_port in target_list:
seq_num = int(time.time() * 1000) & 0xffffffff
syn_packet = make_syn_packet(src_ip, src_port, dest_ip, dest_port, seq_num)
sock.sendto(syn_packet, (dest_ip, dest_port))
# 等待一段时间,检查哪些 IP 和端口没有返回 SYN+ACK 包
timeout = 5
start_time = time.time()
inputs = [sock]
unreachable_list = []
while True:
if time.time() - start_time > timeout:
break
readable, _, _ = select.select(inputs, [], [], timeout)
for r in readable:
if r is sock:
data, addr = sock.recvfrom(65535)
ip_header_len = (data[0] & 0xf) * 4
tcp_header_len = (data[ip_header_len + 12] >> 4) * 4
src_ip = socket.inet_ntoa(data[12:16])
src_port = struct.unpack('!H', data[ip_header_len:ip_header_len+2])[0]
dest_ip = socket.inet_ntoa(data[16:20])
dest_port = struct.unpack('!H', data[ip_header_len+2:ip_header_len+4])[0]
flags = data[ip_header_len+tcp_header_len+13]
if (src_ip, src_port) in target_list and flags == 18:
print('{}:{} is reachable'.format(src_ip, src_port))
elif (src_ip, src_port) in target_list:
unreachable_list.append((src_ip, src_port))
# 输出未返回 SYN+ACK 包的 IP 和端口
for ip, port in unreachable_list:
print('{}:{} is unreachable'.format(ip, port))
```
在上述代码中,我们首先定义了一些函数来构造 IP、TCP 报文头部,计算校验和,以及构造 `SYN` 数据包。然后,我们设置了要扫描的目标 IP 和端口列表,并通过原始套接字向列表中的每个 IP 和端口发送 `SYN` 包。接着,我们使用 `select.select` 函数等待一段时间,检测哪些 IP 和端口没有返回 `SYN+ACK` 包。最后,我们输出未返回 `SYN+ACK` 包的 IP 和端口。
阅读全文