How should I modify my Hadoop MapReduce Java code to group lecture hours by department and output the total lecture hours for each department?
Posted: 2024-11-03 12:18:58
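Hadoop MapReduce is well suited to large-scale batch processing. To total lecture hours by department, you parse each input record, map it to a (department, hours) pair, reduce by summing the hours for each department, and write out the result. Assuming you already have a text file in which each line holds one student record (for example name, department, course name, and hours), you can modify the Java code along the following lines: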
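Before writing the Hadoop classes, the data flow can be sketched in plain Java with no cluster: a map step turns each line into a (department, hours) pair, a shuffle step groups the pairs by department, and a reduce step sums each group. This is only a local simulation, not Hadoop code; the class name `DeptHourPipeline`, the sample data, and the column order `name,department,course,hours` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeptHourPipeline {

    // Map step: one input line -> (department, hours), or null for a line that cannot be parsed.
    // Assumed column order: name,department,course,hours
    static Map.Entry<String, Integer> map(String line) {
        String[] f = line.split(",");
        if (f.length < 4) return null;
        try {
            return Map.entry(f[1].trim(), Integer.parseInt(f[3].trim()));
        } catch (NumberFormatException e) {
            return null; // hours field is not an integer
        }
    }

    // Shuffle step: group all emitted values by key, with keys sorted as a reducer receives them.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reduce step: sum the hours of one department.
    static int reduce(List<Integer> hours) {
        int total = 0;
        for (int h : hours) total += h;
        return total;
    }

    // Runs map -> shuffle -> reduce over in-memory lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            Map.Entry<String, Integer> p = map(line);
            if (p != null) pairs.add(p);
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(pairs).forEach((dept, hours) -> result.put(dept, reduce(hours)));
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "Alice,CS,Databases,32",
            "Bob,CS,Operating Systems,48",
            "Carol,Math,Calculus,64",
            "bad line");
        System.out.println(run(lines)); // {CS=80, Math=64}
    }
}
```

The Hadoop classes split the same three steps across a Mapper, the framework's shuffle, and a Reducer.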
First, define the Mapper class:
```java
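import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CourseHourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Assumed column order: name,department,course,hours (adjust the indexes to your data)
    private static final int DEPT_FIELD = 1;
    private static final int HOURS_FIELD = 3;

    private final Text department = new Text();
    private final IntWritable hours = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 4) {
            return; // skip malformed lines
        }
        try {
            hours.set(Integer.parseInt(fields[HOURS_FIELD].trim()));
        } catch (NumberFormatException e) {
            return; // skip a header line or a record whose hours field is not an integer
        }
        department.set(fields[DEPT_FIELD].trim()); // the department is the output key
        context.write(department, hours); // emit the record's actual hours, not a constant 1
    }
}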
```
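Whether the Mapper emits a constant 1 or the parsed hours value decides what the Reducer's sum means: a constant 1 counts records per department, while the hours field totals the hours. The small plain-Java comparison below makes that concrete on two made-up CS records; the class name `CountVsSum` and the column order `name,department,course,hours` are assumptions for illustration.

```java
import java.util.List;

public class CountVsSum {
    // Returns {records, hours} for the given lines.
    // Assumed column order: name,department,course,hours
    static int[] totals(List<String> lines) {
        int records = 0; // what a sum of constant 1s per line adds up to
        int hours = 0;   // what a sum of the parsed hours field adds up to
        for (String line : lines) {
            records += 1;
            hours += Integer.parseInt(line.split(",")[3].trim());
        }
        return new int[] {records, hours};
    }

    public static void main(String[] args) {
        List<String> csLines = List.of(
            "Alice,CS,Databases,32",
            "Bob,CS,Operating Systems,48");
        int[] t = totals(csLines);
        System.out.println("records=" + t[0] + ", hours=" + t[1]); // records=2, hours=80
    }
}
```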
Next, define the Reducer class. It adds up all the hour values received for each department:
```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CourseHourReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text department, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int totalHours = 0;
        for (IntWritable hour : values) {
            totalHours += hour.get(); // add up every hour value emitted for this department
        }
        result.set(totalHours);
        context.write(department, result); // output the department and its total hours
    }
}
```
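Because integer addition is associative and commutative, summing per-task partial sums gives the same total as summing all raw values. That property is what makes it safe to also register a summing Reducer as a combiner with `job.setCombinerClass(...)`, which pre-aggregates map output locally and shrinks the shuffle; Hadoop may run a combiner zero, one, or several times, so it must be correct in every case, and its input and output types (here both `Text`/`IntWritable`) must match the map output types. Using a combiner here is an optional optimization, not part of the original answer. The plain-Java check below uses hypothetical per-task hour lists; `CombinerCheck` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinerCheck {
    // Reduce logic: sum a list of hour values.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) total += v;
        return total;
    }

    // With a combiner: each map task's values are summed locally, then the partial sums are summed.
    static int withCombiner(List<List<Integer>> perTask) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> task : perTask) partials.add(sum(task));
        return sum(partials);
    }

    // Without a combiner: every raw value reaches the reducer.
    static int withoutCombiner(List<List<Integer>> perTask) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> task : perTask) all.addAll(task);
        return sum(all);
    }

    public static void main(String[] args) {
        // Hypothetical hour values for one department, one inner list per map task
        List<List<Integer>> perTask = List.of(List.of(32, 48), List.of(16), List.of(40, 8, 24));
        System.out.println(withCombiner(perTask) + " == " + withoutCombiner(perTask)); // 168 == 168
    }
}
```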
Configure the job:
```java
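import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
// ...
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Course Hour Statistics");
job.setJarByClass(MyDriver.class); // MyDriver: the driver class that contains main()
job.setMapperClass(CourseHourMapper.class);
job.setReducerClass(CourseHourReducer.class);
// Input/output formats are set on the Job (TextInputFormat and TextOutputFormat are also the defaults)
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// Output key/value types are Job settings; Configuration has no such methods
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);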
```
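`TextOutputFormat` writes each record as the key's string form, a separator (a tab by default, configurable with `mapreduce.output.textoutputformat.separator`), the value, and a newline, into files such as `part-r-00000`; within one reducer's output the keys appear in sorted order. The plain-Java sketch below mimics that default layout so you know what to expect in the output directory. `OutputLines` is an illustrative name, and `TreeMap` ordering matches Hadoop's `Text` byte ordering only for ordinary (non-supplementary) characters.

```java
import java.util.Map;
import java.util.TreeMap;

public class OutputLines {
    // Mimics TextOutputFormat's default line layout: key + '\t' + value + '\n'.
    static String format(Map<String, Integer> totals) {
        StringBuilder sb = new StringBuilder();
        // TreeMap iterates keys in sorted order, like a single reducer's output file
        for (Map.Entry<String, Integer> e : new TreeMap<>(totals).entrySet()) {
            sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints "CS<TAB>80" then "Math<TAB>64"
        System.out.print(format(Map.of("Math", 64, "CS", 80)));
    }
}
```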
Finally, run the job from the driver class and set the input and output paths:
```java
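import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// ...
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: <inputdir> <outputdir>");
        System.exit(-1);
    }
    Job job = Job.getInstance(new Configuration(), "Course Hour Statistics");
    // ... set the jar, Mapper, Reducer and output types as in the previous step
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // the output directory must not exist yet
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```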