提交 eca90f14 编写于 作者: F fuyw 提交者: Bo Zhou

Add xparl status & @remote_class(max_memory) (#134)

* fix css max-width bug

* check if monitor_port is used

* change master log dir to .parl_data/master/port

* Add docs/parallel_training/features.rst

* rename feature.rst to overview.rst

* Add xparl status & @remote_class(max_memory)

* fix yapf error

* use new remote/tests

* reply job_memory_info to client heartbeat

* add cluster_status_test.py

* update overview.rst

* Update docs
上级 eff8e974
......@@ -65,6 +65,7 @@ Abstractions
:maxdepth: 2
:caption: Parallel Training
parallel_training/overview.rst
parallel_training/setup.rst
parallel_training/recommended_practice.rst
......@@ -81,4 +82,3 @@ Abstractions
model.rst
algorithm.rst
agent.rst
Summary
=======
Easy-to-use
###########
| With a single `@parl.remote_class` decorator, users can implement parallel
training easily, without having to deal with the details of multiprocessing
or network communication.
High performance
################
| `@parl.remote_class` enables us to achieve real multi-thread computation
efficiency without modifying our code. As shown in figure (a), Python's
original multi-thread computation performs poorly due to the limitation
of the GIL, while PARL empowers us to realize real parallel computation
efficiency.
Web UI for computation resources
################################
| PARL provides a web monitor to watch the status of any resources connected
to the cluster. Users can view the cluster status in a web UI. It shows
detailed information for each worker (e.g., memory used) and each task submitted.
Broad compatibility
###################
| Our framework for distributed training is compatible with any other
framework, such as TensorFlow, PyTorch or MXNet. By adding the `@parl.remote_class`
decorator to their code, users can easily convert it to distributed
computation.
Why PARL
########
High throughput
###############
| PARL uses a point-to-point connection for network communication in the
cluster. Unlike other frameworks such as RLlib, which relies on Redis for
communication, PARL is able to achieve much higher throughput. The results
can be found in figure (b). With the same implementation of IMPALA, PARL
achieved an increase of 160% in data throughput over Ray (RLlib).
Automatic deployment
####################
| Unlike other parallel frameworks, which fail to import modules from
external files, PARL automatically packages all related files and sends
them to remote machines.
.. image:: ./comparison.png
:width: 600px
:align: center
......@@ -147,7 +147,6 @@ class Client(object):
to_byte(str(self.actor_num)),
to_byte(str(elapsed_time))
])
except zmq.error.Again as e:
logger.warning("[Client] Cannot connect to the master."
"Please check if it is still alive.")
......@@ -195,7 +194,17 @@ class Client(object):
try:
job_heartbeat_socket.send_multipart(
[remote_constants.HEARTBEAT_TAG])
_ = job_heartbeat_socket.recv_multipart()
job_message = job_heartbeat_socket.recv_multipart()
stop_job = to_str(job_message[1])
job_address = to_str(job_message[2])
if stop_job == 'True':
logger.error(
'Job {} exceeds max memory usage, will stop this job.'.
format(job_address))
self.actor_num -= 1
job_is_alive = False
else:
time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
except zmq.error.Again as e:
......
......@@ -106,6 +106,21 @@ class ClusterMonitor(object):
self.status['clients'].pop(client_address)
self.lock.release()
def get_status_info(self):
    """Return a one-line, human-readable summary of cluster CPU usage.

    Sums the ``used_cpus`` and ``vacant_cpus`` entries of every worker
    recorded in ``self.status['workers']`` while holding ``self.lock``.

    Returns:
        str: A message of the form
            ``"has {used} used cpus, {vacant} vacant cpus."``.

    Raises:
        KeyError: If a worker record lacks ``used_cpus`` or ``vacant_cpus``.
            The lock is released before the error propagates.
    """
    self.lock.acquire()
    try:
        workers = list(self.status['workers'].values())
        used_cpus = sum(worker['used_cpus'] for worker in workers)
        vacant_cpus = sum(worker['vacant_cpus'] for worker in workers)
    finally:
        # Always release, even when a worker record is malformed; otherwise
        # every later caller that needs this lock would block forever.
        self.lock.release()
    return "has {} used cpus, {} vacant cpus.".format(used_cpus, vacant_cpus)
def get_status(self):
"""Return a cloudpickled status."""
self.lock.acquire()
......
......@@ -18,6 +18,8 @@ os.environ['XPARL'] = 'True'
import argparse
import cloudpickle
import pickle
import psutil
import re
import sys
import tempfile
import threading
......@@ -45,9 +47,15 @@ class Job(object):
"""
Args:
worker_address(str): worker_address for sending job information(e.g, pid)
Attributes:
pid (int): Job process ID.
max_memory (float): Maximum memory (MB) can be used by each remote instance.
"""
self.job_is_alive = True
self.worker_address = worker_address
self.pid = os.getpid()
self.max_memory = None
self.lock = threading.Lock()
self._create_sockets()
......@@ -104,14 +112,15 @@ class Job(object):
initialized_job = InitializedJob(
self.job_address, worker_heartbeat_address,
client_heartbeat_address, self.ping_heartbeat_address, None,
os.getpid())
self.pid)
self.job_socket.send_multipart(
[remote_constants.NORMAL_TAG,
cloudpickle.dumps(initialized_job)])
message = self.job_socket.recv_multipart()
worker_thread.start()
assert message[0] == remote_constants.NORMAL_TAG
tag = message[0]
assert tag == remote_constants.NORMAL_TAG
# create the kill_job_socket
kill_job_address = to_str(message[1])
self.kill_job_socket = self.ctx.socket(zmq.REQ)
......@@ -119,6 +128,16 @@ class Job(object):
zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
self.kill_job_socket.connect("tcp://{}".format(kill_job_address))
def _check_used_memory(self):
"""Check if the memory used by this job exceeds self.max_memory."""
stop_job = False
if self.max_memory is not None:
process = psutil.Process(self.pid)
used_memory = float(process.memory_info()[0]) / (1024**2)
if used_memory > self.max_memory:
stop_job = True
return stop_job
def _reply_ping(self, socket):
"""Create a socket server that reply the ping signal from client.
This signal is used to make sure that the job is still alive.
......@@ -145,11 +164,18 @@ class Job(object):
If the job losts connection with the client, it will exit too.
"""
self.client_is_alive = True
while self.client_is_alive:
while self.client_is_alive and self.job_is_alive:
try:
message = socket.recv_multipart()
socket.send_multipart([remote_constants.HEARTBEAT_TAG])
stop_job = self._check_used_memory()
socket.send_multipart([
remote_constants.HEARTBEAT_TAG,
to_byte(str(stop_job)),
to_byte(self.job_address)
])
if stop_job == True:
socket.close(0)
os._exit(1)
except zmq.error.Again as e:
logger.warning(
"[Job] Cannot connect to the client. This job will exit and inform the worker."
......@@ -178,7 +204,6 @@ class Job(object):
try:
message = socket.recv_multipart()
socket.send_multipart([remote_constants.HEARTBEAT_TAG])
except zmq.error.Again as e:
logger.warning("[Job] Cannot connect to the worker{}. ".format(
self.worker_address) + "Job will quit.")
......@@ -234,6 +259,9 @@ class Job(object):
if tag == remote_constants.INIT_OBJECT_TAG:
cls = cloudpickle.loads(message[1])
args, kwargs = cloudpickle.loads(message[2])
max_memory = to_str(message[3])
if max_memory != 'None':
self.max_memory = float(max_memory)
try:
obj = cls(*args, **kwargs)
......
......@@ -58,10 +58,11 @@ class Master(object):
"""
def __init__(self, port):
logger.set_dir(os.path.expanduser('~/.parl_data/master/'))
self.ctx = zmq.Context()
self.master_ip = get_ip_address()
logger.set_dir(
os.path.expanduser('~/.parl_data/master/{}:{}'.format(
self.master_ip, port)))
self.client_socket = self.ctx.socket(zmq.REP)
self.client_socket.bind("tcp://*:{}".format(port))
self.client_socket.linger = 0
......@@ -178,8 +179,14 @@ class Master(object):
self.client_socket.send_multipart(
[remote_constants.NORMAL_TAG, status])
elif tag == remote_constants.WORKER_INITIALIZED_TAG:
# `xparl status` command line API
elif tag == remote_constants.STATUS_TAG:
status_info = self.cluster_monitor.get_status_info()
self.client_socket.send_multipart(
[remote_constants.NORMAL_TAG,
to_byte(status_info)])
elif tag == remote_constants.WORKER_INITIALIZED_TAG:
initialized_worker = cloudpickle.loads(message[1])
worker_address = initialized_worker.worker_address
self.job_center.add_worker(initialized_worker)
......
......@@ -92,7 +92,7 @@ def cluster():
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--monitor_port', default=1234, type=int)
parser.add_argument('--address', default='localhost:1234', type=str)
parser.add_argument('--address', default='localhost:8010', type=str)
args = parser.parse_args()
CLUSTER_MONITOR = ClusterMonitor(args.address)
......
......@@ -17,6 +17,7 @@ CONNECT_TAG = b'[CONNECT]'
HEARTBEAT_TAG = b'[HEARTBEAT]'
KILLJOB_TAG = b'[KILLJOB]'
MONITOR_TAG = b'[MONITOR]'
STATUS_TAG = b'[STATUS]'
WORKER_CONNECT_TAG = b'[WORKER_CONNECT]'
WORKER_INITIALIZED_TAG = b'[WORKER_INITIALIZED]'
......
......@@ -28,7 +28,7 @@ from parl.remote.exceptions import RemoteError, RemoteAttributeError,\
from parl.remote.client import get_global_client
def remote_class(cls):
def remote_class(*args, **kwargs):
"""A Python decorator that enables a class to run all its functions
remotely.
......@@ -43,7 +43,7 @@ def remote_class(cls):
.. code-block:: python
@remote_class
@parl.remote_class
class Actor(object):
def __init__(self, x):
self.x = x
......@@ -55,6 +55,16 @@ def remote_class(cls):
actor = Actor()
actor.step()
# Set maximum memory usage to 300 MB for each object.
@parl.remote_class(max_memory=300)
class LimitedActor(object):
...
Args:
max_memory (float): Maximum memory (MB) can be used by each remote
instance, the unit is in MB and default value is
none(unlimited).
Returns:
A remote wrapper for the remote class.
......@@ -63,6 +73,7 @@ def remote_class(cls):
by `parl.connect(master_address)` beforehand.
"""
def decorator(cls):
class RemoteWrapper(object):
"""
Wrapper for remote class in client side.
......@@ -105,7 +116,8 @@ def remote_class(cls):
self.job_socket.send_multipart([
remote_constants.INIT_OBJECT_TAG,
cloudpickle.dumps(cls),
cloudpickle.dumps([args, kwargs])
cloudpickle.dumps([args, kwargs]),
to_byte(str(max_memory))
])
message = self.job_socket.recv_multipart()
tag = message[0]
......@@ -124,11 +136,14 @@ def remote_class(cls):
self.job_socket.close(0)
except AttributeError:
pass
except zmq.error.ZMQError:
pass
def send_file(self, socket):
try:
socket.send_multipart([
remote_constants.SEND_FILE_TAG, self.GLOBAL_CLIENT.pyfiles
remote_constants.SEND_FILE_TAG,
self.GLOBAL_CLIENT.pyfiles
])
_ = socket.recv_multipart()
except zmq.error.Again as e:
......@@ -142,7 +157,8 @@ def remote_class(cls):
if job_address is not None:
return job_address
if cnt % 30 == 0:
logger.warning("No vacant cpu resources at the moment, "
logger.warning(
"No vacant cpu resources at the moment, "
"will try {} times later.".format(cnt))
cnt -= 1
return None
......@@ -197,3 +213,8 @@ def remote_class(cls):
return wrapper
return RemoteWrapper
max_memory = kwargs.get('max_memory')
if len(args) == 1 and callable(args[0]):
return decorator(args[0])
return decorator
......@@ -13,18 +13,20 @@
# limitations under the License.
import click
import socket
import locale
import sys
import random
import os
import multiprocessing
import os
import random
import re
import socket
import subprocess
import sys
import threading
import warnings
import zmq
from multiprocessing import Process
from parl.utils import get_ip_address
from parl.utils import get_ip_address, to_str
from parl.remote.remote_constants import STATUS_TAG
# A flag to mark if parl is started from a command line
os.environ['XPARL'] = 'True'
......@@ -43,7 +45,7 @@ def get_free_tcp_port():
tcp.bind(('', 0))
addr, port = tcp.getsockname()
tcp.close()
return port
return str(port)
def is_port_available(port):
......@@ -86,10 +88,18 @@ def cli():
type=int,
help="Set number of cpu manually. If not set, it will use all "
"cpus of this machine.")
def start_master(port, cpu_num):
@click.option(
"--monitor_port", help="The port to start a cluster monitor.", type=str)
def start_master(port, cpu_num, monitor_port):
if not is_port_available(port):
raise Exception(
"The master address localhost:{} already in use.".format(port))
"The master address localhost:{} is already in use.".format(port))
if monitor_port and not is_port_available(monitor_port):
raise Exception(
"The input monitor port localhost:{} is already in use.".format(
monitor_port))
cpu_num = cpu_num if cpu_num else multiprocessing.cpu_count()
start_file = __file__.replace('scripts.pyc', 'start.py')
start_file = start_file.replace('scripts.py', 'start.py')
......@@ -105,7 +115,7 @@ def start_master(port, cpu_num):
FNULL = open(os.devnull, 'w')
p = subprocess.Popen(command, stdout=FNULL, stderr=subprocess.STDOUT)
monitor_port = get_free_tcp_port()
monitor_port = monitor_port if monitor_port else get_free_tcp_port()
command = [
sys.executable, '{}/monitor.py'.format(__file__[:__file__.rfind('/')]),
......@@ -115,23 +125,28 @@ def start_master(port, cpu_num):
p = subprocess.Popen(command, stdout=FNULL, stderr=subprocess.STDOUT)
FNULL.close()
master_ip = get_ip_address()
cluster_info = """
# The Parl cluster is started at localhost:{}.
# A local worker with {} CPUs is connected to the cluster.
## If you want to check cluster status, visit:
## If you want to check cluster status, please view:
http://{}:{}.
## If you want to add more CPU resources, call:
or call:
xparl status
xparl connect --address localhost:{}
## If you want to add more CPU resources, please call:
## If you want to shutdown the cluster, call:
xparl connect --address {}:{}
xparl stop""".format(port, cpu_num, get_ip_address(), monitor_port,
port)
## If you want to shutdown the cluster, please call:
xparl stop
""".format(port, cpu_num, master_ip, monitor_port, master_ip, port)
click.echo(cluster_info)
......@@ -168,9 +183,40 @@ def stop():
subprocess.call([command], shell=True)
@click.command("status")
def status():
    """Print the status of every cluster whose monitor runs on this machine.

    Running ``remote/monitor.py`` processes are discovered through ``ps``.
    For each one, the master address found on its command line is sent a
    ``STATUS_TAG`` request, and the reply is echoed together with the
    monitor URL. If no monitor process is found, a notice is printed instead.
    """
    cmd = r'ps -ef | grep remote/monitor.py\ --monitor_port'
    # Close the pipe explicitly instead of leaving it to garbage collection.
    with os.popen(cmd) as pipe:
        content = pipe.read()
    pattern = re.compile('--monitor_port (.*?)\n', re.S)
    monitors = pattern.findall(content)
    if len(monitors) == 0:
        click.echo('No active cluster is found.')
        return

    ctx = zmq.Context()
    cluster_messages = []
    try:
        for monitor in monitors:
            # The captured text is unpacked as "<monitor_port> <flag> <master_address>".
            monitor_port, _, master_address = monitor.split(' ')
            master_ip = master_address.split(':')[0]
            monitor_address = "{}:{}".format(master_ip, monitor_port)
            # Named `status_socket` so the `socket` module imported at the
            # top of the file is not shadowed inside this function.
            status_socket = ctx.socket(zmq.REQ)
            try:
                status_socket.connect('tcp://{}'.format(master_address))
                status_socket.send_multipart([STATUS_TAG])
                cluster_info = to_str(status_socket.recv_multipart()[1])
            finally:
                # Release the socket even when the request fails.
                status_socket.close(0)
            msg = """
# Cluster {} {}
# If you want to check cluster status, please view: http://{}
""".format(master_address, cluster_info, monitor_address)
            cluster_messages.append(msg)
    finally:
        # Every socket was closed with linger 0, so this returns promptly.
        ctx.term()

    for monitor_status in cluster_messages:
        click.echo(monitor_status)
cli.add_command(start_worker)
cli.add_command(start_master)
cli.add_command(stop)
cli.add_command(status)
def main():
......
......@@ -138,8 +138,8 @@ function addPlots(res, record, imgHandle, begin, end) {
]
};
if (i < record_num && worker.worker_address === record[i].worker_address) {
if (worker.cpu_num !== record[i].cpu_num) {
if (i < record_num && worker.hostname === record[i].hostname) {
if (worker.used_cpus !== record[i].used_cpus) {
imgHandle[`w${i}c0`].setOption(cpuOption);
}
if (worker.used_memory !== record[i].used_memory) {
......@@ -155,10 +155,10 @@ function addPlots(res, record, imgHandle, begin, end) {
}
record[i] = {
worker_address: worker.worker_address,
used_cpu: worker.used_cpu,
vacant_cpu: worker.vacant_cpu,
used_memory: worker.used_cpu,
hostname: worker.hostname,
used_cpus: worker.used_cpus,
vacant_cpus: worker.vacant_cpus,
used_memory: worker.used_memory,
vacant_memory: worker.vacant_memory
};
}
......
......@@ -44,10 +44,8 @@
var delta = 3;
$(document).ready(function() {
console.log('After ready.');
$.get("cluster", function(data, status) {
res = data;
console.log('Get first data.', res);
createDivs(res, start_num);
addPlots(res, record, imgHandle, 0, start_num);
});
......@@ -55,7 +53,6 @@
setInterval(function() {
$.get("cluster", function(data, status) {
res = data;
console.log('Interval', res);
createDivs(res, start_num);
addPlots(res, record, imgHandle, 0, start_num);
});
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import parl
import unittest
import time
import threading
from parl.remote.master import Master
from parl.remote.worker import Worker
from parl.remote.client import disconnect
from parl.remote.monitor import ClusterMonitor
@parl.remote_class(max_memory=200)
class Actor(object):
    """Remote actor, declared with ``max_memory=200``, that can grow its memory use."""

    def __init__(self, x=10):
        """Start with counter ``x`` and an empty list of stored buffers."""
        self.data = []
        self.x = x

    def add_100mb(self):
        """Store ``100 * 1024**2`` random bytes, increment ``x`` and return it."""
        chunk_size = 100 * 1024**2
        self.data.append(os.urandom(chunk_size))
        self.x = self.x + 1
        return self.x
class TestMaxMemory(unittest.TestCase):
    """Check that an actor exceeding ``max_memory`` is dropped from its client."""

    def tearDown(self):
        disconnect()

    def test_max_memory(self):
        """The client's ``actor_num`` goes from 1 to 0 after the actor allocates 100 MB."""
        port = 3001
        master = Master(port=port)
        th = threading.Thread(target=master.run)
        th.start()
        worker = None
        try:
            time.sleep(1)
            worker = Worker('localhost:{}'.format(port), 1)
            cluster_monitor = ClusterMonitor('localhost:{}'.format(port))
            time.sleep(1)
            parl.connect('localhost:{}'.format(port))
            actor = Actor()
            time.sleep(30)
            self.assertEqual(
                1, cluster_monitor.data['clients'][0]['actor_num'])
            actor.add_100mb()
            time.sleep(50)
            self.assertEqual(
                0, cluster_monitor.data['clients'][0]['actor_num'])
            actor.job_socket.close(0)
            del actor
        finally:
            # Shut the cluster down even when an assertion fails, so the
            # non-daemon master thread does not outlive the test.
            if worker is not None:
                worker.exit()
            master.exit()
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import parl
import unittest
import time
import threading
from parl.remote.master import Master
from parl.remote.worker import Worker
from parl.remote.client import disconnect
from parl.remote.monitor import ClusterMonitor
@parl.remote_class(max_memory=200)
class Actor(object):
    """Remote actor, declared with ``max_memory=200``, used to occupy one CPU."""

    def __init__(self, x=10):
        """Start with counter ``x`` and an empty list of stored buffers."""
        self.data = []
        self.x = x

    def add_100mb(self):
        """Store ``100 * 1024**2`` random bytes, increment ``x`` and return it."""
        chunk_size = 100 * 1024**2
        self.data.append(os.urandom(chunk_size))
        self.x = self.x + 1
        return self.x
class TestClusterStatus(unittest.TestCase):
    """Check the summary string produced by ``ClusterMonitor.get_status_info``."""

    def tearDown(self):
        # The original referenced `disconnect` without calling it, so the
        # client was never disconnected after the test.
        disconnect()

    def test_cluster_status(self):
        """Used/vacant CPU counts flip from 0/1 to 1/0 once an actor is created."""
        port = 4321
        master = Master(port=port)
        th = threading.Thread(target=master.run)
        th.start()
        worker = None
        try:
            time.sleep(1)
            worker = Worker('localhost:{}'.format(port), 1)
            time.sleep(5)
            status_info = master.cluster_monitor.get_status_info()
            self.assertEqual(status_info, 'has 0 used cpus, 1 vacant cpus.')
            parl.connect('localhost:{}'.format(port))
            # Keep a reference so the remote instance stays alive while the
            # status is checked.
            actor = Actor()
            time.sleep(30)
            status_info = master.cluster_monitor.get_status_info()
            self.assertEqual(status_info, 'has 1 used cpus, 0 vacant cpus.')
        finally:
            # Shut the cluster down even when an assertion fails, so the
            # non-daemon master thread does not outlive the test.
            if worker is not None:
                worker.exit()
            master.exit()
if __name__ == '__main__':
unittest.main()
......@@ -170,6 +170,7 @@ class Worker(object):
def _fill_job_buffer(self):
"""An endless loop that adds initialized job into the job buffer"""
while self.worker_is_alive:
if self.job_buffer.full() is False:
initialized_jobs = self._init_jobs(job_num=self.cpu_num)
for job in initialized_jobs:
self.job_buffer.put(job)
......@@ -269,7 +270,6 @@ class Worker(object):
[remote_constants.HEARTBEAT_TAG])
_ = job_heartbeat_socket.recv_multipart()
time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
except zmq.error.Again as e:
job.is_alive = False
logger.warning(
......@@ -291,7 +291,8 @@ class Worker(object):
while self.worker_is_alive and self.master_is_alive:
try:
message = self.kill_job_socket.recv_multipart()
assert message[0] == remote_constants.KILLJOB_TAG
tag = message[0]
assert tag == remote_constants.KILLJOB_TAG
to_kill_job_address = to_str(message[1])
self._kill_job(to_kill_job_address)
self.kill_job_socket.send_multipart(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册