没有合适的资源?快使用搜索试试~ 我知道了~
基于任务的分布式系统的高效容错集体通信
641→Hoplite:基于任务的分布式系统的高效容错集体通信庄思远1,李卓涵1,李丹阳卓2王颖1梁伟1罗伯特西原1菲利普莫里茨1离子斯托伊卡11加州大学伯克利分校2杜克大学摘要基于任务的分布式框架(例如,Ray,Dask,Hydro)在包含异步和动态工作负载的分布式应用程序中越来越受欢迎,包括异步梯度下降,强化学习和模型服务。随着越来越多的数据密集型应用程序运行在基于任务的系统之上,集体通信效率已成为一个重要的问题。不幸的是,传统的集体通信库(例如,MPI、Horovod、NCCL)是不合适的,因为它们要求在运行之前知道通信调度,并且它们不提供容错。我们设计并实现了Hoplite,一个有效的和容错的集体通信层的任务为基础的分布式系统。我们的关键技术是计算数据传输时间表的飞行和执行的时间表有效地通过细粒度的流水线。与此同时,当一个任务失败时,数据传输计划会快速调整,以允许其他任务继续前进。我们将Hoplite应用于一个流行的基于任务的分布式框架Ray。我们表明,Hoplite可以分别将异步随机梯度下降、强化学习和服务于传统集体通信难以有效执行的机器学习模型的集成速度提高7.8倍、3.9倍和3.3倍。CCS概念• 计算机系统组织独立和容错系统和网络;·计算方法学→分布式计算方法。关键词集体通信,分布式系统ACM参考格式:Siyuan Zhuang,Zhuohan Li,Danyang Zhuo,Stephanie Wang,EricLiang,Robert Nishihara,Philipp Moritz,Ion Stoica.2021年Hoplite:Efficient and Fault-Tolerant Collective Communication for Task-BasedDistributed Systems. 在ACM SIGCOMM 2021会议(SIGCOMM '21),2021年8月23日至27日,虚拟活动,美国。ACM,纽约,纽约,美国,16页。网址://doi. org/10. 1145/3452296。3472897允许制作部分或全部本作品的数字或硬拷贝供个人或课堂使用,无需付费,前提是复制品不以营利或商业利益为目的制作或分发,并且复制品在第一页上带有此通知和完整的引用。必须尊重本作品第三方组件的版权对于所有其他用途,请联系所有者/作者。SIGCOMM©2021版权归所有者/作者所有。ACM ISBN978-1-4503-8383-7/21/08。网址://doi. org/10. 1145/3452296。34728971介绍基于任务的分布式系统(例如, Ray [30],Hydro [19],Dask [44],CIEL [32])在开发和运行包含异步和动态计算和通信模式的分布式应用程序方面越来越受欢迎,包括异步随机梯度下降(SGD),强化学习(RL)和模型服务。如今,许多顶级技术公司已经开始采用基于任务的分布式框架来实现其分布式应用,例如Intel、Microsoft、Ericsson和JP。Morgan.例如,蚂蚁金服使用基于任务的分布式系统来运行其在线机器学习管道,并为数十亿用户提供金融交易服务。在基于任务的系统之上构建分布式应用程序有两个主要好处第一,它易于表达随机和动态的计算和通信模式。基于任务的系统实现了一个动态任务模型:调用者可以动态地调用一个任务队列,它立即返回一个对象未来,即对最终返回值的引用 通过将future作为参数传递,调用者可以指定另一个任务 ,该任务甚至在完成之前使用的返回值。基于任务的系统负责调度工人执行任务,并在相应的工作器之间传送to的结果。第二,基于任务的系统透明地提供容错。当任务失败时,基于任务的系统快速重建失败任务的状态并恢复执行[49,52]。行为良好的任务不需要回滚,因此故障恢复的成本很低。随着越来越多的数据密集型工作负载正在转移到基于任务的分布式系统,支持高效的集体通信(例如,广播,减少)变得至关重要。考虑一个RL应用程序,其中训练器进程向一组代理广播策略,这些代理使用此策略执行一系列模拟。在不支持集体广播的情况下,训练器进程需要向每个代理发送相同的策略,这在发送方侧导致网络瓶颈。高效的集体通信是HPC社区和分布式数据并行训练中的一个众所周知的问题。今天存在许多集体通信库,例如,开放-MPI[16]、MPICH [31]、Horovod [47]、Gloo [14]和NCCL [34]。然而,传统的集体通信实现存在两个局限性,这使得它们不适合动态的基于任务的系统。首先,使用传统集体通信的分布式应用程序必须在运行前指定通信模式。这允许库计算静态和有效的数据传输调度(例如,ring-allreduce)。例如,对于同步平等贡献。SIGCOMMSiyuan Zhuang等642在分布式数据并行训练中,应用程序指定所有工作者参与AllReduce通信,每训练一轮一次。然而,在基于任务的系统中,参与集体通信的任务或数据对象的集合在运行时之前是未知一种方法是等待,直到所有参与的任务和对象都准备好,然后计算静态数据传输调度。不幸的是,这种设计错过了在整个参与者集合准备好之前取得部分进展的机会,这对于现代异步应用程序的性能至关重要,例如,分布式RL其次,由于集体通信的同步特性,一个进程失败可能导致其余进程挂起。现有解决方案将恢复留给应用程序。典型的方法是周期性地检查应用程序的状态(例如, 每小时),并且当进程失败时,整个应用程序回滚到先前的检查点并重新启动。不幸的是,这对于大规模异步应用程序来说可能是昂贵的,并且不能利用在同一个集体通信组中仍然存活的任务作为失败任务的能力来取得进展。这就提出了一个重要的问题:我们如何将集体通信的效率带到动态和异步的基于任务的应用程序中?此设置有两个独特的要求。首先,必须允许应用动态地指定集体通信的参与者(即,在运行时)。其次,集体通信实现必须是异步的。这将允许任务取得进展,即使同一通信组中的其他任务失败。我们设计并实现了Hoplite,一个有效的和容错的集体通信层的任务为基础的分布式系统。Hoplite结合了两个关键思想:(1)Hoplite在任务和对象到达时实时计算数据传输调度,并且Hoplite使用细粒度流水线有效地执行数据传输调度 即使只有一小部分参与者准备好了,集体交流也能取得重大进展。(2)当检测到故障时,重装步兵动态地调整数据传输调度,以减轻故障任务对集体通信的影响。这允许实时任务取得进展。失败的任务可以在重新启动后重新加入集体通信,完成通信。我们将Hoplite应用于流行的基于任务的框架Ray [30]。这使我们能够评估Ray上的各种现有工作负载。我们的评估表明,Hoplite可以将异步SGD加速高达7.8倍,RLlib [27]上的两种流行的RL算法(IMPALA [ 13 ]和A3C [29])分别加速高达1.9倍和3.9倍,并将Ray Serve [42]上ML模型集合的服务吞吐量时间提高高达3.3倍,只有最小的代码更改和故障恢复中可忽略的额外延迟本文的贡献如下一种用于数据传输的分布式调度方案,它为动态任务系统提供有效的广播和减少原语。一种细粒度的流水线方案,可在位于同一节点或不同节点上的任务之间实现低延迟数据传输。deftrain(policy,num_agents,num_steps,batch_size):#并行启动一些rollouts。grad_ids =[rollout.remote(policy)]for_inrange(num_agents)]for_inrange(num_steps):for_inrange(batch_size):#等待第一个rollout完成。ready_id = ray.wait(grad_ids)#使用一个梯度更新策略。策略+= ray.get(ready_id)/batch_size#从剩余的渐变中移除这个渐变grad_ids.remove(ready_id)#一旦一批代理完成,将更新的#策略广播给完成的代理并开始新的部署。for_inrange(batch_size):grad_ids.append(rollout.remote(policy))返回策略(a) 动态任务(Ray)。for_in范围(步数):+#减少一批渐变+reduced_grad_id,unreduced_grad_ids =\+ray.reduce(grad_ids,num_return=batch_size,op=ray.ADD)+#使用平均梯度+策略+= ray.get(reduced_grad_id)/batch_size+#更新剩余梯度+grad_ids = ray.get(unreduced_grad_ids)-for_inrange(batch_size):-#等待第一个rollout完成。-ready_id = ray.wait(grad_ids)-#使用一个梯度更新策略。-策略+= ray.get(ready_id)/batch_size-#从剩余的渐变-grad_ids.remove(ready_id)#一旦一批代理完成,将更新的#策略广播给完成的代理并开始新的部署。for_inrange(batch_size):grad_ids.append(rollout.remote(policy))(b) 动态任务+集体通信(Ray + Hoplite).图1:学习策略的典型RL算法的伪代码(a)与Ray一起执行动态任务。 每个训练循环等待单个代理完成,然后异步更新当前策略。 将新策略广播给一批完成的代理。(b)修改(a)以启用重装步兵。每一步都从代理的子集减少梯度,更新当前策略,广播新策略。用于调整广播和减少操作的数据传输的调度的算法,其允许实时任务在参与集体通信的其他任务失败时取得进展,并且稍后允许那些失败的任务重新加入。我们展示了Hoplite在流行的基于任务的分布式系统上使用几个应用程序的好处,包括异步SGD,RL和服务ML模型的集合。2背景我们首先描述了基于任务的分布式系统和他们的好处,适合开发分布式应用程序。然后,我们描述了将有效的集体沟通整合到其中的挑战。2.1基于任务的分布式系统动态任务编程模型[4,19,30,32,44]允许应用程序表达异步和动态计算····Hoplite:基于任务的分布式系统的高效容错集体通信SIGCOMM643和沟通模式。例如,图1a显示了如何实现异步RL算法,该算法每次更新一个代理结果,并根据可用性顺序动态选择它们一旦应用了一批代理结果,将结果策略发送到每个完成的代理,以开始下一轮的推出。这允许具有快速推出的代理不需要等待具有缓慢推出的工作者(图2a)。今天,大多数强化学习算法[13,29]利用这种类型的异步执行来进行有效的训练.为了支持这种类型的异步通信,基于任务的分布式系统依赖于分布式对象存储来在任务之间传输对象对象存储由一组节点组成,每个节点缓冲一组(可能重叠的)应用程序对象。每个节点为多个工作者服务,这些工作者可以通过共享内存直接读取和写入其本地节点中的对象一1234(a) 动态任务(Ray)1234发送器任务将输出存储到对象存储器中并退出,从而允许它释放关键资源(例如,CPU,GPU,内存)之前,接收器的任务甚至被调度。当接收方任务准备就绪时,它们直接从分布式对象存储中获取对象作为标准[30,32],对象存储强制对象不变性,并使用分布式对象目录服务将每个对象映射到其节点位置集。此外,基于任务的分布式系统通过重建失败的任务来支持快速故障恢复[49,52]。行为良好的任务不会回滚以保持低恢复成本。然而,如果在上面的RL示例中梯度和模型足够大,则基于任务的分布式系统会因低效通信而产生显著的开销。例如,图2a中的训练器(代理2)可能成为网络吞吐量瓶颈,因为它必须接收梯度并且还单独地从/向每个代理发送新策略。当代理数量增加时,这个瓶颈变得更加严重。2.2高效的集体通信在HPC社区和分布式数据并行SGD中具有众所周知的解决方案。许多传统--现有的集体通信库包括 Gloo [14] ,Horovord [47] ,OpenMPI[16],MPICH [31]和NCCL [34]。它们可以使用有效的数据传输调度(例如,Ring-AllReduce、Tree-Broadcast)以减轻分布式应用中的通信瓶颈。使用传统的集体通信库有两个应用需求。首先,通信模式必须在运行时之前静态定义。 这对于具有大容量同步并行模型的应用程序来说很容易。例如,在同步数据并行SGD中,所有工作者在其分区的训练数据集上进行计算,并使用allreduce同步模型参数第二,当任何一个worker失败时,所有参与集体通信的worker都会挂起,应用程序负责容错。 对于HPC应用,这通常通过周期性地对整个应用设置检查点来解决(例如,每小时),并且当进程失败时,整个应用程序回滚到检查点并重新执行。不幸的是,这两个假设从根本上与基于任务的分布式系统不兼容。首先,任务由基于任务的系统的调度程序动态调用。这意味着它(b)动态任务+集体通信。(Ray + Hoplite).图2:分布式RL算法的执行。每一行是一个代理。方框表示计算,箭头表示数据传输。1 - 4是由试剂产生的梯度。(a)动态任务(Ray)。立即应用。在广播之前,对当前策略应用一批三个梯度(b)Hoplite中的动态任务,但具有有效的集体通信 为了减少代理2处的网络瓶颈,代理3部分地减少梯度 3和 4(黑匣子),并且代理3在广播期间将策略发送到代理4(黑点)。当集体通信被触发时,只有一小部分参与任务被调度是可能的例如,在现有的任务系统中,广播是隐式的:一组任务获取相同的对象。当仅调度接收器的子集时,不可能在不知道总共有多少接收器以及接收器将在何处和何时被调度的情况下构建静态广播树。因此,基于任务的系统的集体通信层应该在运行时基于任务和对象到达来调整数据传输调度。其次,快速故障恢复是基于任务的系统的重要设计目标[49,52],因为许多异步工作负载具有严格的SLO要求(例如,模型服务)。在现有的任务系统中,这是通过仅重建和重新执行失败的任务来完成的。如果使用传统的集体通信库,失败的任务会导致其余参与任务挂起。因此,一个集体基于任务的系统的通信层必须是容错的:当任务失败时,允许行为良好的任务取得进展,并允许失败的任务在恢复后重新加入集体通信3设计Hoplite是一个高效的和容错的集体通信层的任务为基础的分布式系统。 在高层次上,Hoplite使用了两种技术:(1)为reduce和broadcast进行数据传输的分散式容错协调,以及(2)跨节点和任务与对象存储之间的对象传输的流水线。我们首先使用Hoplite的核心API展示一个发送-接收示例工作流(表1)。然后,我们描述了HopliteG1G3G4政策推出推出推出推出个...推出适用适用适用推出推出G1G4G3+G4政策…SIGCOMMSiyuan Zhuang等644defapplication():x_id = send.remote()recv.remote(x_id)节点1节点2图3:2节点集群(N1和N2)上的发送和接收动态任务程序示例。基于任务的系统由每个物理节点的工人池和调度器组成。 Hoplite由每个节点一个本地对象存储和一个全局对象目录服务组成,该服务分布在物理节点上。接收方驱动的协调方案,以实现高效的对象传输。3.1Hoplite我们的示例创建了一个返回x_id(一个future)的send任务然后将其传递给recv任务。在Hoplite中,我们使用ObjectID来表示未来或对对象的引用。在执行过程中,应用程序首先将任务提交给任务调度程序。然后,调度器选择一个工作者来执行每个任务(步骤1,图3),例如,根据资源可用性。根据应用程序,recv在得到send返回的值之前不能开始执行。注意,基于任务的系统不要求调度器以特定位置或顺序调度任务,即,可以在发送之前调度RECV任务。在步骤2中,任务工作者调用Hoplite来存储和检索对象。在节点1上,sendworker返回一个具有唯一IDx_id的对象。这个对象必须被存储,直到recvworker接收到它。因此,发送worker在Hoplite上调用Put(x),它将对象从worker复制到本地对象存储中(图3中N1上的第2这释放了worker来执行另一个任务,但在进程之间产生了额外的内存副本来存储对象。同时,在节点2上,recvworker必须检索send返回的对象。为此,它在Hoplite上调用Get(x),该函数会阻塞,直到请求的对象被复制到worker 在步骤3中,Hoplite使用对象目录服务来发现对象位置并协调数据传输,以满足客户端的Put和Get请求。在本例中,节点1上的Hoplite对象存储将对象x的新位置发布到目录中(N1上的步骤3,图3)。同时,在节点2上,Hoplite对象存储查询目录中x的位置(N2上的步骤3,图3)。Hoplite 每个分片将一个ObjectID映射到当前节点位置集。当一个对象有多个位置时,direc- tory服务可以选择一个位置返回给客户端。的对象存储器还维护关于仅部分创建的对象的信息,以便于对象传输流水线(§3.3)。例如,在图3中,一旦调用Put(x),节点1上的对象存储就将其位置发布到对象目录,即使对象还没有完全复制到存储中。这允许节点1开始将对象发送到节点2,同时对象仍在从发送工作进程复制。最后,在步骤4中,Hoplite对象存储节点执行对象目录对节点的回复所指定的数据传输调度2. 节点1是x的唯一位置,因此节点2从节点1请求并接收副本(步骤4)。然后,节点2将对象从其本地存储复制到recv worker(图3中的步骤5),recvworker可以再次通过网络与副本进行流水线操作。Hoplite提供了两种有效的集体通信方案。Hoplite通过对象目录服务和worker之间的协调来实现有效的广播(第3.4节),而不需要显式的原语。 对于reduce,Hoplite向基于任务的系统公开一个显式的Reduce调用。这是必要的,因为这让Ho-plite知道这些对象确实是可约的(即, 操作是可交换的和关联的)。 由于ObjectID是对象 值 可 能 尚 未 准 备 好 的未来, 因 此 Reduce 调 用 也 有 一 个num_objects输入,以防用户想要减少对象的子集,从而使Hoplite能够灵活地选择减少哪些num_objects对象。 图1b显示了如何修改RL示例以使用Hoplite。 这允许训练器有效地从一组动态代理中聚合梯度(图2b)。每当一个任务失败时,Hoplite会重新计算数据传输计划,以避免在集体通信中使用失败的任务,而所有其余的任务都可以继续前进(§3.5)。Ho-plite并没有改变基于任务的分布式系统容忍故障的方式。底层的基于任务的分布式系统可以使用其内置机制快速重建失败任务的状态[52]。一旦任务的状态被重建,任务就恢复。3.2对象目录服务对象目录服务为每个对象维护两个字段:(1)对象的大小,以及(2)位置信息。位置信息是节点IP地址和该节点上对象的当前进度的列表我们使用一个位来表示对象的进度:节点包含部分对象或完整对象。我们将两者都存储起来,这样部分对象副本就可以立即充当广播和归约(第3.4节)。Hoplite 同步位置查询阻塞,直到创建了相应的对象并且知道了位置。异步位置查询立即返回,对象定向服务将对象的任何未来位置发布给客户端。节点在两种情况下将对象位置写入对象目录服务:当本地客户端通过Put创建对象时,以及当对象从远程节点复制在每种情况下,节点通知对象目录服务两次:一次是在将要在本地存储中创建对象时,另一次是在完整对象准备就绪时。我们区分部分和11Put(x)5Get(x)4 4对象目录对象目录32对象存储X对象存储X32工人(recv)工人(发送)任务调度器重装步Hoplite:基于任务的分布式系统的高效容错集体通信SIGCOMM645←核心接口:描述BufferbufferGet(ObjectID object_id)从对象ID 获取对象缓冲区。Put(ObjectID object_id,Buffer buffer)创建一个具有给定对象ID和对象缓冲区的对象Delete(ObjectIDobject_id)删除具有给定对象ID的对象的所有副本。当对象不再使用时由任务框架调用Reduce(ObjectID target_object_id,int num_objects,从一组对象中创建具有给定对象ID的新对象{ObjectID source_object_id,.},ReduceOp操作)使用归约操作(例如,sum、min、max)。表1:核心重装步兵API。应用程序生成一个具有唯一字符串的ObjectID,并可以通过发送字符串来传递ObjectID完整的对象,以便在广播或reduce期间可以优先使用具有完整副本的对象存储节点(§3.4)。优化小对象。查询对象位置可能会导致获取小对象的过度延迟损失,并且在我们的用例中,计算有效对象传输调度的开销通常不值得用于小对象。因此,我们在对象目录服务中实现了一个快速路径。 对于小对象(<64KB),我们只需将它们缓存在对象目录服务中,当节点查询它们的位置时,对象目录服务直接返回对象缓冲区。与每个节点存储中的对象类似,缓存的对象在不再使用时必须由应用程序通过Delete调用释放3.3流水线Hoplite使用流水线来实现大型对象在进程之间和节点之间的 这是通过使接收器节点能够获取源节点中不完整的对象来实现的。如果创建对象的操作(无论是来自客户端的Put还是对象存储节点之间的副本)仍在进行中,则对象可能是不完整的。 为了能够获取不完整的对象,如前一节(第3.2节)所示,对象目录服务还维护不完整副本的位置。然后,当对象存储接收到Get操作时,它可以选择从具有不完整副本的存储请求对象。通过使用对象目录服务作为中介在节点之间进行管道化数据传输,也可以简单地对更高级别的集体通信原语进行管道化,例如reduce后跟广播(图2b)。 在reduce中,节点可以计算输入对象的子集的reduce,并同时将中间结果发送到下游节点。然后,下游节点可以通过在接收到中间结果时对中间结果进行计算来计算最终归约结果,并且同时将最终结果发送到已经被调度的任何广播接收器。广播接收机然后还可以同时将最终结果发送到任何其他广播接收机。同一节点上的任务工作者和本地存储之间的管道连接对于隐藏大型对象的Put和Get延迟也很重要(图3中的步骤2和5 原因是使用分布式对象存储需要两个额外的数据副本,而不是通过网络传输数据所需的最小副本。发送方任务工作线程必须复制到其本地存储,然后接收方本地存储也必须复制到其本地工作线程。我们的观察是,如果内存复制是异步的,则额外的内存复制延迟可以被网络传输所掩盖。当发送方任务调用Put时,Hoplite立即通知对象目录服务该对象已准备好转移。然后,接收方可以在整个对象被复制到发送方节点的本地存储之前获取该对象。接收器端的流水线机制是类似的 当接收器任务调用Get时,接收器任务在本地存储具有完整对象之前开始从本地存储复制对象。通过结合跨节点和节点内的流水线,Hoplite可以在发送方和接收方任务之间实现端到端的对象流,即使其间有多轮集体通信。对不可变get的优化 虽然Hoplite对象是不可变的,但接收器任务仍然在Get期间从其本地存储复制对象数据,以防稍后修改缓冲区。但是,如果它只需要对对象的读访问,那么Hoplite可以直接返回本地存储内的指针。 只读访问可以通过前端编程语言来实施,例如,C++中的const3.4接收者驱动的集体通信Hoplite的接收器驱动协调方案使用分布式协议优化数据传输。在Hoplite中,数据传输发生在两种情况下:任务调用Get以检索具有给定ObjectID的对象,或者任务调用Reduce以通过使用reduce操作减少一组其他对象来创建新对象(例如,和,最小值,最大值)。3.4.1广播. 在基于任务的分布式系统中,当位于多个节点上的一组任务想要从其创建者任务获得相同的对象时,发生广播。具体地,来自节点S的发送器任务创建具有Put的对象和一组接收器任务R1、R2、. 使用Get获取它。 对于与发送任务位于不同节点上的接收任务,其对应的接收节点将从发送节点的本地对象存储中获取对象到接收节点的本地对象存储中。 为了简化我们的方法的描述,我们假设发送方任务和接收方任务位于不同的节点上,并且使用发送方S和接收方R1、R2、. 也引用节点上的本地对象存储在基于任务的分布式系统中,广播是具有挑战性的,因为我们不知道任务,包括这些任务位于何处以及这些任务何时获取对象。 如果所有的接收者只是简单地从发送者那里获取对象,则性能将受到发送者的上行带宽的限制。传统的集体通信库可以生成一个静态树,其中根节点是发送节点,以缓解吞吐量瓶颈。Hoplite的接收器驱动协调方案的目标是实现类似的效果,但使用分散的协议。启发SIGCOMMSiyuan Zhuang等646+(c ')R3(d ')对象和本地对象都存储在存储相应对象的节点上。注意,在基于任务的分布式系统中,要归约的对象可以以任意顺序进行归约。如何有效地减少对象以适应动态对象创建比广播更具挑战性 广播是简单的,因为接收器可以从任何发送器获取对象,Hoplite因此具有更大的灵活性来适应数据传输调度。对于reduce,我们需要确保所有对象都被reduce一次且仅一次:当一个对象被添加到部分reduce结果中时,该对象不应该被添加到任何其他部分结果中。在Hoplite中,我们选择使用树结构的reduce算法,而问题是使用什么类型的树让���在基于任务的分布式系统中,如果没有集体通信的支持,每个节点将对象发送到图4:广播对象的示例(整数数组{5,1,0})当接收器(R1-R3)在不同的时间到达时,从Hoplite中的发送器(S)(a)-(d) 显示广播过程无故障。(单一接收器假设��� 网络延迟为,网络带宽为,对象大小为 。这种方法运行时间为10+10分钟。延迟项是由于网络延迟,当R1在(b)之后失败时的广播过程。通过在使用高容量节点作为广播树中的中间节点的对等系统中的应用级广播[5,6],我们使用比其余节点更早接收对象的接收者作为中间节点来构造广播树。当接收者R想要获取一个远程对象时,它首先检查该对象是否在本地可用,或者本地是否有对该对象的持续 如果是这样,接收器就一直等待,直到它得到完整的对象。这避免了创建循环对象依赖关系。否则,R查询对象目录服务以获取对象对象目录服务首先尝试返回一个带有完整副本的位置如果不存在,则对象目录服务返回保存部分副本的位置之一这使得部分对象也可以充当中间副本,但具有完整副本的位置更受欢迎。当位置查询回复时,R也会删除从目录返回的位置而延迟是由于接收方 这是一种特殊的树,它的根的度数是 。 当对象尺寸非常小时(即,���可以忽略不计),这种树的性能是最好的为了缓解接收端的带宽瓶颈,我们可以将这种 二叉树推广为 二叉树。当我们使用二叉树时,总的运行时间是1000秒。���������由于带宽限制,它减少了延迟,但由于树的高度增长到log���,因此会导致额外的延迟 。如果对象非常大(即,��� 我们可以设置=1。���这意味着所有节点都在一个链,它的运行时间是1000+1000。请注意,我们只需要 在传输对象的实际内容时使用incurge,因为我们使用细粒度的流水线,即, 中间节点将部分缩减的对象发送到下一个节点。正如我们在这里可以看到的,最佳选择 取决于网络特性,对象的大小和参与者(对象)的数量。换句话说,我们选择 最小化总延迟:目录作为一个位置与部分副本,以使流水线。一旦.如果=1,则为+返回到对象目录服务,并将其自身标记为具有完整副本的位置。这确保了对于每个对象,一个节点一次只能发送给一个接收者,从而缓解了任何单个节点的瓶颈。图4显示了Hoplite中的广播场景示例在图4a中,第一接收方R1开始从发送方S获取对象。 在图4b中,S仍在向R1发送,因此当第二个接收方R2到达时,它不会出现在对象目录中。因此,R2从R1获取对象,即部分副本。 在图4c中,R1已完成接收,但仍在向R2发送。然后,对象目录包含S和R2,分别作为完整和部分位置。在图4d中,R3查询对象目录,它选择S而不是R2作为发送者,因为S有一个完整的对象。3.4.2减少。 Reduce发生在基于任务的分布式系统中的任务想要获得简化对象时(例如, 求和或最大对象)。在Hoplite中,这通过Reduce调用来实现与广播类似,我们假设每个要还原的对象位于单独的节点上,并且我们使用R1,R2,.代表log+否则。在运行时,Hoplite将自动选择最佳的配置,基于对这三个因素的经验测量树的拓扑结构确定后,我们需要将节点分配到树中。在这里,我们希望允许Reduce即使在对象的子集上也能取得显著的进展。要做到这一点,我们分配到达的对象与广义版本的有序树遍历。 对于一棵 -nary树,对于每个节点,我们遍历第一个孩子,节点本身,第二个孩子,第三个孩子,...,和第三个孩子。图5a示出了用二叉树减少6个对象的示例。请注意,尽管MPI也支持tree-reduce,但我们的方法完全不同:MPI的树是静态构建的,而我们的树是动态构建的,考虑了对象到达的顺序。如果任务只想减少对象的子集(即,num_object小于Reduce中源对象列表的大小),当树中存在num_object对象时,树构造过程停止。例如,如果任务希望减少10个对象中的6个R251 0510R2510数据传输完成后,接收方将发送方((一)S510R1510S510R1510R2510(一)(b)第(1)款S510R1510S510R1510R2510R3510R2510Hoplite:基于任务的分布式系统的高效容错集体通信SIGCOMM64712 3R3·20 1R5·22 312 3R3·20 1R5·22 3−R4010R40 106897 78R2121R6011R2121R7210R6011445234445534234R1(a) 减少树R1(b) Reduce Tree with failure.图5:reduce的例子,其中对象以R1,R2,.的顺序到达R6. 每个节点顶部的数字(以及叶节点中的数字)表示要减少的对象,绿色块表示准备好的对象的部分。每个节点底部的数字表示缩减的结果,黄色块表示已缩减的对象的分数每个中间节点负责减少以它为根的子树(a)一个示例reduce树由6个对象组成。(b)R2失败后重建的reduce树则最早到达的6个对象在如图5a所构造的归约树中。应用程序还可以指定递减的输入,即:通过传递一个Reduce操作的ObjectID结果作为后续Reduce操作的输入。组合Reduce操作的数据传输将自然地组合在一起。特别是,一旦第一个Reduce输出部分就绪,它将被添加到对象目录服务,在那里它将被下游的Reduce协调器发现。然后,第一个输出可以流到下游的Reduce中。3.4.3AllReduce。 AllReduce是一种同步集体通信操作,可用于同步数据并行训练。优化allreduce不是我们的设计目标:人们通常在专门的分布式系统上进行同步数据并行训练,这些系统针对批量同步工作负载进行了优化(例如,TensorFlow [1],PyTorch[37]),而不是基于任务的分布式系统。在Hoplite中,开发人员可以通过合并reduce和broadcast来表达allreduce。3.5容错集体通信在前面的小节中,我们假设没有任务失败。然而,在基于任务的分布式由于各种原因,包括(1)运行任务的节点崩溃,(2)节点耗尽可用存储器并且必须终止任务,以及(3)任务遇到运行时错误。基于任务的分布式系统已经支持对任务的透明容错[52],但是增加集体通信支持需要我们在参与集体通信时,当一部分任务失败时,动态地改变数据传输调度。这是因为我们不希望失败的任务阻塞集体通信,并且我们希望允许恢复的任务重新加入现有的集体通信。3.5.1广播. 当接收方在广播中检测到发送方失败时,接收方立即通过再次查询对象目录来定位另一个发送方新的发送方只需要发送接收方没有的剩余对象失败的任务可以透明地重新加入广播,因为失败的任务可以简单地在同一ObjectID上调用Get来获取对象。单纯地实现此特性将导致循环对象传输个依赖项例如,两个节点可能试图从彼此获取相同的对象。这是因为当一个接收者找到一个替代的发送者时,对象目录可以返回另一个节点的地址,该节点从接收者那里获取对象。为了避免循环依赖,如果发送者不是创建对象的原始任务,我们需要跟踪Get的依赖。如果发送方失败,接收方只有在找到另一个发送方的依赖关系不包括接收方本身时才能恢复。图4c R2恢复从S的获取,并且当R3到来时,R3可以从R2获取(图4d')。3.5.2减少。 当一个任务在Reduce过程中失败时,这个节点会被协调器立即从树中删除,并被下一个就绪的源对象替换。保证是,为了从源对象中减少源对象,只要至少���可以创建源对象(即, ������任务可能会失败),Reduce将自动返回。否则,当基于任务的底层系统的恢复机制重建了足够多的失败任务时,Reduce完成失败的树节点会导致其父节点、祖父节点和所有祖先节点清除简化对象。 在前面的示例中,图5b显示了R2发生故障后的适配树。如果任务Reduce 6 out of 10个对象,并且R2在R7到达后被恢复,则R7替换R2在树中的位置。(R7也可以是重新连接的R2)。R4必须清除所有当前减少的对象,因为最终结果应该是R1,R3,R4,.的Reduce结果R7 任何包含R2对象的中间结果都必须清除。总的来说,大多数日志������节点必须清除当前对象。4执行Hoplite的核心是用3957行C++实现的我们提供了一个C++和一个Python前端。Python前端使用645行Python和275行Cython实现我们构建Python前端,因为它更容易与Ray [30]和其他数据 处 理 库 ( 例 如 , Numpy [35] , TensorFlow [1] , PyTorch[37])。 Python前端和C++后端之间的接口与Hoplite的API相同(表1)。我们使用一组分布在节点上的gRPC [17]服务器进程来实现对象目录服务每个目录服务器可以将位置通知直接推送到对象存储节点。Hoplite中的每个对象存储节点都是一个gRPC服务器,SIGCOMMSiyuan Zhuang等6483.02.52.01.51.00.50.01.7 μs(a) 1 KB1086420(b) 1MB6543210(c) 1GB对于1 KB和1 MB的对象,OpenMPI比Hoplite快1.8倍和2.3倍。对于1 GB的对象,Hoplite比OpenMPI慢0.2% 雷和达斯克的速度明显慢了很多。OpenMPI是最快的,因为MPI知道要通信的进程的位置Ray、Dask和Hoplite需要通过对象目录服务来定位对象。重装步兵的表现优于雷和达斯克,(1)Hoplite将对象内容存储在对象目录服务中,用于小于64KB的对象(§3.2),(2)Hoplite使用流水线(§3.3)来减少端到端延迟。Ray不支持流水线,所以图6:点对点数据通信在Hoplite、OpenMPI、Ray和Dask。我们还包括理论上的最佳RTT(即,传输的总字节数除以带宽)。对象在来自远程节点的传送请求(例如,在获取期间),节点建立到远程节点的直接TCP连接,并通过TCP连接推送对象缓冲区。在我们的实验中,我们观察到在树缩减算法中设置 为1,2或对于我们的应用程序来说已经足够了。 当一个任务调用Reduce时,Hoplite 从1、2中选择, 并根据网络延迟 、带宽和对象大小 。附录B显示了不同选择的效果 。5评价我们首先在一组流行的传统网络原语上对Hoplite进行微
下载后可阅读完整内容,剩余1页未读,立即下载
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功