Running MapReduce code on Hadoop
Posted: 2024-09-14 14:05:37
Hadoop MapReduce is a parallel computing model for large-scale data processing, typically used for offline batch workloads. Running a MapReduce job on Hadoop generally involves the following steps:
1. **Write the Mapper and Reducer**:
 - The Mapper reads the input data and, for each record, emits one or more intermediate (key, value) pairs.
 - The Reducer receives the Mapper's intermediate results and aggregates the values that share the same key.
2. **Create the job configuration**:
 Use the `Configuration` class to set job details such as the input and output paths, the Mapper and Reducer classes, and the number of partitions.
3. **Submit the job**:
 Create a `Job` object and set the properties above, then call `Job#submit()` to submit the job to the Hadoop cluster.
4. **Wait for the job to finish**:
 Call `Job#waitForCompletion()` to check whether the job completed successfully: a return value of `true` means the job succeeded; otherwise it failed.
5. **Inspect the output**:
 Once the job finishes, the result files can be found in the specified output directory.
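To make the data flow in steps 1 and 2 concrete before the full Hadoop program, here is a minimal plain-Java sketch (no cluster or Hadoop classes needed; `MapReduceSketch` and `wordCount` are illustrative names, not part of any API): the "map" phase emits (word, 1) pairs, and the "shuffle + reduce" phase groups them by key and sums each group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A Hadoop-free sketch of the word-count data flow.
public class MapReduceSketch {
    public static Map<String, Integer> wordCount(List<String> lines) {
        // "Map" phase: emit (word, 1) for every token on every line.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String w : line.split("\\s+")) {
                if (!w.isEmpty()) {
                    pairs.add(Map.entry(w, 1));
                }
            }
        }
        // "Shuffle + reduce" phase: group pairs by key and sum the values.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("hello world", "hello hadoop");
        System.out.println(wordCount(lines)); // {hadoop=1, hello=2, world=1}
    }
}
```

On a real cluster the grouping step is done by the framework's shuffle, and both phases run in parallel across many machines; the single-process version above only illustrates the contract between Mapper and Reducer.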
Example code (simplified):
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on whitespace and emit (word, 1) for every token.
            String[] words = value.toString().split("\\s+");
            for (String w : words) {
                word.set(w);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all counts emitted for the same word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // optional: pre-aggregate map output locally
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
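A typical compile-and-submit session might look like the following. These commands require a working Hadoop installation; the jar name `wc.jar`, the local file `localfile.txt`, and the HDFS paths are all illustrative and should be adjusted to your environment.

```shell
# Compile against the Hadoop client libraries and package a jar
# (jar name and paths are illustrative).
javac -classpath "$(hadoop classpath)" -d classes WordCount.java
jar cf wc.jar -C classes .

# Copy input into HDFS, run the job, then inspect the output.
hdfs dfs -mkdir -p /user/$USER/input
hdfs dfs -put localfile.txt /user/$USER/input
hadoop jar wc.jar WordCount /user/$USER/input /user/$USER/output
hdfs dfs -cat /user/$USER/output/part-r-00000
```

Note that the output directory must not already exist when the job is submitted, or the job will fail at startup; reducer output files are conventionally named `part-r-00000`, `part-r-00001`, and so on.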