"message":"firewall,info input: in:pppoe-out2 out:(unknown 0), src-mac da:86:8e:97:00:36, proto UDP, 119.36.48.16:23548->119.122.115.56:43030, len 77" 使用logstash+grok提取message里面的所有ip和端口
时间: 2023-07-16 15:14:28 浏览: 46
可以使用以下的grok模式来提取message中的IP和端口:
```
%{GREEDYDATA} %{IPV4:src_ip}:%{NUMBER:src_port}->%{IPV4:dest_ip}:%{NUMBER:dest_port}, len %{NUMBER:len}
```
具体操作步骤如下:
1. 在Logstash的配置文件中添加grok过滤器:
```
filter {
grok {
match => { "message" => "%{GREEDYDATA} %{IPV4:src_ip}:%{NUMBER:src_port}->%{IPV4:dest_ip}:%{NUMBER:dest_port}, len %{NUMBER:len}" }
}
}
```
2. 使用Logstash启动服务,让Logstash对输入的数据进行处理:
```
bin/logstash -f /path/to/config/file.conf
```
这样,Logstash就会将输入的数据中的IP和端口提取出来,并存储在对应的字段中。可以在输出插件中查看这些字段的值。
相关问题
firewall,info input: in:pppoe-out2 out:(unknown 0), src-mac da:86:8e:97:00:36, proto UDP, 119.36.48.16:23548->119.122.115.56:43030, len 77 使用logstash+grok提取里面的所有ip和端口
假设你的日志已经被 Logstash 收集并发送到了 Elasticsearch 中,下面是一个示例的 Logstash 配置文件,将 Logstash 中的 `message` 字段按照 grok 模板进行解析,并将解析结果存储到 Elasticsearch 中:
```conf
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => "%{WORD:direction}:%{SPACE}in:%{WORD:input_interface}-%{WORD:output_interface}%{SPACE}out:%{GREEDYDATA:unknown},%{SPACE}src-mac%{SPACE}%{WORD:src_mac},%{SPACE}proto%{SPACE}%{WORD:protocol},%{SPACE}%{IPORHOST:src_ip}:%{NUMBER:src_port}->%{IPORHOST:dest_ip}:%{NUMBER:dest_port},%{SPACE}len%{SPACE}%{NUMBER:len}"
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "firewall-%{+YYYY.MM.dd}"
}
}
```
上述配置中,我们用 `beats` 输入插件接收 Beats 协议发送的数据,然后使用 `grok` 过滤器解析 `message` 字段。在 grok 模板中,我们使用了多个 grok 模式来匹配不同的字段,例如 `%{WORD:direction}` 会匹配 `in` 或 `out`,并将匹配结果存储到 `direction` 字段中。
最后,我们将解析结果存储到 Elasticsearch 中,其中 `hosts` 参数指定了 Elasticsearch 的地址,`index` 参数则按照日期将数据存储到不同的索引中。在 Elasticsearch 中,你可以使用 Kibana 或其他工具来查询和可视化这些数据。
"message":"firewall,info input_ip input: in:pppoe-1000M-4 out:(unknown 0), proto TCP (SYN), 111.7.96.136:28927->113.116.194.10:58000, len 52" "message":"firewall,info input: in:pppoe-out4 out:(unknown 0), src-mac da:86:8e:97:00:36, proto ICMP (type 8, code 0), 13.244.104.42->119.122.115.239, len 36" 使用logstash + grok 对上面两段message兼顾提取里面的全部ip和端口
可以使用以下的 grok 表达式来提取:
```
%{GREEDYDATA:message}, %{IPV4:input_ip}:%{NUMBER:input_port}->%{IPV4:output_ip}:%{NUMBER:output_port}, len %{NUMBER:length}
```
这个表达式可以匹配第一行的 message,提取出 input_ip、input_port、output_ip、output_port 和 length。
同样的,对于第二行的 message,可以使用以下的 grok 表达式来提取:
```
%{GREEDYDATA:message}, %{IPV4:source_ip}->%{IPV4:destination_ip}, len %{NUMBER:length}
```
这个表达式可以匹配第二行的 message,提取出 source_ip、destination_ip 和 length。
在 logstash 配置文件中,可以针对每条日志分别使用不同的 grok 表达式进行匹配。完整的配置文件示例如下:
```
input {
beats {
port => 5044
}
}
filter {
if "firewall,info input_ip input" in [message] {
grok {
match => { "message" => "%{GREEDYDATA:message}, %{IPV4:input_ip}:%{NUMBER:input_port}->%{IPV4:output_ip}:%{NUMBER:output_port}, len %{NUMBER:length}" }
}
} else if "firewall,info input: in:pppoe-out4" in [message] {
grok {
match => { "message" => "%{GREEDYDATA:message}, %{IPV4:source_ip}->%{IPV4:destination_ip}, len %{NUMBER:length}" }
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "firewall-%{+YYYY.MM.dd}"
}
}
```
这个配置文件中,使用了两个条件判断语句来选择不同的 grok 表达式。第一个条件判断语句匹配第一条日志,第二个条件判断语句匹配第二条日志。在匹配成功之后,将会提取出相应的字段,并将数据输出到 Elasticsearch 中。