Word Frequency Counting with Hadoop MapReduce
Posted: 2024-12-25 09:22:57
### Word Frequency Counting with Hadoop MapReduce
#### 1. Environment Preparation
To run a Java-based MapReduce application, first install and configure a Hadoop cluster. Make sure every machine in the cluster is set up correctly and can start HDFS and YARN normally.
#### 2. Project Structure
Create a new Maven or Gradle project, and add a source folder `src/main/java` to hold the Mapper and Reducer classes as well as the driver class[^1].
#### 3. Writing the WordCount Program
##### Mapper Class
The Mapper reads the input data line by line and emits each word as a `(word, 1)` key-value pair:
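For a Maven project, the Hadoop client libraries must be on the compile classpath. A minimal dependency sketch (the version shown is an example; match it to your cluster's Hadoop release):

```xml
<!-- Illustrative dependency; align the version with your cluster. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.6</version>
    <!-- "provided" because the cluster supplies these jars at runtime -->
    <scope>provided</scope>
</dependency>
```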
```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            // Strip everything outside a-z so punctuation does not split counts
            word.set(itr.nextToken().replaceAll("[^a-z]", ""));
            if (!word.toString().isEmpty()) {
                context.write(word, one);
            }
        }
    }
}
```
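The Mapper's cleaning step (lowercasing, then stripping non-letter characters) can be exercised outside Hadoop. A minimal sketch; the class name `TokenCleaner` is illustrative and not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Illustrative helper mirroring the Mapper's tokenization logic.
public class TokenCleaner {
    // Splits a line on whitespace, lowercases each token,
    // strips every character outside a-z, and skips empty results.
    public static List<String> clean(String line) {
        List<String> words = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String w = itr.nextToken().replaceAll("[^a-z]", "");
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }
}
```

For the input `"Hello, World!"` this yields the two tokens `hello` and `world`, which is exactly what the Mapper would emit as keys.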
##### Reducer Class
The Reducer receives the intermediate results from the Mapper and sums the values grouped under each key to produce the final frequency count:
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
```
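The reduce step itself is just a summation over the `1` counts that the shuffle phase groups under one key. A plain-Java stand-in (the class name `SumCounts` is illustrative):

```java
import java.util.List;

// Illustrative stand-in for the reduce step: sums the per-occurrence
// counts that the shuffle phase groups under a single word.
public class SumCounts {
    public static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }
}
```

If the word `the` appeared three times, the Reducer receives the grouped values `[1, 1, 1]` and emits `(the, 3)`.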
##### Driver Main Method
The driver configures the job parameters and submits the job to the Hadoop execution framework:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // optional: local aggregation to reduce shuffle traffic
        job.setReducerClass(IntSumReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
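To run the job on a cluster, compile the classes against the Hadoop classpath, package them into a jar, and submit it. A typical invocation might look like the following (the jar name and the HDFS paths are placeholders; substitute your own):

```shell
# Compile against the cluster's Hadoop jars and package into a jar
javac -classpath "$(hadoop classpath)" -d classes *.java
jar cf wordcount.jar -C classes .

# Stage the input in HDFS, run the job, then inspect the result
hdfs dfs -mkdir -p /user/$USER/input
hdfs dfs -put input.txt /user/$USER/input
hadoop jar wordcount.jar WordCount /user/$USER/input /user/$USER/output
hdfs dfs -cat /user/$USER/output/part-r-00000
```

Note that the output directory must not already exist, or `FileOutputFormat` will fail the job at submission time.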
The code above implements the complete word count flow; the job counters and logs that Hadoop itself reports (for example in the YARN web UI) help with debugging and monitoring execution[^2].