Unverified commit c5f2802d authored by Chengmo, committed by GitHub

【paddle.fleet】Update fleetrun & ps-heter (#27472)

* refine fleetrun.ps_launch

* update fleet run for multi device support

* ps_graph support ps-gpu

* fix heter save

* add heter save unittest

* fix unittest & simple code

* update fleetrun

* fix fleetrun

* fix launch barrier

* fix role maker

* add paddlecloud rolemaker unittest

* rename heter_worker_device_guard
Parent bbc837ee
@@ -98,6 +98,7 @@ message AsyncConfig {
  optional int32 send_wait_times = 7 [ default = 1 ];
  optional bool runtime_split_send_recv = 8 [ default = false ];
  optional bool launch_barrier = 9 [ default = true ];
+ optional string heter_worker_device_guard = 10 [ default = 'cpu' ];
}

message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; }
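On the Python side the new field is set through DistributedStrategy.a_sync_configs; a minimal sketch, mirroring the unit test added later in this diff (values are illustrative):

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {
    "launch_barrier": False,             # skip waiting for servers at startup
    "heter_worker_device_guard": "gpu",  # place heter-worker ops on GPU; default is 'cpu'
}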
......
@@ -530,13 +530,6 @@ class RoleMakerBase(object):
        return self._heter_trainer_endpoints[(self._current_id) %
                                             self._heter_worker_num()]

-    def _get_heter_worker_device(self):
-        """
-        Returns:
-            string: heter_trainer's device of current node, e.g: CPU/GPU/XPU
-        """
-        return self._heter_trainer_device.upper()


class PaddleCloudRoleMaker(RoleMakerBase):
    def __init__(self, is_collective=False, **kwargs):
@@ -677,10 +670,9 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        return self._role == Role.HETER_WORKER

    def _ps_env(self):
-        try:
-            # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
-            # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002
-            self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
+        # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
+        # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002
+        self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST", None)

        if self._server_endpoints is None:
            # back to non_distributed execution.

@@ -696,14 +688,23 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        self._server_endpoints = self._server_endpoints.split(",")

-        self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
-        if self._worker_endpoints:
+        self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", None)
+        if self._worker_endpoints != None:
            self._worker_endpoints = self._worker_endpoints.split(",")
        else:
            self._worker_endpoints = []

-        trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
-        training_role = os.environ["TRAINING_ROLE"]
+        trainers_num = os.getenv("PADDLE_TRAINERS_NUM", None)
+        if trainers_num == None:
+            raise ValueError(
+                "Can not find PADDLE_TRAINERS_NUM, please check your environment."
+            )
+        trainers_num = int(trainers_num)
+
+        training_role = os.getenv("TRAINING_ROLE", None)
+        if training_role == None:
+            raise ValueError(
+                "Can not find TRAINING_ROLE, please check your environment.")

        if training_role not in ["TRAINER", "PSERVER", "HETER_TRAINER"]:
            raise ValueError(

@@ -711,11 +712,9 @@ class PaddleCloudRoleMaker(RoleMakerBase):
                format(training_role))

        # For heter parameter server env setting
-        heter_trainer_eplist = os.getenv(
-            "PADDLE_HETER_TRAINER_IP_PORT_LIST", None)
-        heter_trainer_device = os.getenv("PADDLE_HETER_TRAINER_DEVICE",
-                                         None)
-        if heter_trainer_eplist and heter_trainer_device:
+        heter_trainer_eplist = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST",
+                                         "")
+        if heter_trainer_eplist != "":
            try:
                heter_trainer_eplist = os.environ[
                    "PADDLE_HETER_TRAINER_IP_PORT_LIST"].split(",")

@@ -726,39 +725,44 @@ class PaddleCloudRoleMaker(RoleMakerBase):
            self._is_heter_parameter_server_mode = True
            heter_trainers_num = len(heter_trainer_eplist)
-            current_node_device = heter_trainer_device.upper()
-            if current_node_device not in ["CPU", "GPU", "XPU"]:
-                raise ValueError(
-                    "Heter Trainer doesn't support {} device now, please use CPU / GPU / XPU(KunLun)".
-                    format(heter_trainer_device))
-            self._heter_trainer_device = current_node_device
        else:
            self._is_heter_parameter_server_mode = False
            heter_trainers_num = 0

        if training_role == "TRAINER":
            role = Role.WORKER
-            current_id = int(os.environ["PADDLE_TRAINER_ID"])
+            current_id = os.getenv("PADDLE_TRAINER_ID", None)
+            if current_id == None:
+                raise ValueError(
+                    "Can not find PADDLE_TRAINER_ID, please check your environment."
+                )
+            current_id = int(current_id)
            if len(self._worker_endpoints) > 0:
                self._cur_endpoint = self._worker_endpoints[current_id]
        elif training_role == "PSERVER":
            role = Role.SERVER
-            port = os.environ["PADDLE_PORT"]
-            ip = os.environ["POD_IP"]
+            port = os.getenv("PADDLE_PORT", None)
+            if port == None:
+                raise ValueError(
+                    "Can not find PADDLE_PORT, please check your environment.")
+            ip = os.getenv("POD_IP", None)
+            if ip == None:
+                raise ValueError(
+                    "Can not find POD_IP, please check your environment.")
            self._cur_endpoint = ip + ":" + port
            current_id = self._server_endpoints.index(self._cur_endpoint)
        elif training_role == "HETER_TRAINER":
            role = Role.HETER_WORKER
-            cur_ip = os.environ["POD_IP"]
-            cur_port = os.environ["PADDLE_PORT"]
-            curr_endpoint = ":".join([cur_ip, cur_port])
-            current_id = heter_trainer_eplist.index(curr_endpoint)
-        else:
-            raise ValueError(
-                "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER")
-        except ValueError as e:
-            raise ValueError(
-                "Something wrong with PaddleCloud, please check environment")
+            cur_port = os.getenv("PADDLE_PORT", None)
+            if cur_port == None:
+                raise ValueError(
+                    "Can not find PADDLE_PORT, please check your environment.")
+            cur_ip = os.getenv("POD_IP", None)
+            if cur_ip == None:
+                raise ValueError(
+                    "Can not find POD_IP, please check your environment.")
+            curr_endpoint = ":".join([cur_ip, cur_port])
+            current_id = heter_trainer_eplist.index(curr_endpoint)

        self._trainers_num = trainers_num
        self._role = role
......
@@ -89,14 +89,16 @@ def _parse_args():
        description='''start paddle training using multi-process mode.
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
''')
+    base_group = parser.add_argument_group("Base Parameters")

-    # Optional arguments for the launch helper
-    parser.add_argument(
-        "--ips",
+    base_group.add_argument(
+        "--log_dir",
        type=str,
-        default="127.0.0.1",
-        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
-    parser.add_argument(
+        default="log",
+        help="The path for each process's log.If it's not set, the log will printed to default pipe."
+    )
+
+    base_group.add_argument(
        "--gpus",
        type=str,
        default=None,

@@ -104,22 +106,7 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
    )

-    parser.add_argument(
-        "--servers", type=str, default="", help="User defined servers ip:port")
-    parser.add_argument(
-        "--workers", type=str, default="", help="User defined workers ip:port")
-    parser.add_argument("--worker_num", type=int, help="number of workers")
-    parser.add_argument("--server_num", type=int, help="number of servers")
-    parser.add_argument(
-        "--log_dir",
-        type=str,
-        default="log",
-        help="The path for each process's log.If it's not set, the log will printed to default pipe."
-    )
-    # positional
-    parser.add_argument(
+    base_group.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "

@@ -127,8 +114,34 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
        "followed by all the arguments for the "
        "training script")

-    # rest from the training program
-    parser.add_argument('training_script_args', nargs=REMAINDER)
+    base_group.add_argument('training_script_args', nargs=REMAINDER)
+
+    # Optional arguments for the launch helper
+    # for collective
+    collective_group = parser.add_argument_group("Collective Parameters")
+    collective_group.add_argument(
+        "--ips",
+        type=str,
+        default="127.0.0.1",
+        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
+
+    ps_group = parser.add_argument_group("Parameter-Server Parameters")
+    # for parameter server
+    ps_group.add_argument(
+        "--servers", type=str, default="", help="User defined servers ip:port")
+    ps_group.add_argument(
+        "--workers", type=str, default="", help="User defined workers ip:port")
+    ps_group.add_argument(
+        "--heter_workers",
+        type=str,
+        default="",
+        help="User defined heter workers ip:port")
+    ps_group.add_argument("--worker_num", type=int, help="number of workers")
+    ps_group.add_argument("--server_num", type=int, help="number of servers")
+    ps_group.add_argument(
+        "--heter_worker_num", type=int, help="number of heter_workers")

    return parser.parse_args()
@@ -166,35 +179,6 @@ def get_cluster_from_args(args, gpus):
    return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)


-def get_gpus(gpus):
-    if gpus is None:
-        gpus_num = fluid.core.get_cuda_device_count()
-        res_gpus = [str(x) for x in range(0, gpus_num)]
-    else:
-        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
-        if cuda_visible_devices is None or cuda_visible_devices == "":
-            res_gpus = [x.strip() for x in gpus.split(',')]
-        else:
-            # change gpus into relative values
-            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
-            # therefore gpus=0,1,2,3
-            cuda_visible_devices_list = cuda_visible_devices.split(',')
-            for x in gpus.split(','):
-                assert x in cuda_visible_devices_list, "Can't find "\
-                    "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
-                    % (x, cuda_visible_devices)
-            res_gpus = [
-                cuda_visible_devices_list.index(x.strip())
-                for x in gpus.split(',')
-            ]
-            logger.info("Change selected_gpus into reletive values. --ips:{} "
-                        "will change into relative_ips:{} according to your "
-                        "CUDA_VISIBLE_DEVICES:{}".format(
-                            gpus, res_gpus, cuda_visible_devices_list))
-
-    return res_gpus
-
-
def launch_collective(args):
    # parse arguments, used for cloud-single-machine and local
    gpus = get_gpus(args.gpus)

@@ -245,209 +229,37 @@ def launch_collective(args):
        shutil.rmtree(gloo_rendezvous_dir)

-def launch_ps(args):
-    ports = None
-    start_port = 6170
-    if args.server_num:
-        server_num = args.server_num
-        ports = get_ports(server_num, 0)
-        server_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
-    else:
-        assert args.servers != "", "The setting of CPU mode must be either server_num or servers."
-        server_endpoints = args.servers
-    server_endpoints_ips = [
-        x.strip().split(":")[0] for x in server_endpoints.split(",")
-    ]
-    server_endpoints_port = [
-        x.strip().split(":")[1] for x in server_endpoints.split(",")
-    ]
-    server_num = len(server_endpoints_ips)
-
-    if args.worker_num:
-        worker_num = args.worker_num
-        ports = get_ports(worker_num, server_num)
-        worker_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
-    else:
-        assert args.workers != "", "The setting of CPU mode must be either worker_num or workers."
-        worker_endpoints = args.workers
-    worker_endpoints_ips = [
-        x.strip().split(":")[0] for x in worker_endpoints.split(",")
-    ]
-    worker_num = len(worker_endpoints_ips)
-    node_ips = list(set(server_endpoints_ips + worker_endpoints_ips))
-    worker_endpoints_len = [
-        len(x.strip().split(":")) for x in worker_endpoints.split(",")
-    ]
-    if 1 in worker_endpoints_len:
-        # if no port value in worker_endpoints, will set default port values.
-        worker_endpoints_port = range(start_port + server_num,
-                                      start_port + server_num + worker_num, 1)
-    else:
-        worker_endpoints_port = [
-            x.strip().split(":")[1] for x in worker_endpoints.split(",")
-        ]
-
-    # local train
-    if len(set(node_ips)) == 1:
-        current_node_ip = node_ips[0]
-    else:
-        _, current_node_ip = get_host_name_ip()
-
-    assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
-        % (current_node_ip, node_ips)
-    node_rank = node_ips.index(current_node_ip)
-    logger.debug(
-        "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
-        format(node_ips, current_node_ip, node_rank, server_endpoints_port))
-
-    cluster = Cluster(hdfs=None)
-    server_rank = 0
-    worker_rank = 0
-    for node_rank, ip in enumerate(node_ips):
-        pod = Pod()
-        pod.rank = node_rank
-        pod.addr = ip
-        for i in range(len(server_endpoints_ips)):
-            if ip == server_endpoints_ips[i]:
-                server = Trainer()
-                server.endpoint = "%s:%s" % (ip, server_endpoints_port[i])
-                server.rank = server_rank
-                server_rank += 1
-                pod.servers.append(server)
-        for j in range(len(worker_endpoints_ips)):
-            if ip == worker_endpoints_ips[j]:
-                worker = Trainer()
-                worker.endpoint = "%s:%s" % (ip, worker_endpoints_port[i])
-                worker.rank = worker_rank
-                worker_rank += 1
-                pod.workers.append(worker)
-
-        cluster.pods.append(pod)
-
-    pod_rank = node_ips.index(current_node_ip)
-    pod = cluster.pods[pod_rank]
-
-    default_env = os.environ.copy()
-    current_env = copy.copy(default_env)
-
-    gloo_rendezvous_dir = tempfile.mkdtemp()
-    # add gloo env
-    current_env["PADDLE_WITH_GLOO"] = "1"
-    current_env["PADDLE_GLOO_RENDEZVOUS"] = "3"
-    current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
-
-    current_env.pop("http_proxy", None)
-    current_env.pop("https_proxy", None)
-    procs = []
-    cmds = []
-    log_fns = []
-    for idx, cur_server in enumerate(pod.servers):
-        proc_env = {
-            "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
-            "PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
-            "PADDLE_PORT": cur_server.endpoint.split(":")[1],
-            "TRAINING_ROLE": "PSERVER",
-            "PADDLE_TRAINERS_NUM": str(worker_num),
-            "POD_IP": cur_server.endpoint.split(":")[0]
-        }
-        current_env.update(proc_env)
-
-        cmd = [sys.executable, "-u", args.training_script
-               ] + args.training_script_args
-        cmds.append(cmd)
-
-        if idx == 0:
-            logger.info(
-                "Local server start {} processes. First process distributed "
-                "environment info (Only For Debug): {}".format(
-                    len(pod.servers),
-                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
-
-        if args.log_dir is not None:
-            os.system("mkdir -p {}".format(args.log_dir))
-            fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
-            log_fns.append(fn)
-            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
-        else:
-            proc = subprocess.Popen(cmd, env=current_env)
-
-        tp = TrainerProc()
-        tp.proc = proc
-        tp.rank = cur_server.rank
-        tp.local_rank = idx
-        tp.log_fn = fn
-        tp.log_offset = fn.tell() if fn else None
-        tp.cmd = cmd
-
-        procs.append(tp)
-
-    for idx, cur_worker in enumerate(pod.workers):
-        proc_env = {
-            "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
-            "PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
-            "PADDLE_TRAINERS_NUM": str(worker_num),
-            "TRAINING_ROLE": "TRAINER",
-            "PADDLE_TRAINER_ID": str(cur_worker.rank)
-        }
-        current_env.update(proc_env)
-
-        cmd = [sys.executable, "-u", args.training_script
-               ] + args.training_script_args
-        cmds.append(cmd)
-
-        if idx == 0:
-            logger.info(
-                "Local worker start {} processes. First process distributed "
-                "environment info (Only For Debug): {}".format(
-                    len(pod.workers),
-                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
-
-        if args.log_dir is not None:
-            os.system("mkdir -p {}".format(args.log_dir))
-            fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
-            log_fns.append(fn)
-            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
-        else:
-            proc = subprocess.Popen(cmd, env=current_env)
-
-        tp = TrainerProc()
-        tp.proc = proc
-        tp.rank = cur_worker.rank
-        tp.local_rank = idx
-        tp.log_fn = fn
-        tp.log_offset = fn.tell() if fn else None
-        tp.cmd = cmd
-
-        procs.append(tp)
-
-    logger.info(
-        "Please check servers and workers logs in {}/workerlog.* and {}/serverlog.*".
-        format(args.log_dir, args.log_dir))
-
-    # only wait worker to finish here
-    for i, proc in enumerate(procs):
-        if i < len(pod.servers):
-            continue
-        procs[i].proc.wait()
-        if len(log_fns) > 0:
-            log_fns[i].close()
-
-    print("all workers exit, going to finish parameter server", file=sys.stderr)
-    for i in range(len(pod.servers)):
-        if len(log_fns) > 0:
-            log_fns[i].close()
-        procs[i].proc.terminate()
-    print("all parameter server are killed", file=sys.stderr)
-
-    if os.path.exists(gloo_rendezvous_dir):
-        shutil.rmtree(gloo_rendezvous_dir)
-
-
-def launch():
-    args = _parse_args()
-    logger = get_logger()
-    _print_arguments(args)
-
-    ps_args = ['--worker_num', '--server_num', '--servers', '--workers']
-    collective_args = ['--ips', '--gpus']
+def launch_ps(args, distribute_mode):
+    cloud_flag = cloud_utils.use_paddlecloud()
+
+    # for ps-cpu on paddlecloud
+    if cloud_flag and distribute_mode == DistributeMode.PS:
+        direct_start(args)
+        return
+    elif cloud_flag and distribute_mode == DistributeMode.PS_HETER:
+        cloud_ps_heter_env_set(args)
+        args.workers = os.getenv("PADDLE_TRAINER_ENDPOINTS")
+        args.servers = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
+        args.heter_workers = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST")
+
+    ps_launcher = ParameterServerLauncher(args, distribute_mode)
+    ps_launcher.start_ps()
+    return
+
+
+def which_distributed_mode(args):
+    ps_args = [
+        '--worker_num',
+        '--server_num',
+        '--heter_worker_num',
+        '--servers',
+        '--workers',
+        '--heter_workers',
+    ]
+    collective_args = ['--ips']
+
+    ps_heter_args = ["--heter_worker_num", "--heter_workers"]
+
    has_ps_args = [
        ps_arg for ps_arg in ps_args if ps_arg in " ".join(sys.argv[1:-1])
    ]
@@ -455,23 +267,46 @@
        co_arg for co_arg in collective_args
        if co_arg in " ".join(sys.argv[1:-1])
    ]
+
+    if len(has_ps_args) > 1 and len(has_collective_args) > 1:
+        raise ValueError(
+            "Only one mode(Collective or Parameter-Server) can be selected at the same time, but more than one configuration was received."
+        )
+
    if fluid.core.is_compiled_with_cuda():
        cuda_device_num = fluid.core.get_cuda_device_count()
    else:
        cuda_device_num = 0

-    if len(has_ps_args) > 0 or cuda_device_num == 0:
-        logger.info("Run parameter-sever cpu mode. pserver arguments:{}".format(
-            has_ps_args))
-        launch_ps(args)
+    if len(has_ps_args) > 0:
+        logger.info(
+            "Run parameter-sever mode. pserver arguments:{}, cuda count:{}".
+            format(has_ps_args, cuda_device_num))
+        has_ps_heter_args = list(set(has_ps_args) & set(ps_heter_args))
+        if len(has_ps_heter_args) > 0:
+            return DistributeMode.PS_HETER
+        else:
+            return DistributeMode.PS
    elif len(has_collective_args) > 0:
        logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}".
                    format(has_collective_args, cuda_device_num))
-        launch_collective(args)
+        return DistributeMode.COLLECTIVE
    else:
        logger.warning(
            "Not found distinct arguments. Default use gpu collective mode")
-        launch_collective(args)
+        return DistributeMode.COLLECTIVE
+
+
+def launch():
+    args = _parse_args()
+    logger = get_logger()
+    _print_arguments(args)
+
+    distribute_mode = which_distributed_mode(args)
+
+    if distribute_mode == DistributeMode.COLLECTIVE:
+        launch_collective(args)
+    else:
+        launch_ps(args, distribute_mode)


if __name__ == "__main__":
......
@@ -21,13 +21,27 @@ import signal
import copy
import sys
import subprocess
+import tempfile
+import shutil
from contextlib import closing
import socket
+import warnings
+
+import paddle
+import paddle.fluid as fluid

logger = logging.getLogger("root")
logger.propagate = False


+class DistributeMode:
+    """
+    There are various mode for fleetrun, each of them is designed for different model.
+    """
+    COLLECTIVE = 0
+    PS = 1
+    PS_HETER = 2
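For orientation, the flag-to-mode mapping applied by which_distributed_mode in launch.py (see above) can be summarized as follows; illustration only, the training script name is a placeholder:

# Illustrative mapping, assuming the flags defined in _parse_args:
#   fleetrun --server_num=2 --worker_num=2 train.py                        -> DistributeMode.PS
#   fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 train.py   -> DistributeMode.PS_HETER
#   fleetrun --gpus=0,1 train.py                                           -> DistributeMode.COLLECTIVE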

class Cluster(object):
    def __init__(self, hdfs):
        self.job_server = None

@@ -144,14 +158,16 @@ class Pod(object):
        self.trainers = []
        self.servers = []
        self.workers = []
+        self.heter_workers = []
        self.gpus = []

    def __str__(self):
        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \
-workers:{}".format(self.rank, self.id, self.addr, self.port,
-                   self.gpus, [str(t) for t in self.trainers],
-                   [str(s) for s in self.servers],
-                   [str(w) for w in self.workers])
+workers:{} heter_workers:{}".format(
+            self.rank, self.id, self.addr, self.port, self.gpus, [
+                str(t) for t in self.trainers
+            ], [str(s) for s in self.servers], [str(w) for w in self.workers],
+            [str(h) for h in self.heter_workers])

    def __eq__(self, pod):
        if self.rank != pod.rank or \

@@ -262,7 +278,7 @@ def terminate_local_procs(procs):
            p.log_fn.close()
        logger.debug("terminate process id:{}".format(p.proc.pid))

-    #wait all process terminiated
+    # wait all process terminiated
    time.sleep(3)
    for step in range(0, 50):
        alive = False

@@ -406,10 +422,10 @@ def start_local_trainers(cluster,
        else:
            current_env = copy.copy(envs)

-        #paddle broadcast ncclUniqueId use socket, and
-        #proxy maybe make trainers unreachable, so delete them.
-        #if we set them to "", grpc will log error message "bad uri"
-        #so just delete them.
+        # paddle broadcast ncclUniqueId use socket, and
+        # proxy maybe make trainers unreachable, so delete them.
+        # if we set them to "", grpc will log error message "bad uri"
+        # so just delete them.
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)

@@ -518,3 +534,524 @@ def watch_local_trainers(procs, nranks):
            raise

    return alive
def get_gpus(gpus):
if gpus is None:
gpus_num = fluid.core.get_cuda_device_count()
res_gpus = [str(x) for x in range(0, gpus_num)]
else:
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_visible_devices is None or cuda_visible_devices == "":
res_gpus = [x.strip() for x in gpus.split(',')]
else:
# change gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
# therefore gpus=0,1,2,3
cuda_visible_devices_list = cuda_visible_devices.split(',')
for x in gpus.split(','):
assert x in cuda_visible_devices_list, "Can't find "\
"your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
% (x, cuda_visible_devices)
res_gpus = [
cuda_visible_devices_list.index(x.strip())
for x in gpus.split(',')
]
logger.info("Change selected_gpus into reletive values. --ips:{} "
"will change into relative_ips:{} according to your "
"CUDA_VISIBLE_DEVICES:{}".format(
gpus, res_gpus, cuda_visible_devices_list))
return res_gpus
def direct_start(args):
# run ps-cpu mode on paddlecloud, using given envs
cmd = [sys.executable, "-u", args.training_script] + \
args.training_script_args
proc = subprocess.Popen(cmd)
proc.wait()
return
def get_custom_endpoints(origin_endpoints, offset=0):
"""
origin_endpoint: ip:port
user_define_endpoint: ip:(port+offset)
"""
assert origin_endpoints != None
paddle_user_define_endpoints_list = []
for ip_port in origin_endpoints.split(","):
ip = ip_port.split(":")[0]
port = ip_port.split(":")[1]
new_port = int(port) + offset
paddle_user_define_endpoints_list.append(":".join((ip, str(new_port))))
paddle_user_define_endpoints = ",".join(paddle_user_define_endpoints_list)
return paddle_user_define_endpoints
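A quick usage note for the helper above: it keeps each IP and shifts each port by offset, for example (illustrative addresses):

# "10.0.0.1:6000,10.0.0.2:6000" with offset=1 -> "10.0.0.1:6001,10.0.0.2:6001"
assert get_custom_endpoints("10.0.0.1:6000,10.0.0.2:6000", offset=1) == \
    "10.0.0.1:6001,10.0.0.2:6001"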
def cloud_ps_heter_env_set(args):
environs = {}
paddle_trainer_endpoints = os.getenv("TRAINER_IP_PORT_LIST", "")
assert paddle_trainer_endpoints != None
paddle_pserver_endpoints = os.getenv("PSERVER_IP_PORT_LIST", "")
assert paddle_pserver_endpoints != None
# hard code for paddlecloud custom-framework
avilable_ports = os.getenv("TRAINER_PORTS", "").split(",")
assert len(
avilable_ports
) > 3, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
# hard code for paddlecloud custom-framework
trainers_num = len(paddle_pserver_endpoints.split(","))
assert trainers_num != 0
environs["PADDLE_TRAINERS_NUM"] = trainers_num
environs["TRAINERS_NUM"] = trainers_num
# hard code for paddlecloud custom-framework
environs["PADDLE_HETER_TRAINER_IP_PORT_LIST"] = paddle_trainer_endpoints
environs["PADDLE_PSERVERS_IP_PORT_LIST"] = paddle_pserver_endpoints
environs["PADDLE_TRAINER_ENDPOINTS"] = get_custom_endpoints(
paddle_pserver_endpoints, 1)
heter_worker_num = len(paddle_trainer_endpoints.split(","))
if (args.heter_worker_num != None) and (
heter_worker_num != args.heter_worker_num):
warnings.warn(
"Your fleetrun setting: heter_worker_num is {}, but we find {} device can be used, this setting has been changed.".
format(args.heter_worker_num, heter_worker_num))
args.heter_worker_num = heter_worker_num
for k, v in environs.items():
os.environ[k] = str(v)
logger.info("Set heter parameter server env: {}".format(
pretty_print_envs(environs)))
class ParameterServerLauncher(object):
def __init__(self, args, distribute_mode):
self.args = args
self.distribute_mode = distribute_mode
self.server_num = 0
self.worker_num = 0
self.heter_worker_num = 0
self.server_endpoints = ""
self.server_endpoints_ips = []
self.server_endpoints_port = []
self.worker_endpoints = ""
self.worker_endpoints_ips = []
self.worker_endpoints_port = []
self.heter_worker_endpoints = ""
self.heter_worker_endpoints_ips = []
self.heter_worker_endpoints_port = []
self.is_local = True
self.current_node_ip = ""
self.get_role_endpoints(args)
def get_role_endpoints(self, args):
# get server envs
if args.server_num:
self.server_num = args.server_num
if args.servers:
assert len(
args.servers.split(",")
) == self.server_num, "The server_num and servers doesn't match. Expect servers endpoints num epual to server_num, but received servers enpoint num: {} and server_num {}".format(
len(args.servers.split(",")), self.server_num)
self.server_endpoints = args.servers
else:
ports = get_ports(self.server_num, 0)
self.server_endpoints = ",".join(
["127.0.0.1:" + str(x) for x in ports])
else:
assert args.servers != "", "The setting of Parameter-Server must has server_num or servers."
self.server_endpoints = args.servers
self.server_num = len(self.server_endpoints.split(","))
# get worker envs
if args.worker_num:
self.worker_num = args.worker_num
if args.workers:
assert len(
args.workers.split(",")
) == self.worker_num, "The worker_num and workers doesn't match. Expect workers endpoints num epual to worker_num, but received workers enpoint num: {} and worker_num {}".format(
len(args.workers.split(",")), self.worker_num)
self.worker_endpoints = args.workers
else:
ports = get_ports(self.worker_num, self.server_num)
self.worker_endpoints = ",".join(
["127.0.0.1:" + str(x) for x in ports])
else:
assert args.workers != "", "The setting of Parameter-Server must has worker_num or workers."
worker_endpoints_ips = [
x.strip().split(":")[0] for x in args.workers.split(",")
]
self.worker_num = len(worker_endpoints_ips)
worker_endpoints_len = [
len(x.strip().split(":")) for x in args.workers.split(",")
]
if 1 in worker_endpoints_len:
# if no port value in worker_endpoints, will set default port values.
start_port = 6170
worker_endpoints_port = range(
start_port + self.server_num,
start_port + self.server_num + self.worker_num, 1)
# create endpoints str
worker_endpoints = []
for i in range(self.worker_num):
worker_endpoints.append(":".join((worker_endpoints_ips[
i], str(worker_endpoints_port[i]))))
self.worker_endpoints = ",".join(worker_endpoints)
else:
self.worker_endpoints = args.workers
# get heter worker envs
if self.distribute_mode == DistributeMode.PS_HETER:
if args.heter_worker_num:
self.heter_worker_num = args.heter_worker_num
if args.heter_workers:
assert len(
args.heter_workers.split(",")
) == self.heter_worker_num, "The heter_worker_num and heter_workers doesn't match. Expect heter_workers endpoints num epual to heter_worker_num, but received heter_workers enpoint num: {} and heter_worker_num {}".format(
len(args.heter_workers.split(",")),
self.heter_worker_num)
self.heter_worker_endpoints = args.heter_workers
else:
ports = get_ports(self.heter_worker_num,
self.server_num + self.worker_num)
self.heter_worker_endpoints = ",".join(
["127.0.0.1:" + str(x) for x in ports])
else:
assert args.heter_workers != "", "The setting of Parameter-Server heter mode must has heter_worker_num or heter_workers."
self.heter_worker_endpoints = args.heter_workers
self.heter_worker_num = len(
self.heter_worker_endpoints.split(","))
# check local or user define
self.server_endpoints_ips = [
x.strip().split(":")[0] for x in self.server_endpoints.split(",")
]
self.worker_endpoints_ips = [
x.strip().split(":")[0] for x in self.worker_endpoints.split(",")
]
self.server_endpoints_port = [
x.strip().split(":")[1] for x in self.server_endpoints.split(",")
]
self.worker_endpoints_port = [
x.strip().split(":")[1] for x in self.worker_endpoints.split(",")
]
self.node_ips = list(
set(self.server_endpoints_ips + self.worker_endpoints_ips))
if self.distribute_mode == DistributeMode.PS_HETER:
self.heter_worker_endpoints_ips = [
x.strip().split(":")[0]
for x in self.heter_worker_endpoints.split(",")
]
self.heter_worker_endpoints_port = [
x.strip().split(":")[1]
for x in self.heter_worker_endpoints.split(",")
]
self.node_ips = list(
set(self.node_ips + self.heter_worker_endpoints_ips))
if len(set(self.node_ips)) == 1:
self.is_local = True
self.current_node_ip = self.node_ips[0]
else:
self.is_local = False
pod_ip = os.getenv("POD_IP", None)
if pod_ip == None:
_, self.current_node_ip = get_host_name_ip()
else:
self.current_node_ip = pod_ip
assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (self.current_node_ip, self.node_ips)
self.node_rank = self.node_ips.index(self.current_node_ip)
logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}".
format(self.node_ips, self.current_node_ip, self.node_rank))
def start_ps(self):
cluster = Cluster(hdfs=None)
server_rank = 0
worker_rank = 0
heter_worker_rank = 0
for node_rank, ip in enumerate(self.node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
for i in range(len(self.server_endpoints_ips)):
if ip == self.server_endpoints_ips[i]:
server = Trainer()
server.endpoint = "%s:%s" % (ip,
self.server_endpoints_port[i])
server.rank = server_rank
server_rank += 1
pod.servers.append(server)
for j in range(len(self.worker_endpoints_ips)):
if ip == self.worker_endpoints_ips[j]:
worker = Trainer()
worker.endpoint = "%s:%s" % (ip,
self.worker_endpoints_port[j])
worker.rank = worker_rank
worker_rank += 1
pod.workers.append(worker)
for k in range(len(self.heter_worker_endpoints_ips)):
if ip == self.heter_worker_endpoints_ips[k]:
heter_worker = Trainer()
heter_worker.endpoint = "%s:%s" % (
ip, self.heter_worker_endpoints_port[k])
heter_worker.rank = heter_worker_rank
heter_worker_rank += 1
pod.heter_workers.append(heter_worker)
cluster.pods.append(pod)
pod = cluster.pods[self.node_rank]
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# 3. subproces start
self.procs = {"worker": [], "server": [], "heter_worker": []}
self.cmds = {"worker": [], "server": [], "heter_worker": []}
self.log_fns = {"worker": [], "server": [], "heter_worker": []}
self.start_pod_server(self.args, pod)
self.start_pod_worker(self.args, pod)
self.start_pod_heter_worker(self.args, pod)
logger.info(
"Please check servers, workers and heter_worker logs in {}/workerlog.*, {}/serverlog.* and {}/heterlog.*".
format(self.args.log_dir, self.args.log_dir, self.args.log_dir))
# 4. wait for finish training
if len(self.procs["worker"]) > 0:
# if node has worker procs
# only wait worker to finish here
for i, proc in enumerate(self.procs["worker"]):
self.procs["worker"][i].proc.wait()
if len(self.log_fns["worker"]) > 0:
self.log_fns["worker"][i].close()
logger.info(
"all workers exit, going to finish parameter server and heter_worker."
)
if len(self.procs["heter_worker"]) > 0:
for i, proc in enumerate(self.procs["heter_worker"]):
self.log_fns["heter_worker"][i].close()
self.procs["heter_worker"][i].proc.terminate()
logger.info("all heter_worker are killed")
if len(self.procs["server"]) > 0:
for i, proc in enumerate(self.procs["server"]):
self.log_fns["server"][i].close()
self.procs["server"][i].proc.terminate()
logger.info("all parameter server are killed")
else:
# if node has not worker procs
# blocking training process
if len(self.procs["server"]) > 0:
for i, proc in enumerate(self.procs["server"]):
self.procs["server"][i].proc.wait()
if len(self.procs["heter_worker"]) > 0:
for i, proc in enumerate(self.procs["heter_worker"]):
self.procs["heter_worker"][i].proc.wait()
if os.path.exists(self.gloo_rendezvous_dir):
shutil.rmtree(self.gloo_rendezvous_dir)
def start_pod_server(self, args, pod):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
for idx, cur_server in enumerate(pod.servers):
proc_env = {
"PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints,
"PADDLE_HETER_TRAINER_IP_PORT_LIST":
self.heter_worker_endpoints,
"PADDLE_PORT": cur_server.endpoint.split(":")[1],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(self.worker_num),
"POD_IP": cur_server.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1",
"PADDLE_GLOO_RENDEZVOUS": "2",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir
}
current_env.update(proc_env)
cmd = [sys.executable, "-u", args.training_script
] + args.training_script_args
self.cmds["server"].append(cmd)
if idx == 0:
logger.info(
"Local server start {} processes. First process distributed "
"environment info (Only For Debug): {}".format(
len(pod.servers),
pretty_print_envs(proc_env, ("Distributed Envs", "Value"
))))
if args.log_dir is not None:
os.system("mkdir -p {}".format(args.log_dir))
fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
self.log_fns["server"].append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
tp = TrainerProc()
tp.proc = proc
tp.rank = cur_server.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = fn.tell() if fn else None
tp.cmd = cmd
self.procs["server"].append(tp)
def start_pod_worker(self, args, pod):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
heter_device_num = 0
device_list = []
if fluid.core.is_compiled_with_cuda():
device_list = get_gpus(args.gpus)
heter_device_num = len(device_list)
elif fluid.core.is_compiled_with_xpu():
heter_device_num = fluid.core.get_xpu_device_count()
device_list = [str(x) for x in range(0, heter_device_num)]
for idx, cur_worker in enumerate(pod.workers):
device_id = str(device_list[idx % heter_device_num])
proc_env = {
"PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints,
"PADDLE_TRAINERS_NUM": str(self.worker_num),
"PADDLE_HETER_TRAINER_IP_PORT_LIST":
self.heter_worker_endpoints,
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(cur_worker.rank),
"PADDLE_WITH_GLOO": "1",
"PADDLE_GLOO_RENDEZVOUS": "2",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
"FLAGS_selected_gpus": "0",
"FLAGS_selected_xpus": "0",
"CUDA_VISIBLE_DEVICES": device_id,
"XPU_VISIBLE_DEVICES": device_id,
}
current_env.update(proc_env)
cmd = [sys.executable, "-u", args.training_script
] + args.training_script_args
self.cmds["worker"].append(cmd)
if idx == 0:
logger.info(
"Local worker start {} processes. First process distributed "
"environment info (Only For Debug): {}".format(
len(pod.workers),
pretty_print_envs(proc_env, ("Distributed Envs", "Value"
))))
if args.log_dir is not None:
os.system("mkdir -p {}".format(args.log_dir))
fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
self.log_fns["worker"].append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
tp = TrainerProc()
tp.proc = proc
tp.rank = cur_worker.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = fn.tell() if fn else None
tp.cmd = cmd
self.procs["worker"].append(tp)
def start_pod_heter_worker(self, args, pod):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
heter_device_num = 0
device_list = []
if fluid.core.is_compiled_with_cuda():
device_list = get_gpus(args.gpus)
heter_device_num = len(device_list)
elif fluid.core.is_compiled_with_xpu():
heter_device_num = fluid.core.get_xpu_device_count()
device_list = [str(x) for x in range(0, heter_device_num)]
assert heter_device_num != 0
for idx, cur_heter_worker in enumerate(pod.heter_workers):
device_id = str(device_list[idx % heter_device_num])
proc_env = {
"PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints,
"PADDLE_HETER_TRAINER_IP_PORT_LIST":
self.heter_worker_endpoints,
"PADDLE_PORT": cur_heter_worker.endpoint.split(":")[1],
"TRAINING_ROLE": "HETER_TRAINER",
"PADDLE_TRAINERS_NUM": str(self.worker_num),
"POD_IP": cur_heter_worker.endpoint.split(":")[0],
"PADDLE_WITH_GLOO": "1",
"PADDLE_GLOO_RENDEZVOUS": "2",
"PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
"FLAGS_selected_gpus": "0",
"FLAGS_selected_xpus": "0",
"CUDA_VISIBLE_DEVICES": device_id,
"XPU_VISIBLE_DEVICES": device_id,
}
current_env.update(proc_env)
cmd = [sys.executable, "-u", args.training_script
] + args.training_script_args
self.cmds["heter_worker"].append(cmd)
if idx == 0:
logger.info(
"Local heter_worker start {} processes. First process distributed "
"environment info (Only For Debug): {}".format(
len(pod.heter_workers),
pretty_print_envs(proc_env, ("Distributed Envs", "Value"
))))
if args.log_dir is not None:
os.system("mkdir -p {}".format(args.log_dir))
fn = open("%s/heterlog.%d" % (args.log_dir, idx), "w")
self.log_fns["heter_worker"].append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn)
else:
proc = subprocess.Popen(cmd, env=current_env)
tp = TrainerProc()
tp.proc = proc
tp.rank = cur_heter_worker.rank
tp.local_rank = idx
tp.log_fn = fn
tp.log_offset = fn.tell() if fn else None
tp.cmd = cmd
self.procs["heter_worker"].append(tp)
@@ -74,6 +74,8 @@ class ParameterServerOptimizer(MetaOptimizerBase):
            _startup = worker.delet_extra_optimizes_pass(_startup,
                                                         compiled_config)

+            compiled_config.set_origin_ps_main_program(_main)
+            compiled_config.set_origin_ps_startup_program(_startup)
+
            # for heter program
            if self.role_maker._is_heter_parameter_server_mode:
                from paddle.fluid.incubate.fleet.parameter_server.ir import heter_trainer_pass as heter_worker

@@ -91,6 +93,8 @@ class ParameterServerOptimizer(MetaOptimizerBase):
            else:
                _main = worker.append_send_ops_pass(_main, compiled_config)
                _startup = _startup

+            compiled_config.set_origin_ps_main_program(_main)
+            compiled_config.set_origin_ps_startup_program(_startup)
+
        return _main, _startup
......
@@ -210,18 +210,23 @@ class ParameterServerRuntime(RuntimeBase):
            warnings.warn("communicator has been initialized, skip")

    def _get_executor(self):
-        if self.role_maker._is_heter_worker():
-            if self.role_maker._get_heter_worker_device() == "GPU":
-                gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
-                executor = Executor(fluid.CUDAPlace(gpu_id))
-            elif self.role_maker._get_heter_worker_device() == "XPU":
-                xpu_id = int(os.getenv("FLAGS_selected_xpus", "0"))
-                executor = Executor(fluid.XPUPlace(xpu_id))
-            else:
-                raise ValueError("Not Support Device {}".format(
-                    self.role_maker._get_heter_worker_device()))
-        else:
-            executor = fluid.Executor(fluid.CPUPlace())
+        executor = fluid.Executor(fluid.CPUPlace())
+        if self.role_maker._is_heter_parameter_server_mode:
+            heter_worker_device_guard = self.context[
+                "valid_strategy"].a_sync_configs[
+                    "heter_worker_device_guard"].upper()
+            if heter_worker_device_guard not in ["GPU", "XPU", "CPU"]:
+                raise ValueError("Heter Worker Not Support Device {}".format(
+                    heter_worker_device_guard))
+            if self.role_maker._is_heter_worker():
+                if heter_worker_device_guard == "GPU":
+                    executor = Executor(
+                        fluid.CUDAPlace(
+                            int(os.getenv("FLAGS_selected_gpus", "0"))))
+                elif heter_worker_device_guard == "XPU":
+                    executor = Executor(
+                        fluid.XPUPlace(
+                            int(os.getenv("FLAGS_selected_xpus", "0"))))
        return executor

    def _init_server(self, *args, **kwargs):

@@ -233,12 +238,14 @@ class ParameterServerRuntime(RuntimeBase):
        model_dirname = None

        executor = self._get_executor()
+        if self.role_maker._is_heter_worker() and self.context[
+                "valid_strategy"].a_sync_configs["launch_barrier"]:
+            # for heter trainer wait server ready
+            wait_server_ready(self.role_maker._get_pserver_endpoints())
+
        executor.run(fluid.default_startup_program())

        if self.role_maker._is_heter_worker():
            self._init_worker()
+
+        if self.role_maker._is_heter_worker():
            return

        if not model_dirname:

@@ -470,13 +477,13 @@ class ParameterServerRuntime(RuntimeBase):
    def _save_distributed_persistables(self, executor, dirname, main_program):
        dense_ctx = self.compiled_strategy.get_communicator_recv_context(
-            recv_type=1)
+            recv_type=1, use_origin_program=True)

        sparse_ctx = self.compiled_strategy.get_communicator_recv_context(
-            recv_type=2)
+            recv_type=2, use_origin_program=True)

        distributed_ctx = self.compiled_strategy.get_communicator_recv_context(
-            recv_type=3)
+            recv_type=3, use_origin_program=True)

        recv_dense_varnames = self._save_dense_params(executor, dirname,
                                                      dense_ctx, main_program)

@@ -528,7 +535,7 @@ class ParameterServerRuntime(RuntimeBase):
        )

        if main_program is None:
-            main_program = fluid.default_main_program()
+            main_program = self.compiled_strategy.get_origin_ps_main_program()

        if isinstance(main_program, CompiledProgram):
            raise TypeError(
......
@@ -133,6 +133,8 @@ class CompileTimeStrategy(object):
        self.origin_main_program = main_program
        self.origin_startup_program = startup_program
+        self.origin_ps_main_program = main_program
+        self.origin_ps_startup_program = startup_program

        self.strategy = strategy
        self.role_maker = role_maker

@@ -153,6 +155,11 @@ class CompileTimeStrategy(object):
        self._build_var_distributed()

+        # for heter-ps save variables
+        self.origin_merged_variables_pairs = list(self.merged_variables_pairs)
+        self.origin_merged_dense_pairs = list(self.merged_dense_pairs)
+        self.origin_merged_sparse_pairs = list(self.merged_sparse_pairs)
+
    def get_distributed_mode(self):
        trainer = self.strategy.get_trainer_runtime_config()
        return trainer.mode

@@ -214,6 +221,18 @@ class CompileTimeStrategy(object):
    def get_origin_startup_program(self):
        return self.origin_startup_program

+    def set_origin_ps_main_program(self, program):
+        self.origin_ps_main_program = program
+
+    def set_origin_ps_startup_program(self, program):
+        self.origin_ps_startup_program = program
+
+    def get_origin_ps_main_program(self):
+        return self.origin_ps_main_program
+
+    def get_origin_ps_startup_program(self):
+        return self.origin_ps_startup_program
+
    def get_sparse_varname_on_ps(self, is_distributed, endpoint=None):
        if not endpoint:
            endpoint = self.get_ps_endpoint()

@@ -378,7 +397,9 @@ class CompileTimeStrategy(object):
            send_ctx[name] = ctx
        return send_ctx

-    def get_communicator_recv_context(self, recv_type=1):
+    def get_communicator_recv_context(self,
+                                      recv_type=1,
+                                      use_origin_program=False):
        # recv_type
        # 1 : DENSE 2. SPARSE 3. DISTRIBUTED 4. ALL
        distibuted_varnames = get_sparse_tablenames(self.origin_main_program,

@@ -392,7 +413,8 @@ class CompileTimeStrategy(object):
        sparse_recv_ctx = {}
        distributed_recv_ctx = {}

-        for merged in self.merged_variables_pairs:
+        variables_pairs = self.merged_variables_pairs if not use_origin_program else self.origin_merged_variables_pairs
+        for merged in variables_pairs:
            params = merged[0]
            if params.merged_var.name in sparse_varnames:
                continue
......
@@ -169,6 +169,10 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase):
            except fluid.core.EOFException:
                self.reader.reset()

+        if fleet.is_first_worker():
+            model_path = tempfile.mkdtemp()
+            fleet.save_persistables(executor=exe, dirname=model_path)
+            shutil.rmtree(model_path)
        fleet.stop_worker()

    def do_dataset_training(self, fleet):
......
@@ -20,8 +20,12 @@ from paddle.fluid.incubate.fleet.base import role_maker
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
+input_y = fluid.layers.cast(input_y, dtype="float32")
+with fluid.device_guard("gpu"):
+    input_y = fluid.layers.cast(input_y, dtype="int64")
+    cost = mlp(input_x, input_y)

-cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.Adagrad(learning_rate=0.01)

role = role_maker.PaddleCloudRoleMaker()
......
@@ -288,7 +288,7 @@ class TestFleetHeterBase(unittest.TestCase):
        print("tr end communicate")

        tr0_ret = tr0.returncode
-        tr1_ret = tr0.returncode
+        tr1_ret = tr1.returncode

        # close trainer file
        tr0_pipe.close()
......
@@ -50,6 +50,10 @@ class TestDistFleetHeterProgram(unittest.TestCase):
    def build_strategy(self):
        self.strategy = paddle.distributed.fleet.DistributedStrategy()
        self.strategy.a_sync = True
+        self.strategy.a_sync_configs = {
+            "launch_barrier": False,
+            "heter_worker_device_guard": "gpu"
+        }
        return self.strategy

    def build_input(self):
......
@@ -28,13 +28,27 @@ function test_launch_ps(){
    fi
}

+function test_launch_ps_heter(){
+    fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 fleet_ps_training.py 2> ut.elog
+    if grep -q "server are killed" ut.elog; then
+        echo "test heter pserver launch succeed"
+    else
+        echo "test pserver launch failed"
+        exit -1
+    fi
+}
+
if [[ ${WITH_GPU} == "OFF" ]]; then
+    echo "in cpu test mode"
    test_launch_ps
    exit 0
fi

+echo "No.1 unittest"
test_launch_ps
+test_launch_ps_heter

# use default values
+echo "No.2 unittest"
fleetrun multi_process.py fleetrun

# use paddlecloud

@@ -48,6 +62,7 @@ export PADDLE_TRAINER_ID=0
export PADDLE_PORT=35789
export TRAINER_PORTS_NUM=2

+echo "No.3 unittest"
distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog"
CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun

@@ -83,7 +98,7 @@ fi
unset PADDLE_PORT
export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171

-echo ""
+echo "No.4 unittest"
echo "paddle.distributed.launch async poll process test"
if ! CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun abort; then
    echo "train abort as planned"

@@ -112,5 +127,6 @@ rm -rf $file_0_0 $file_0_1
distributed_args="--gpus=0,1 --log_dir=testlog"
export PADDLE_LAUNCH_LOG="test_launch_filelock_0"
+echo "No.5 unittest"
CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} find_ports.py
str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test cloud role maker."""
from __future__ import print_function
import os
import platform
import shutil
import tempfile
import unittest
import paddle
import paddle.distributed.fleet.base.role_maker as role_maker
class TestPSCloudRoleMakerCase1(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
def test_paddle_trainers_num(self):
# PADDLE_TRAINERS_NUM
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase2(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
def test_training_role(self):
# TRAINING_ROLE
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase3(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
os.environ["TRAINING_ROLE"] = 'TRAINER'
def test_trainer_id(self):
# PADDLE_TRAINER_ID
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase4(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
os.environ["TRAINING_ROLE"] = 'PSERVER'
def test_ps_port(self):
# PADDLE_PORT
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase5(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
os.environ["TRAINING_ROLE"] = 'PSERVER'
os.environ["PADDLE_PORT"] = str(4001)
def test_ps_ip(self):
# POD_IP
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase6(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ[
"PADDLE_HETER_TRAINER_IP_PORT_LIST"] = "127.0.0.1:4003,127.0.0.1:4004"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
os.environ["TRAINING_ROLE"] = 'HETER_TRAINER'
def test_heter_port(self):
# PADDLE_PORT
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
class TestPSCloudRoleMakerCase7(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMake Parameter Server.
"""
def setUp(self):
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002"
os.environ[
"PADDLE_HETER_TRAINER_IP_PORT_LIST"] = "127.0.0.1:4003,127.0.0.1:4004"
os.environ["PADDLE_TRAINERS_NUM"] = str(2)
os.environ["TRAINING_ROLE"] = 'HETER_TRAINER'
os.environ["PADDLE_PORT"] = str(4003)
def test_heter_ip(self):
# POD_IP
ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
self.assertRaises(ValueError, ro._generate_role)
if __name__ == "__main__":
unittest.main()