2.总成绩在500分以上的存储在part-r-00000,总成绩在400分以上的存储在part-r-00001,总成绩在300分以上的存储在part-r-00002,其它存储在part-r-00003。
时间: 2023-12-14 21:02:47 浏览: 89
学生成绩的输入和查询及保存
这是一个关于Hadoop MapReduce的问题。
在MapReduce任务中,Mapper负责将输入文件拆分成若干个key-value对,然后将这些key-value对交给Reducer进行处理。Reducer会将相同key的value合并在一起进行计算,并输出结果。
根据题目要求,需要将总成绩在不同分数段的学生信息分别存储到不同的输出文件中。这里需要自定义Mapper和Reducer。
Mapper的输出应该是一个由成绩段和学生信息组成的key-value对,例如:
```python
# 以总成绩在500分以上为例
# key: '500+' value: '学生信息'
key = '500+'
value = '张三,500,语文:90,数学:90,英语:80'
# 将成绩段和学生信息作为key-value输出
yield key, value
```
Reducer则需要将相同成绩段的学生信息合并在一起,并将结果输出到不同的文件中。可以使用MultipleOutputs来实现多个输出文件。
以下是示例代码:
```python
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import TextProtocol
from mrjob.protocol import RawValueProtocol
class ScoreDistribution(MRJob):
# 将Mapper的输出设置为文本格式
OUTPUT_PROTOCOL = TextProtocol()
def steps(self):
return [
MRStep(mapper=self.mapper,
reducer_init=self.reducer_init,
reducer=self.reducer,
reducer_final=self.reducer_final),
MRStep(reducer=self.output)
]
def mapper(self, _, line):
# 获取每行记录的总成绩
_, _, _, _, _, score1, score2, score3 = line.strip().split(',')
total_score = int(score1) + int(score2) + int(score3)
# 根据总成绩将学生信息分到不同的成绩段中
if total_score >= 500:
key = '500+'
elif total_score >= 400:
key = '400+'
elif total_score >= 300:
key = '300+'
else:
key = 'other'
# 输出成绩段和学生信息
yield key, line.strip()
def reducer_init(self):
# 初始化MultipleOutputs
self.outputs = {}
def reducer(self, key, values):
# 将相同成绩段的学生信息合并在一起
result = '\n'.join(values)
# 存储到不同的输出文件中
if key == '500+':
self.outputs['500'] = result
elif key == '400+':
self.outputs['400'] = result
elif key == '300+':
self.outputs['300'] = result
else:
self.outputs['other'] = result
def reducer_final(self):
# 关闭MultipleOutputs
for output in self.outputs.values():
output.close()
def output(self, _, __):
# 输出到不同的文件中
for key, output in self.outputs.items():
yield key, output
if __name__ == '__main__':
ScoreDistribution.run()
```
在执行MapReduce任务时,需要使用hadoop命令来指定输出文件名:
```bash
$ hadoop jar /path/to/hadoop-streaming.jar \
-input /path/to/input \
-output /path/to/output \
-mapper "python score_distribution.py" \
-reducer "python score_distribution.py" \
-file score_distribution.py \
-cmdenv mapreduce.job.reduces=4 \
-cmdenv mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-cmdenv mapreduce.partition.keycomparator.options="-k1,1nr" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf mapreduce.job.name="Score Distribution" \
-jobconf mapreduce.job.maps=4 \
-jobconf mapreduce.job.reduces=4 \
-jobconf mapreduce.map.memory.mb=2048 \
-jobconf mapreduce.reduce.memory.mb=2048 \
-jobconf mapreduce.task.io.sort.mb=512 \
-jobconf mapreduce.task.timeout=600000
```
其中,`-cmdenv mapreduce.job.reduces=4`表示使用4个Reducer进行计算,`-cmdenv mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator`表示按照key进行排序,`-cmdenv mapreduce.partition.keycomparator.options="-k1,1nr"`表示按照key的第一个字段进行倒序排序,`-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner`表示使用KeyFieldBasedPartitioner进行分区。
阅读全文