import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text house_ownership = new Text(); private IntWritable label = new IntWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 跳过CSV文件的标题行 if (key.get() == 0) { return; } String[] fields = line.split(","); // 提取house_ownership和label字段 String house_ownershipValue = fields[6]; int labelValue = Integer.parseInt(fields[11]); // 输出键值对 house_ownership.set(house_ownershipValue); label.set(labelValue); context.write(house_ownership, label); } }
时间: 2024-04-03 18:33:20 浏览: 224
这是一个Hadoop MapReduce框架中的Mapper类,它继承自Mapper类,并重写了Mapper类中的map()方法。该类的作用是将输入的一行数据进行解析,提取出其中的房屋拥有情况和标签信息,并输出为键值对的形式。具体来讲,它首先将一行数据转化为字符串类型,然后通过split()方法将其按照逗号分隔成一个字符串数组fields。接着,它从fields数组中提取出所需的house_ownership和label字段,并将它们分别赋值给变量house_ownershipValue和labelValue。最后,它将这些值设置为输出的键和值,并通过context对象将它们输出到Reducer中进行汇总。需要注意的是,这个Mapper类跳过了CSV文件的标题行,只对数据行进行处理。
相关问题
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, Int
Writable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Splitting the input line into words
String[] words = value.toString().split(" ");
// Mapping each word with a count of 1
for (String w : words) {
word.set(w);
context.write(word, one);
}
}
}
这是一个 Hadoop MapReduce 程序中的 Mapper 类,用于对输入的文本进行分词,并对每个单词进行计数。其中 LongWritable 表示输入的键类型,Text 表示输入的值类型,Text 表示输出的键类型,IntWritable 表示输出的值类型。在 map 方法中,将输入的文本按空格切分成单词,然后将每个单词映射成键值对,键为单词,值为 1。最后通过 context.write 方法输出键值对。
如何优化package com.hackdata; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable,Text, LongWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v:values) { count += v.get(); } context.write(key, new LongWritable(count)); } }
在这段代码中,主要的计算逻辑已经很简洁明了了,但是我们仍然可以进行一些优化:
1. 使用Primitive类型替代Wrapper类型
在这段代码中,数据类型使用了IntWritable和LongWritable,它们是Hadoop框架提供的Wrapper类型,包含了一个int或long类型的值。这些Wrapper类型比较灵活,可以在多种场景下使用,但是它们也有一定的性能开销,因为每次读取或写入值时,需要进行类型转换。
因此,我们可以考虑使用Java原生的int和long类型,这样可以减少类型转换的开销。修改后的代码如下:
```
public class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new LongWritable(count));
}
}
```
2. 使用Combiner进一步减少数据传输
在MapReduce框架中,Combiner是一种在Map阶段之后,在数据传输到Reduce阶段之前执行的简单Reducer。Combiner的作用是对Map输出进行本地聚合,减少数据传输量,从而进一步优化性能。
在这段代码中,我们可以添加一个Combiner,它的作用是对Map输出进行本地聚合,将相同的单词进行合并,减少传输到Reduce节点的数据量,从而减少网络带宽开销和Reduce节点的计算负担。修改后的代码如下:
```
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new LongWritable(count));
}
}
```
在Job中添加Combiner的代码如下:
```
job.setCombinerClass(WordCountCombiner.class);
```
3. 使用多个Reducer并行处理数据
在大规模数据处理中,单个Reduce节点可能会成为瓶颈,因为它需要处理所有键值对。如果数据量很大,Reduce节点的计算负担会非常重,导致处理时间变长。因此,我们可以在Job中设置多个Reduce节点,并行处理数据,从而进一步优化性能。
在Job中设置多个Reducer的代码如下:
```
job.setNumReduceTasks(10);
```
这里设置了10个Reduce节点,可以根据实际情况进行调整。
综上所述,以上优化措施可以提高程序的性能,使其更好地适应大规模数据处理的场景。
阅读全文