job.setNumReduceTasks(Integer.parseInt(args[2]));
时间: 2024-06-08 12:10:09 浏览: 38
这行代码是用来设置 MapReduce 任务中 Reduce 的数量的。在 Hadoop 中,MapReduce 任务通常分为两个阶段:Map 阶段和 Reduce 阶段。Map 阶段将输入数据切分成若干个数据块进行并行处理,产生中间结果。Reduce 阶段将 Map 阶段输出的中间结果进行合并,生成最终的输出结果。在这里,使用了 `job.setNumReduceTasks()` 方法来设置 Reduce 任务的数量。`args[2]` 表示从命令行参数中获取的 Reduce 任务的数量。该参数越大,Reduce 任务的并行度越高,处理速度也会更快,但是会占用更多的系统资源。
相关问题
使用mr程序统计每年入职的人数。 最终结果要求如下: 1. 格式如下: 年份:1980 人数:xxx 年份:1981 人数:xxx ....... 2. 两个分区: 0分区存储 入职年份<1982年的 1分区存储 入职年份>=1982年的
以下是使用MapReduce程序实现统计每年入职人数的步骤:
1. Mapper阶段:
mapper读取输入文件中的每一行,提取出入职年份作为键,将值设置为1,将键值对输出。
2. Reducer阶段:
reducer接收来自mapper传来的键值对,将同一年份的值进行累加并输出。
3. Partitioner阶段:
partitioner根据入职年份的大小,将数据分发到0或1分区。
以下是示例代码:
Mapper类:
```
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
private IntWritable year = new IntWritable();
private IntWritable one = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
int yearValue = Integer.parseInt(fields[1]);
year.set(yearValue);
context.write(year, one);
}
}
```
Reducer类:
```
public class EmployeeReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
```
Partitioner类:
```
public class EmployeePartitioner extends Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
if (key.get() < 1982) {
return 0;
} else {
return 1;
}
}
}
```
驱动程序:
```
public class EmployeeCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "employee count");
job.setJarByClass(EmployeeCount.class);
job.setMapperClass(EmployeeMapper.class);
job.setPartitionerClass(EmployeePartitioner.class);
job.setReducerClass(EmployeeReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
在运行程序时,需要将输入文件和输出目录作为程序参数传入。例如:
```
hadoop jar EmployeeCount.jar input/employee.csv output
```
输出结果将会保存在output文件夹中,可以使用cat命令查看输出:
```
hadoop fs -cat output/part-r-00000
hadoop fs -cat output/part-r-00001
```
其中,part-r-00000为0分区的输出,part-r-00001为1分区的输出。
实现Reduce输出结果的全排序
实现Reduce输出结果的全排序,需要在Reducer中进行一定的修改。以下是一个基本的MapReduce全排序的代码示例:
Mapper函数:
```
public static class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable>{
private IntWritable num = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
num.set(Integer.parseInt(line));
context.write(num, new IntWritable(1));
}
}
```
Reducer函数:
```
public static class SortReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable value : values) {
context.write(key, new IntWritable(1));
}
}
}
```
驱动函数:
```
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(1); // 设置Reduce任务数为1,确保全局排序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```
在这个示例中,Mapper函数与之前的示例相同。在Reducer函数中,我们直接输出键值对,因为它们已经按照键进行了排序。在驱动函数中,我们设置了Reduce任务数为1,以确保全局排序。
相关推荐
![docx](https://img-home.csdnimg.cn/images/20210720083331.png)
![pdf](https://img-home.csdnimg.cn/images/20210720083512.png)
![pdf](https://img-home.csdnimg.cn/images/20210720083512.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)