SpoolDirectorySource使用及源码分析
时间: 2024-01-06 10:06:46 浏览: 31
graph, int *visited, int start_node, int *level) {
int *queue, queue_start, queue_end;
int iSpoolDirectorySource是Flume中的一个Source类型,用于监控一个本地目录,当目录中出现, j, k, num_in_queue;
queue = (int *) calloc(graph->num_nodes, sizeof(int));
queue_start = 新的文件时,将文件中的数据作为Event发送给Flume的下一个组件进行处理。
使用SpoolDirectory0;
queue_end = 0;
visited[start_node] = 1;
level[start_node] = 0;
queueSource可以轻松地将本地文件中的数据导入到Flume中,例如将日志文件或其他数据文件传[queue_end] = start_node;
queue_end++;
while (queue_start < queue_end) {
num_in_queue = queue_end输到Hadoop集群中进行分析。
下面是SpoolDirectorySource的使用方法:
1.编写Flume配置 - queue_start;
#pragma omp parallel for private(i, j, k)
for (i = queue_start; i < queue_end文件
```properties
#定义agent
agent.sources = spoolDirSource
agent.channels = memoryChannel
agent.sinks = loggerSink; i++) {
for (j = graph->adj_list_starts[queue[i]]; j < graph->adj_list_starts[queue[i]
#定义source
agent.sources.spoolDirSource.type = spooldir
agent.sources.spoolDirSource.spoolDir = /data + 1]; j++) {
k = graph->adj_list[j];
if (visited[k] == 0) {
level/flume/spool
agent.sources.spoolDirSource.fileHeader = true
agent.sources.spoolDirSource.basenameHeader = true
agent.sources[k] = level[queue[i]] + 1;
visited[k] = 1;
queue[queue_end] = k;
.spoolDirSource.batchSize = 1000
agent.sources.spoolDirSource.pollDelay = 10000
#定义channel
agent #pragma omp atomic
queue_end++;
}
}
}
queue_start += num_in_queue;
}
free(queue);
}
void print_levels(int *levels, int num_nodes) {
int i;
for (i = 0; i <.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
#定义sink
agent.sinks.loggerSink.type = logger num_nodes; i++) {
printf("%d: %d\n", i, levels[i]);
}
}
```
此代码使用 MPI
agent.sinks.loggerSink.channel = memoryChannel
```
2.启动Flume agent
```shell
bin/flume-ng agent 进行分布式计算,将图分成若干个子图,每个进程处理一个子图;同时, -n agent -c conf -f conf/flume-conf.properties
```
3.将需要导入的文件放到指定的使用 OpenMP 进行并行计算,提高每个进程内部的计算效率。
代码中,`read目录中
SpoolDirectorySource默认会监控指定目录中的所有文件,当有新的文件出现时_graph` 函数从文件中读入图;`bfs` 函数实现了基于广度优先搜索的遍历算,会将文件中的数据作为Event发送给下一个组件进行处理。
源码分析:
SpoolDirectorySource的法;`print_levels` 函数将遍历结果输出到控制台。主函数中,首先使用 MPI 进行进程核心代码在org.apache.flume.source.SpoolDirectorySource类中。
SpoolDirectorySource在启动时会创建一个单独和数据的初始化,然后将图分发到每个进程中,并使用 OpenMP 进行并行化计算;的线程SpoolDirectoryRunnable,该线程会不断地监控指定目录中的文件变化情况,并最后将每个进程的结果汇总到 ROOT 进程,输出遍历结果和运行时间。
相关推荐
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)