Spark调优多线程并行处理任务实现方式调优多线程并行处理任务实现方式
主要介绍了Spark调优多线程并行处理任务实现方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作
具有一定的参考学习价值,需要的朋友可以参考下
方式方式1:
1. 明确明确 Spark中中Job 与与 Streaming中中 Job 的区别的区别
1.1 Spark Core
一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)
一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算
Job在spark里应用里是一个被调度的单位
1.2 Streaming
一个 batch 的数据对应一个 DStreamGraph
而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作
每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job
2. Streaming Job的并行度的并行度
Job的并行度由两个配置决定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job
所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念
这些 Job 由 jobExecutor 依次提交执行
而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job
这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 决定了向 Spark Core 提交Job的并行度
提交一个Job,必须等这个执行完了,才会提交第二个
假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core
Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)
默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job
虽然如此,如果资源够两个job运行,还是会并行运行两个Job
Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:
有延时发生了,batch无法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源
Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。
方式方式2: