请给我一个sniff过滤表达式,获得从端口5050收到但不是本机发出的包
时间: 2024-03-15 07:47:25 浏览: 24
以下是一个使用 `sniff` 函数和过滤器获取从端口5050接收到的非本机发出的包的 Python 代码示例:
```python
from scapy.all import *
def packet_handler(packet):
if packet.haslayer(TCP) and packet[TCP].dport == 5050 and packet[IP].src != "<本机IP地址>":
print(packet.summary())
sniff(filter="tcp port 5050", prn=packet_handler)
```
请将 `<本机IP地址>` 替换为你的机器的实际IP地址。这个代码将使用 `sniff` 函数捕获从端口5050接收到的所有TCP包,并对每个包应用 `packet_handler` 函数进行处理。在 `packet_handler` 函数中,我们检查包是否包含TCP层并且目标端口为5050,同时源IP地址不是本机IP地址。如果是,则打印包的摘要信息。
相关问题
我想只检测收到包,过滤掉本机发出去的包,请修改代码
在 Scapy 中,可以使用 `sniff` 函数的 `lfilter` 参数来过滤捕获到的数据包。`lfilter` 参数接受一个函数作为参数,该函数接受一个数据包作为参数,返回一个布尔值,表示是否接受该数据包。
下面是一个示例代码,用于过滤掉本机发出去的包,只检测收到的包:
```python
from scapy.all import *
# 定义过滤函数,用于过滤掉本机发出去的包
def filter_packet(packet):
return not packet.haslayer(IP) or packet[IP].src != socket.gethostbyname(socket.gethostname())
# 定义回调函数,用于处理捕获到的数据包
def packet_callback(packet):
src_ip = packet[IP].src
src_port = packet[TCP].sport
print(f"Received packet from {src_ip}:{src_port}")
# 使用 sniff 函数捕获网络数据包
sniff(prn=packet_callback, lfilter=filter_packet, count=10)
```
在这个示例代码中,我们定义了一个过滤函数 `filter_packet`,用于过滤掉本机发出去的包。在过滤函数中,我们使用 `haslayer` 方法判断数据包是否包含 IP 层,如果不包含 IP 层,则返回 True,表示接受该数据包。如果包含 IP 层,则使用 `socket.gethostbyname(socket.gethostname())` 获取本机 IP 地址,然后判断数据包的源 IP 是否等于本机 IP,如果不等于,则返回 True,表示接受该数据包。
注意,在使用 `socket.gethostbyname(socket.gethostname())` 获取本机 IP 地址时,可能会有多个 IP 地址,其中可能有一个是 127.0.0.1,表示本地回环地址。如果想要排除本地回环地址,可以在过滤函数中加入判断条件,如下所示:
```python
def filter_packet(packet):
return (not packet.haslayer(IP)) or \
(packet[IP].src != socket.gethostbyname(socket.gethostname()) and packet[IP].src != '127.0.0.1')
```
这样,过滤函数就可以过滤掉本机发出去的包,并只检测收到的包。
sniff并没有停止,请给我一个强制结束进程的方法
如果 `stop_filter` 参数无法停止 `sniff` 函数的执行,您可以使用 `terminate()` 方法来强制结束线程。下面是一个示例代码:
```python
import threading
import time
from scapy.all import *
stop_sniffing = False
def packet_callback(packet, port):
# 处理收到的数据包
...
def receive_packets(port, timeout):
global stop_sniffing
start_time = time.time()
while not stop_sniffing and time.time() - start_time < timeout:
sniff(prn=lambda packet: packet_callback(packet, port), filter="tcp port {}".format(port), count=1)
print("Stopped sniffing")
def stop_sniffing_process():
global stop_sniffing
stop_sniffing = True
# 启动嗅探线程
src_port = 80
timeout = 10
receive_thread = threading.Thread(target=receive_packets, args=(src_port, timeout))
receive_thread.start()
# 在另一个进程中停止嗅探
stop_sniffing_process()
# 等待嗅探线程结束
receive_thread.join(timeout=1)
# 如果线程没有结束,强制结束
if receive_thread.is_alive():
receive_thread.terminate()
```
在这个示例中,如果嗅探线程没有在 1 秒内结束,就会使用 `terminate()` 方法强制结束线程。注意,使用 `terminate()` 方法强制结束线程可能会导致一些未处理的异常,因此尽量避免使用它。如果可能的话,应该使用 `stop_filter` 参数来停止 `sniff` 函数的执行。