SpoolDirectorySource使用及源码分析
时间: 2024-01-08 21:04:57 浏览: 95
flume log4f示例源码
SpoolDirectorySource 是 Flume 框架中的一个 Source 组件,用于监控指定目录下的文件,当有新文件出现时,将文件内容读取并发送到 Flume 的 Channel 中供后续处理。
使用 SpoolDirectorySource 需要配置以下参数:
- spoolDir:监控的目录路径。
- fileHeader:文件头信息。
- fileSuffix:文件后缀名。
- batchSize:每次读取文件的批量大小。
- ignorePattern:忽略的文件匹配模式。
- deserializer:文件内容的反序列化方式。
SpoolDirectorySource 的工作原理如下:
- 不断轮询指定目录下的文件列表,如果有新文件出现,则将文件信息加入到待处理列表中。
- 从待处理列表中取出文件信息,读取文件内容并发送到 Channel 中。
- 处理完成后将文件信息从待处理列表中删除。
SpoolDirectorySource 的源码分析:
SpoolDirectorySource 的源码位于 flume-ng-core 模块中的 org.apache.flume.source 目录下,主要类包括 SpoolDirectorySource 和 SpoolDirectoryRunnable。
SpoolDirectorySource 继承自 AbstractSource,重写了 doConfigure、doStart 和 doStop 方法,其中 doStart 方法启动了一个新的线程来监控指定目录下的文件。具体实现可参考以下代码:
```java
@Override
protected void doStart() throws FlumeException {
logger.info("SpoolDirectorySource source starting with directory:{}",
spoolDirectory);
try {
directory = new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(spoolDirectory)
.deserializer(deserializer)
.checkpointDir(new File(spoolDirectory + ".checkpoint"))
.ignorePattern(ignorePattern)
.trackerDir(new File(spoolDirectory + ".tracker"))
.annotateFileName(fileHeader)
.fileNameHeader(fileHeader)
.consumeOrder(consumeOrder)
.bufferSize(bufferSize)
.fileSuffix(fileSuffix)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser", ioe);
}
executor = Executors.newSingleThreadExecutor();
runner = new SpoolDirectoryRunnable(directory, sourceCounter);
executor.execute(runner);
super.doStart();
logger.debug("SpoolDirectorySource source started");
}
```
SpoolDirectoryRunnable 实现了 Runnable 接口,实现了 SpoolDirectorySource 监控目录下文件的具体逻辑。具体实现可参考以下代码:
```java
@Override
public void run() {
logger.debug("SpoolDirectoryRunnable starts");
while (!Thread.interrupted()) {
try {
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {
continue;
}
sourceCounter.addToEventReceivedCount(events.size());
source.getChannelProcessor().processEventBatch(events);
reader.commit();
sourceCounter.addToEventAcceptedCount(events.size());
} catch (Throwable t) {
logger.error("Unable to read events from directory " + reader.getSpoolDirectory(), t);
if (t instanceof Error) {
throw (Error) t;
}
}
}
try {
reader.close();
} catch (IOException e) {
logger.error("Error while closing the directory reader", e);
}
}
```
以上代码中,reader.readEvents(batchSize) 会读取 batchSize 个文件,并将文件内容转化成 Event 对象,存储在 events 列表中;source.getChannelProcessor().processEventBatch(events) 将 events 发送到 Channel 中;reader.commit() 会提交当前读取的文件,避免重复读取;sourceCounter.addToEventReceivedCount 和 sourceCounter.addToEventAcceptedCount 用于计数。
综上所述,SpoolDirectorySource 的工作原理是通过一个新的线程不断轮询指定目录下的文件,将新文件内容读取并转化成 Event 对象发送到 Channel 中,实现了 Flume 对文件的监控和实时处理。
阅读全文