3. System architecture
Fig. 1 depicts our cluster model, in which a fixed number of multi-core workstations are connected by a high-speed network. We assume the communication cost between any two nodes is the same, so the multi-core cluster can be viewed as a two-level hierarchical system: a distributed memory level consisting of the cluster nodes, and a shared memory level consisting of the multiple cores within each node. To exploit both inter-node parallelism and intra-node parallelism, we propose a hierarchical task scheduling scheme in which tasks are scheduled with different approaches on these two levels in order to achieve dynamic load balancing. Briefly, work-stealing is used for load balancing inside a node, and an adaptive approach that supports both work-sharing and work-stealing is used for load balancing among the cluster nodes.
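To make the hierarchy concrete, the following C++ sketch outlines the two scheduler roles. The type and field names are ours and merely illustrate the structure described above; they are not taken from the actual implementation.

#include <cstddef>
#include <deque>
#include <vector>

struct Task { };                      // application-defined unit of work

struct LocalScheduler {               // one per node: the shared memory (intra-node) level
    std::size_t node_id;
    std::vector<std::deque<Task*>> per_core_deques;   // one work-stealing deque per core
};

struct GlobalScheduler {              // one on the master node: the distributed memory (inter-node) level
    std::vector<std::size_t> task_counters;           // approximate load of each worker node (described below)
};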
Assume all the program and data files have been deployed on each node in Fig. 1. The user logs on to a node to start the program, and this node is then viewed as the master node. The master node could be any node in the cluster or a specific node, such as a resource manager node.² A global scheduler (GS) runs on the master node and is responsible for inter-node task scheduling, including the initial partitioning and the redistribution of tasks between worker nodes. The novel techniques used in the GS distinguish our system from existing work-stealing systems [14,15,20].
There is no initial partitioning phase in traditional work-stealing schemes. Under traditional work-stealing, one initial task runs on a processing element (PE), and new tasks are spawned continually and stolen by idle PEs during execution. In shared memory systems, a spawned task can migrate to an idle thread very quickly, so the absence of initial partitioning merely adds a few task migrations whose cost is negligible. In distributed memory systems, however, task migration has much higher overhead, which is no longer negligible. Initial partitioning can balance the load statically before the parallel execution of the tasks and thus reduce the frequency of dynamic task stealing across the nodes; an ideal partitioning could even eliminate inter-node task migration entirely. Therefore, we adopt an initial partitioning phase in our system. In this phase the global scheduler partitions the parallel portions of an application adaptively, according to the pattern of task parallelism. The details of our initial partitioning are described in the next section.
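As a placeholder for that adaptive partitioning, the sketch below shows the simplest possible initial partitioning, a round-robin split of the initially ready tasks over the nodes. The function name and the even split are our assumptions for illustration only, not the scheme actually used by the GS.

#include <cstddef>
#include <vector>

// Naive initial partitioning: assign the initially ready tasks to nodes round-robin,
// so each node starts with roughly the same number of tasks.
template <typename Task>
std::vector<std::vector<Task>> initial_partition(const std::vector<Task>& ready_tasks,
                                                 std::size_t num_nodes) {
    std::vector<std::vector<Task>> per_node(num_nodes);
    for (std::size_t i = 0; i < ready_tasks.size(); ++i)
        per_node[i % num_nodes].push_back(ready_tasks[i]);
    return per_node;
}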
After initial partitioning, the tasks that are ready to run are scheduled onto the cluster nodes and executed in parallel on each multi-core node. In Fig. 1, every node, including the master node, has a local scheduler (LS) which is responsible for intra-node task scheduling and cooperates with the GS. When the LS receives a task from the GS, a classical work-stealing scheme is applied on the shared memory multi-core node to split and schedule subtasks.
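The intra-node scheme can be sketched as follows: each worker thread pushes and pops tasks at the private end of its own deque, and an idle thread steals from the opposite end of a randomly chosen victim. The mutex-protected deque below is a simplification for brevity; production runtimes typically use lock-free deques.

#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <random>
#include <vector>

using Task = int;                                        // stand-in for an application task

struct WorkerDeque {
    std::deque<Task> tasks;
    std::mutex lock;
};

// Called by the owning thread: take the most recently pushed task.
std::optional<Task> pop_local(WorkerDeque& d) {
    std::lock_guard<std::mutex> g(d.lock);
    if (d.tasks.empty()) return std::nullopt;
    Task t = d.tasks.back();
    d.tasks.pop_back();
    return t;
}

// Called by an idle thread: steal the oldest task of a randomly chosen victim.
std::optional<Task> steal(std::vector<WorkerDeque>& deques, std::size_t self,
                          std::mt19937& rng) {
    std::uniform_int_distribution<std::size_t> pick(0, deques.size() - 1);
    std::size_t victim = pick(rng);
    if (victim == self) return std::nullopt;             // caller retries later
    std::lock_guard<std::mutex> g(deques[victim].lock);
    if (deques[victim].tasks.empty()) return std::nullopt;
    Task t = deques[victim].tasks.front();
    deques[victim].tasks.pop_front();
    return t;
}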
The LS keeps track of the number of tasks on the node. Based on this value, the LS determines whether an inter-node task migration is necessary (a sketch of this decision follows the list below):
(1) If the number of tasks exceeds a threshold, the LS sends a work-sharing request message to the GS, asking to transfer a task from the local task queues to a lightly loaded cluster node.
(2) If all the local task queues become empty, the LS sends a work-stealing request message to the GS, asking to relieve the heavily loaded worker nodes by stealing a task from one of them.
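A minimal sketch of this decision rule, assuming a single aggregate task count per node; the enum and the threshold parameter are hypothetical names, not identifiers from the implementation.

#include <cstddef>

enum class Request { None, WorkSharing, WorkStealing };

// threshold: maximum number of queued tasks before the node offers work away.
Request decide_migration(std::size_t local_task_count, std::size_t threshold) {
    if (local_task_count > threshold)
        return Request::WorkSharing;   // overloaded: ask the GS to place one task elsewhere
    if (local_task_count == 0)
        return Request::WorkStealing;  // idle: ask the GS for a task from a busy node
    return Request::None;              // neither overloaded nor idle: keep working
}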
Whether the LS decides to push a task to or steal a task from another node, the target node (the victim) must be determined before the task migration. As mentioned in the previous section, random victim selection is optimal for shared memory systems, but it is inefficient when applied to distributed memory systems. In our system, the victim is not selected randomly by the LS but determined by the GS. As shown in Fig. 1, when the GS receives a work-sharing or work-stealing request, it selects the most lightly loaded node as the victim of a work-sharing request, or the busiest node as the victim of a work-stealing request. The GS then notifies the victim to migrate a task between itself and the requester. Task migration among the cluster nodes is thus centrally controlled by the GS. To support such centralized control, the GS needs real-time information about the tasks on all nodes, including task sizes and task migration costs. However, such information is neither cheap nor easy to obtain. A simpler approach is to use the number of tasks as a measure of the workload on a node. In our implementation, the GS maintains a task counter for each worker node. Each node periodically updates its task counter with the number of tasks currently in its local task queues, by sending a message to the GS. The task counters are used for (1) determining whether a work-sharing or work-stealing operation should be conducted, (2) selecting the victim, and (3) detecting global termination. The details of the implementation can be found in Section 5.
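The following sketch shows how the GS could pick a victim from its per-node task counters: the most lightly loaded node for a work-sharing request and the most heavily loaded node for a work-stealing request. The function name and the rule of excluding the requester are our assumptions.

#include <cstddef>
#include <vector>

// counters[i] holds the task count last reported by node i.
// Returns the index of the victim node, or -1 if no other node exists.
int select_victim(const std::vector<std::size_t>& counters,
                  std::size_t requester, bool sharing_request) {
    int victim = -1;
    for (std::size_t i = 0; i < counters.size(); ++i) {
        if (i == requester) continue;
        if (victim < 0 ||
            ( sharing_request && counters[i] < counters[victim]) ||   // lightest node
            (!sharing_request && counters[i] > counters[victim]))     // busiest node
            victim = static_cast<int>(i);
    }
    return victim;
}

A real implementation would also verify that the chosen victim can actually serve the request, for example that a work-stealing victim still reports a non-zero counter before it is notified.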
The use of a GS does not limit the scalability of the system. On the contrary, our GS/LS design is suitable for building a scalable system, for the following reasons. First, tasks are not transferred through the GS, but directly between two LSs; only a few short messages are exchanged between the GS and each LS (see the implementation section). Second, as the number of cluster nodes increases, we can deploy multiple GSs to construct a hierarchical architecture and thus achieve scalability. Each GS controls a limited number of LSs, and the GSs communicate with each other or with an upper-level scheduler. Moreover, the multi-level schedulers can be adapted to the topology of the system architecture to improve data locality.
² In this case, our framework can be improved by utilizing the information about available resources and real-time loads, which is obtained from the resource manager.