没有合适的资源?快使用搜索试试~ 我知道了~
首页尚硅谷Hadoop(MapReduce)V2.0:MapReduce编程框架及优缺点 - Java大数据前端python人工智能资料下载。
尚硅谷Hadoop(MapReduce)V2.0:MapReduce编程框架及优缺点 - Java大数据前端python人工智能...
需积分: 0 4 下载量 48 浏览量
更新于2023-11-24
收藏 65.9MB DOCX 举报
Hadoop(MapReduce)是尚硅谷大数据技术中的一个分布式运算程序的编程框架。它能够将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并运行在一个Hadoop集群上。MapReduce的优点包括易于编程、良好的扩展性和高容错性。它的编程接口简单,用户可以用相同的方式编写分布式程序和串行程序,非常流行。同时,当计算资源不足时,可以通过增加机器来扩展计算能力。另外,MapReduce具有很高的容错性,如果集群中某台机器出现故障,可以将该机器上的计算任务转移到其他机器上完成,不会导致任务失败,并且这个过程完全由Hadoop内部自动完成。
资源详情
资源推荐
尚硅谷大数据技术之 Hadoop(MapReduce)
—————————————————————————————
更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
(2)编写 Mapper 类
package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowCountMapper extends Mapper<LongWritable,
Text, Text, FlowBean>{
FlowBean v = new FlowBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context
context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");
// 3 封装对象
// 取出手机号码
String phoneNum = fields[1];
// 取出上行流量和下行流量
long upFlow = Long.parseLong(fields[fields.length -
3]);
long downFlow = Long.parseLong(fields[fields.length -
2]);
k.set(phoneNum);
v.set(downFlow, upFlow);
// 4 写出
context.write(k, v);
}
}
(3)编写 Reducer 类
package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
尚硅谷大数据技术之 Hadoop(MapReduce)
—————————————————————————————
更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowCountReducer extends Reducer<Text,
FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
Context context)throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_downFlow = 0;
// 1 遍历所用 bean,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
sum_upFlow += flowBean.getUpFlow();
sum_downFlow += flowBean.getDownFlow();
}
// 2 封装对象
FlowBean resultBean = new FlowBean(sum_upFlow,
sum_downFlow);
// 3 写出
context.write(key, resultBean);
}
}
(4)编写 Driver 驱动类
package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
public static void main(String[] args) throws
IllegalArgumentException, IOException,
ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputflow",
"e:/output1" };
// 1 获取配置信息,或者 job 对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 6 指定本程序的 jar 包所在的本地路径
job.setJarByClass(FlowsumDriver.class);
尚硅谷大数据技术之 Hadoop(MapReduce)
—————————————————————————————
更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网
// 2 指定本业务 job 要使用的 mapper/Reducer 业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 3 指定 mapper 输出数据的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 4 指定最终输出的数据的 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 5 指定 job 的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new
Path(args[1]));
// 7 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包,
提交给 yarn 去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
第 3 章 MapReduce 框架原理
3.1 InputFormat 数据输入
MapReduce的数据流
Input Output
Mapper Reducer
InputFormat
Shuffle
OutputFormat
MapTask
ReduceTask
3.1.1 切片与 MapTask 并行度决定机制
1.问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
思考:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数
据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些
尚硅谷大数据技术之 Hadoop(MapReduce)
—————————————————————————————
更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网
因素影响了 MapTask 并行度?
2.MapTask 并行度决定机制
数据块:Block 是 HDFS 物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
存储。
DataNode1
DataNode2
DataNode3
ss.avi
0
数据切片与MapTask并行度决定机制
MapTask
0
128M
128M 256M
256M 300M
MapTask
MapTask
300M
128M
256M
100M
200M
2)每一个Split切片分配一个MapTask并行实例处理
3)默认情况下,切片大小=BlockSize
4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
100M
ss2.avi
DataNode4
100M
0
100M
200M
1、假设切片大小设置为100M
2、假设切片大小设置为128M
图 4-11 MapTask 并行度决定机制
3.1.2 Job 提交流程源码和切片源码详解
1.Job 提交流程源码详解,如图 4-8 所示
waitForCompletion()
submit();
// 1 建立连接
connect();
// 1)创建提交 Job 的代理
new Cluster(getConfiguration());
// (1)判断是本地 yarn 还是远程
initialize(jobTrackAddr, conf);
// 2 提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea =
JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 Job 路径
JobID jobId = submitClient.getNewJobID();
尚硅谷大数据技术之 Hadoop(MapReduce)
—————————————————————————————
更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 XML 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 Job,返回提交状态
status = submitClient.submitJob(jobId,
submitJobDir.toString(), job.getCredentials());
Job提交流程源码解析
Configuration conf=new Configuration();
Job=job.getInstance(conf);
… …
Job.waitForCompletion(true)
Job.submit();
MR程序运行
在本地模拟器
yarn
JobSubmiter
Cluster成员
proxy
YarnRunner
LocalJobRunner
stagingDir
File://..../.staging
hdfs://..../.staging
jobid
file://..../.staging/jobid
hdfs://..../.staging/jobid
调用
FileInputFormat.ge
tSplits()获取切片
规划,并序列化成
文件
Job.split
Job.xml
将Job相关参数
写到文件
如果是yarnRunner,
还需要获取Job的
jar包
xxx.jar
file://..../.staging/jobid/job.split
hdfs://..../.staging/jobid/job.split
file://..../.staging/jobid/job.xml
hdfs://..../.staging/jobid/job.xml
hdfs://..../.staging/jobid/job.jar
图 4-8 Job 提交流程源码分析
剩余130页未读,继续阅读
wxb0cf756a5ebe75e9
- 粉丝: 22
- 资源: 283
上传资源 快速赚钱
- 我的内容管理 收起
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
会员权益专享
最新资源
- VMP技术解析:Handle块优化与壳模板初始化
- C++ Primer 第四版更新:现代编程风格与标准库
- 计算机系统基础实验:缓冲区溢出攻击(Lab3)
- 中国结算网上业务平台:证券登记操作详解与常见问题
- FPGA驱动的五子棋博弈系统:加速与创新娱乐体验
- 多旋翼飞行器定点位置控制器设计实验
- 基于流量预测与潮汐效应的动态载频优化策略
- SQL练习:查询分析与高级操作
- 海底数据中心散热优化:从MATLAB到动态模拟
- 移动应用作业:MyDiaryBook - Google Material Design 日记APP
- Linux提权技术详解:从内核漏洞到Sudo配置错误
- 93分钟快速入门 LaTeX:从入门到实践
- 5G测试新挑战与罗德与施瓦茨解决方案
- EAS系统性能优化与故障诊断指南
- Java并发编程:JUC核心概念解析与应用
- 数据结构实验报告:基于不同存储结构的线性表和树实现
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功