Commit 5182881b authored by royxroy

Add cluster docs

Parent 7318f63e
@@ -66,6 +66,7 @@ Abstractions
     :caption: Parallel Training

     parallel_training/overview.rst
+    parallel_training/cluster.rst
     parallel_training/setup.rst
     parallel_training/recommended_practice.rst
Parl Cluster
============

Get Started
###########

Cluster Structure Overview
--------------------------

| There are three core concepts in a Parl cluster: master, worker and client.

- **Master:** The master node is the control center of a Parl cluster. It accepts
  connections from workers and clients, receives tasks from clients and allocates
  vacant workers to run these tasks.
- **Worker:** A worker provides the CPU computation resources of the cluster.
  It initiates separate job subprocesses that wait for tasks from the master.
- **Client:** Each training program has a unique global client, which submits
  tasks to the master node.

.. image:: ./cluster_structure.png
:width: 600px
:align: center

Master
------

| There is only one master node in each Parl cluster. We can start a master by
  calling ``xparl start --port 1234`` with an assigned port number. This command
  also starts a local worker that connects to the new master.

| The **master socket** receives all kinds of messages from workers, clients and
  the cluster monitor, and dispatches on the message tag (see the sketch after
  this list), for example:

- A new worker connects to the cluster: the master starts a heartbeat to check the
  worker's status, and the worker's jobs are added to the master's job center.
- A new client connects to the cluster: the master starts a heartbeat to check the
  client's status and waits for the client to submit tasks.
- A worker updates its job buffer: the master replaces the killed jobs in the job
  center with the newly initialized ones.
- The cluster monitor queries the cluster status: the master returns the detailed
  status of the cluster (e.g. total CPU number, used CPU number, load average, ...)
  to the monitor.
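
| The sketch below pictures this dispatch as a plain tag-based loop over a ZMQ
  ``REP`` socket. It is only a simplified illustration of what ``_receive_message``
  in ``master.py`` does (see the diff further down); the real implementation also
  starts heartbeat threads and consults the job center.

.. code-block:: python

    # Simplified sketch of the master's message loop (not the actual implementation).
    import zmq
    from parl.remote import remote_constants

    ctx = zmq.Context()
    master_socket = ctx.socket(zmq.REP)   # one REP socket for workers, clients and monitor
    master_socket.bind("tcp://*:1234")

    while True:
        message = master_socket.recv_multipart()
        tag = message[0]
        if tag == remote_constants.WORKER_CONNECT_TAG:
            # a new worker joins the cluster; acknowledge it
            master_socket.send_multipart([remote_constants.NORMAL_TAG])
        elif tag == remote_constants.CLIENT_SUBMIT_TAG:
            # a client asks for a vacant job; here we only signal that no CPU is available
            master_socket.send_multipart([remote_constants.CPU_TAG])
        else:
            master_socket.send_multipart([remote_constants.NORMAL_TAG])
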
.. image:: ./master.png
:width: 600px
:align: center

Worker
------

| We can add more computation resources to an existing cluster by running the
  ``xparl connect --address master_address`` command. This command creates a
  local **Worker** object and connects it to the cluster.

| When a new worker starts, it first initiates separate job subprocesses and keeps
  them in a job buffer, and then sends the initialized worker information to the
  master node.
| The worker sends a heartbeat signal to each job to check whether it is still
  alive. When the worker finds that a job subprocess has died, it drops the dead
  job from the job buffer, starts a new job, and updates the worker information
  on the master node (as sketched below).
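
| A minimal sketch of this keep-alive logic is shown below. It only illustrates the
  idea; ``job_buffer``, ``start_job``, ``is_alive`` and ``update_master`` are
  hypothetical placeholders rather than the worker's actual attributes and methods.

.. code-block:: python

    # Illustrative sketch of a worker's job keep-alive loop (hypothetical helpers).
    import time

    def keep_jobs_alive(job_buffer, start_job, is_alive, update_master):
        """Replace dead job subprocesses and report the change to the master."""
        while True:
            for job in list(job_buffer):
                if not is_alive(job):               # heartbeat to this job failed
                    job_buffer.remove(job)          # drop the dead job
                    job_buffer.append(start_job())  # start a fresh job subprocess
                    update_master(job_buffer)       # update worker info on the master
            time.sleep(10)                          # heartbeat interval
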
.. image:: ./worker.png
:width: 600px
:align: center

Client
------

| Each training program has one global client that submits training tasks to the
  master node. Users do not need to interact with the client object directly: we
  create a new global client, or reuse the existing one, by calling
  ``parl.connect(master_address)``.
| The global client reads the local Python scripts and configuration files, which
  are later sent to the remote jobs.
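
| For example, connecting a training script to a master that listens on port 1234
  of the local machine takes a single call (the address below is just an example):

.. code-block:: python

    import parl

    # Create the global client (or reuse an existing one) and connect it
    # to the master node at ip:port.
    parl.connect("localhost:1234")
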
.. image:: ./client.png
:width: 600px
:align: center

Actor
-----

| An **Actor** is an object defined by users to solve a specific task. We use the
  ``@parl.remote_class`` decorator to convert an actor into a remote class object;
  each actor is connected to the global client.

.. code-block:: python

    # connect the global client to the master node
    parl.connect(master_address)

    @parl.remote_class
    class Actor(object):
        def __init__(self):
            ...

| When a decorated actor class is instantiated, the global client submits a task to
  the master node. The master node then picks a vacant job from the job center and
  sends the job address back to the client. The actor connects to the job and sends
  the local files, the class definition and the initialization arguments to it, and
  the job instantiates a local actor inside the job process.
| When the actor calls a method, the real computation is executed in the job
  process by the job's local actor, as in the example below.
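
| A minimal end-to-end example looks like the following; the ``sum`` method and the
  master address are only illustrative choices:

.. code-block:: python

    import parl

    parl.connect("localhost:1234")      # example master address

    @parl.remote_class
    class Actor(object):
        def sum(self, a, b):
            # this code runs inside the remote job process
            return a + b

    actor = Actor()                     # the client requests a vacant job for this actor
    result = actor.sum(1, 2)            # executed remotely by the job's local actor
    print(result)                       # 3
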
.. image:: ./actor.png
:width: 600px
:align: center
@@ -32,7 +32,7 @@ class Client(object):
    connect to the same global client in a training task.

    Attributes:
-        submit_job_socket (zmq.Context.socket): A socket which submits job to
+        submit_task_socket (zmq.Context.socket): A socket which submits job to
            the master node.
        pyfiles (bytes): A serialized dictionary containing the code of python
            files in local working directory.

@@ -104,15 +104,15 @@ class Client(object):
    def _create_sockets(self, master_address):
        """ Each client has 1 sockets as start:
-        (1) submit_job_socket: submits jobs to master node.
+        (1) submit_task_socket: submits tasks to master node.
        """

-        # submit_job_socket: submits job to master
-        self.submit_job_socket = self.ctx.socket(zmq.REQ)
-        self.submit_job_socket.linger = 0
-        self.submit_job_socket.setsockopt(
+        # submit_task_socket: submits job to master
+        self.submit_task_socket = self.ctx.socket(zmq.REQ)
+        self.submit_task_socket.linger = 0
+        self.submit_task_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
-        self.submit_job_socket.connect("tcp://{}".format(master_address))
+        self.submit_task_socket.connect("tcp://{}".format(master_address))

        self.start_time = time.time()
        thread = threading.Thread(target=self._reply_heartbeat)
        thread.setDaemon(True)

@@ -121,12 +121,12 @@ class Client(object):
        # check if the master is connected properly
        try:
-            self.submit_job_socket.send_multipart([
+            self.submit_task_socket.send_multipart([
                remote_constants.CLIENT_CONNECT_TAG,
                to_byte(self.heartbeat_master_address),
                to_byte(socket.gethostname())
            ])
-            _ = self.submit_job_socket.recv_multipart()
+            _ = self.submit_task_socket.recv_multipart()
        except zmq.error.Again as e:
            logger.warning("[Client] Can not connect to the master, please "
                           "check if master is started and ensure the input "

@@ -232,11 +232,11 @@ class Client(object):
            job_heartbeat_socket.close(0)

-    def submit_job(self):
-        """Send a job to the Master node.
+    def submit_task(self):
+        """Send a task to the Master node.

        When a `@parl.remote_class` object is created, the global client
-        sends a job to the master node. Then the master node will allocate
+        sends a task to the master node. Then the master node will allocate
        a vacant job from its job pool to the remote object.

        Returns:

@@ -247,11 +247,11 @@ class Client(object):
        while True:
            # A lock to prevent multiple actors from submitting job at the same time.
            self.lock.acquire()
-            self.submit_job_socket.send_multipart([
+            self.submit_task_socket.send_multipart([
                remote_constants.CLIENT_SUBMIT_TAG,
                to_byte(self.heartbeat_master_address)
            ])
-            message = self.submit_job_socket.recv_multipart()
+            message = self.submit_task_socket.recv_multipart()
            self.lock.release()
            tag = message[0]
@@ -30,8 +30,8 @@ class Master(object):
    """Base class for a master node, the control center for our cluster, which provides connections to workers and clients.

    There is only one master node in each cluster, and it is responsible for
-    receiving jobs from the clients and allocating computation resources to
-    run the jobs.
+    receiving tasks from the clients and allocating computation resources to
+    run these tasks.

    To start a master node, we use the following xparl command line api:

@@ -44,9 +44,9 @@ class Master(object):
    Attributes:
        job_center (JobCenter): A thread-safe data structure that stores the job address of vacant cpus.
-        client_socket (zmq.Context.socket): A socket that receives submitted
-            job from the client, and later sends
-            job_address back to the client.
+        master_socket (zmq.Context.socket): A socket that receives all kinds of
+            messages from workers, clients, and
+            cluster monitor.
        master_ip(str): The ip address of the master node.
        cpu_num(int): The number of available CPUs in the cluster.
        worker_num(int): The number of workers connected to this cluster.

@@ -63,9 +63,9 @@ class Master(object):
        logger.set_dir(
            os.path.expanduser('~/.parl_data/master/{}:{}'.format(
                self.master_ip, port)))
-        self.client_socket = self.ctx.socket(zmq.REP)
-        self.client_socket.bind("tcp://*:{}".format(port))
-        self.client_socket.linger = 0
+        self.master_socket = self.ctx.socket(zmq.REP)
+        self.master_socket.bind("tcp://*:{}".format(port))
+        self.master_socket.linger = 0
        self.port = port
        self.job_center = JobCenter(self.master_ip)

@@ -167,22 +167,22 @@ class Master(object):
        connection; (2) worker update; (3) client connection; (4) job
        submittion; (5) reset job.
        """
-        message = self.client_socket.recv_multipart()
+        message = self.master_socket.recv_multipart()
        tag = message[0]

        # a new worker connects to the master
        if tag == remote_constants.WORKER_CONNECT_TAG:
-            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
+            self.master_socket.send_multipart([remote_constants.NORMAL_TAG])

        elif tag == remote_constants.MONITOR_TAG:
            status = self._get_status()
-            self.client_socket.send_multipart(
+            self.master_socket.send_multipart(
                [remote_constants.NORMAL_TAG, status])

        # `xparl status` command line API
        elif tag == remote_constants.STATUS_TAG:
            status_info = self.cluster_monitor.get_status_info()
-            self.client_socket.send_multipart(
+            self.master_socket.send_multipart(
                [remote_constants.NORMAL_TAG,
                 to_byte(status_info)])

@@ -201,7 +201,7 @@ class Master(object):
                args=(initialized_worker.worker_address, ))
            thread.start()

-            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
+            self.master_socket.send_multipart([remote_constants.NORMAL_TAG])

        # a client connects to the master
        elif tag == remote_constants.CLIENT_CONNECT_TAG:

@@ -215,7 +215,7 @@ class Master(object):
                target=self._create_client_monitor,
                args=(client_heartbeat_address, ))
            thread.start()
-            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
+            self.master_socket.send_multipart([remote_constants.NORMAL_TAG])

        # a client submits a job to the master
        elif tag == remote_constants.CLIENT_SUBMIT_TAG:

@@ -224,7 +224,7 @@ class Master(object):
            if self.cpu_num:
                logger.info("Submitting job...")
                job = self.job_center.request_job()
-                self.client_socket.send_multipart([
+                self.master_socket.send_multipart([
                    remote_constants.NORMAL_TAG,
                    to_byte(job.job_address),
                    to_byte(job.client_heartbeat_address),

@@ -232,14 +232,14 @@ class Master(object):
                ])
                self._print_workers()
            else:
-                self.client_socket.send_multipart([remote_constants.CPU_TAG])
+                self.master_socket.send_multipart([remote_constants.CPU_TAG])

        # a worker updates
        elif tag == remote_constants.NEW_JOB_TAG:
            initialized_job = cloudpickle.loads(message[1])
            last_job_address = to_str(message[2])

-            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
+            self.master_socket.send_multipart([remote_constants.NORMAL_TAG])
            self.job_center.update_job(last_job_address, initialized_job,
                                       initialized_job.worker_address)
            logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))

@@ -248,7 +248,7 @@ class Master(object):
        # check before start a worker
        elif tag == remote_constants.NORMAL_TAG:
-            self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
+            self.master_socket.send_multipart([remote_constants.NORMAL_TAG])

        else:
            raise NotImplementedError()

@@ -270,8 +270,8 @@ class Master(object):
        3. A new client connects to the master node.
        4. A connected client submits a job after a remote object is created.
        """
-        self.client_socket.linger = 0
-        self.client_socket.setsockopt(
+        self.master_socket.linger = 0
+        self.master_socket.setsockopt(
            zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
        while self.master_is_alive:
@@ -153,7 +153,7 @@ def remote_class(*args, **kwargs):
        """Try to request cpu resource for 1 second/time for 300 times."""
        cnt = 300
        while cnt > 0:
-            job_address = global_client.submit_job()
+            job_address = global_client.submit_task()
            if job_address is not None:
                return job_address
            if cnt % 30 == 0: