flume监听文件写入hdfs,并且将文件中的,修改为|
时间: 2023-12-10 17:40:40 浏览: 94
可以通过Flume的Spooling Directory Source来实现这个功能。Spooling Directory Source会监控一个指定的目录,当有新的文件被写入时,就会将文件内容作为事件传递给Flume的Channel。然后再通过Flume的Interceptor将文件中的逗号修改为竖杠,并存储到HDFS中。
具体的配置如下:
1. 在Flume的配置文件中添加一个Spooling Directory Source:
```
# define the Spooling Directory Source
agent.sources = spoolDir
agent.sources.spoolDir.type = spooldir
agent.sources.spoolDir.spoolDir = /path/to/spool/directory
agent.sources.spoolDir.fileHeader = true
agent.sources.spoolDir.fileSuffix = .COMPLETED
```
2. 添加一个Interceptor来修改文件内容
```
# define the Interceptor to modify the file content
agent.sources.spoolDir.interceptors = interceptor1
agent.sources.spoolDir.interceptors.interceptor1.type = regex_replace
agent.sources.spoolDir.interceptors.interceptor1.regex = ,
agent.sources.spoolDir.interceptors.interceptor1.replaceString = |
```
3. 添加一个HDFS Sink
```
# define the HDFS Sink
agent.sinks = hdfsSink
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /path/to/hdfs/directory
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.rollInterval = 0
```
最后将Spooling Directory Source和HDFS Sink连接起来:
```
# connect the Source and Sink
agent.sources.spoolDir.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
```
这样配置后,Flume就会监听指定的目录,当有新文件写入时,将文件内容作为事件传递给内存Channel。Interceptor会修改文件内容,然后将修改后的事件存储到HDFS中。
阅读全文