diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 21e28d7ac86d06571e49a522db13c17c0ebf33be..881ef30ffe6903716ebe53256e7623b41f276eb7 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -98,6 +98,7 @@ message AsyncConfig { optional int32 send_wait_times = 7 [ default = 1 ]; optional bool runtime_split_send_recv = 8 [ default = false ]; optional bool launch_barrier = 9 [ default = true ]; + optional string heter_worker_device_guard = 10 [ default = 'cpu' ]; } message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; } diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index deba3b4a17d1bad1d558eb1d75e064d851f804f3..ce9826d7e59ae2441eb838adb0cccbf7512599db 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -530,13 +530,6 @@ class RoleMakerBase(object): return self._heter_trainer_endpoints[(self._current_id) % self._heter_worker_num()] - def _get_heter_worker_device(self): - """ - Returns: - string: heter_trainer's device of current node, e.g: CPU/GPU/XPU - """ - return self._heter_trainer_device.upper() - class PaddleCloudRoleMaker(RoleMakerBase): def __init__(self, is_collective=False, **kwargs): @@ -677,88 +670,99 @@ class PaddleCloudRoleMaker(RoleMakerBase): return self._role == Role.HETER_WORKER def _ps_env(self): - try: - # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set - # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002 - self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST") - - if self._server_endpoints is None: - # back to non_distributed execution. - self._server_endpoints = "" - self._trainers_num = 1 - self._role = Role.WORKER - self._current_id = 0 - self._nodes_num = 1 - self._heter_trainers_num = 0 - self._heter_trainer_endpoints = None - self._non_distributed = True - return - - self._server_endpoints = self._server_endpoints.split(",") - - self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") - if self._worker_endpoints: - self._worker_endpoints = self._worker_endpoints.split(",") - else: - self._worker_endpoints = [] + # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set + # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002 + self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST", None) + + if self._server_endpoints is None: + # back to non_distributed execution. + self._server_endpoints = "" + self._trainers_num = 1 + self._role = Role.WORKER + self._current_id = 0 + self._nodes_num = 1 + self._heter_trainers_num = 0 + self._heter_trainer_endpoints = None + self._non_distributed = True + return + + self._server_endpoints = self._server_endpoints.split(",") + + self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", None) + if self._worker_endpoints != None: + self._worker_endpoints = self._worker_endpoints.split(",") + else: + self._worker_endpoints = [] + + trainers_num = os.getenv("PADDLE_TRAINERS_NUM", None) + if trainers_num == None: + raise ValueError( + "Can not find PADDLE_TRAINERS_NUM, please check your environment." 
+ ) + trainers_num = int(trainers_num) - trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"]) - training_role = os.environ["TRAINING_ROLE"] + training_role = os.getenv("TRAINING_ROLE", None) + if training_role == None: + raise ValueError( + "Can not find TRAINING_ROLE, please check your environment.") - if training_role not in ["TRAINER", "PSERVER", "HETER_TRAINER"]: + if training_role not in ["TRAINER", "PSERVER", "HETER_TRAINER"]: + raise ValueError( + "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER, but get {}, please check your environment.". + format(training_role)) + + # For heter parameter server env setting + heter_trainer_eplist = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST", + "") + if heter_trainer_eplist != "": + try: + heter_trainer_eplist = os.environ[ + "PADDLE_HETER_TRAINER_IP_PORT_LIST"].split(",") + except: raise ValueError( - "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER, but get {}, please check your environment.". - format(training_role)) - - # For heter parameter server env setting - heter_trainer_eplist = os.getenv( - "PADDLE_HETER_TRAINER_IP_PORT_LIST", None) - heter_trainer_device = os.getenv("PADDLE_HETER_TRAINER_DEVICE", - None) - if heter_trainer_eplist and heter_trainer_device: - try: - heter_trainer_eplist = os.environ[ - "PADDLE_HETER_TRAINER_IP_PORT_LIST"].split(",") - except: - raise ValueError( - "Can not Find PADDLE_HETER_TRAINER_IP_PORT_LIST in env or its format doesn't match the requirement: 'IP:PORT,IP:PORT' ." - ) - - self._is_heter_parameter_server_mode = True - heter_trainers_num = len(heter_trainer_eplist) - current_node_device = heter_trainer_device.upper() - if current_node_device not in ["CPU", "GPU", "XPU"]: - raise ValueError( - "Heter Trainer doesn't support {} device now, please use CPU / GPU / XPU(KunLun)". - format(heter_trainer_device)) - self._heter_trainer_device = current_node_device - else: - self._is_heter_parameter_server_mode = False - heter_trainers_num = 0 - - if training_role == "TRAINER": - role = Role.WORKER - current_id = int(os.environ["PADDLE_TRAINER_ID"]) - if len(self._worker_endpoints) > 0: - self._cur_endpoint = self._worker_endpoints[current_id] - elif training_role == "PSERVER": - role = Role.SERVER - port = os.environ["PADDLE_PORT"] - ip = os.environ["POD_IP"] - self._cur_endpoint = ip + ":" + port - current_id = self._server_endpoints.index(self._cur_endpoint) - elif training_role == "HETER_TRAINER": - role = Role.HETER_WORKER - cur_ip = os.environ["POD_IP"] - cur_port = os.environ["PADDLE_PORT"] - curr_endpoint = ":".join([cur_ip, cur_port]) - current_id = heter_trainer_eplist.index(curr_endpoint) - else: + "Can not Find PADDLE_HETER_TRAINER_IP_PORT_LIST in env or its format doesn't match the requirement: 'IP:PORT,IP:PORT' ." + ) + + self._is_heter_parameter_server_mode = True + heter_trainers_num = len(heter_trainer_eplist) + else: + self._is_heter_parameter_server_mode = False + heter_trainers_num = 0 + + if training_role == "TRAINER": + role = Role.WORKER + current_id = os.getenv("PADDLE_TRAINER_ID", None) + if current_id == None: raise ValueError( - "TRAINING_ROLE must be PSERVER or TRAINER or HETER_TRAINER") - except ValueError as e: - raise ValueError( - "Something wrong with PaddleCloud, please check environment") + "Can not find PADDLE_TRAINER_ID, please check your environment." 
+ ) + current_id = int(current_id) + if len(self._worker_endpoints) > 0: + self._cur_endpoint = self._worker_endpoints[current_id] + elif training_role == "PSERVER": + role = Role.SERVER + port = os.getenv("PADDLE_PORT", None) + if port == None: + raise ValueError( + "Can not find PADDLE_PORT, please check your environment.") + ip = os.getenv("POD_IP", None) + if ip == None: + raise ValueError( + "Can not find POD_IP, please check your environment.") + self._cur_endpoint = ip + ":" + port + current_id = self._server_endpoints.index(self._cur_endpoint) + elif training_role == "HETER_TRAINER": + role = Role.HETER_WORKER + cur_port = os.getenv("PADDLE_PORT", None) + if cur_port == None: + raise ValueError( + "Can not find PADDLE_PORT, please check your environment.") + cur_ip = os.getenv("POD_IP", None) + if cur_ip == None: + raise ValueError( + "Can not find POD_IP, please check your environment.") + curr_endpoint = ":".join([cur_ip, cur_port]) + current_id = heter_trainer_eplist.index(curr_endpoint) self._trainers_num = trainers_num self._role = role diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 015d59b516e94ef0fe322fefda403b7bec7b1804..2e23a915454fa55ee7e060a7bb8ec92ac6d08a4f 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -89,14 +89,16 @@ def _parse_args(): description='''start paddle training using multi-process mode. see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2- ''') + base_group = parser.add_argument_group("Base Parameters") - # Optional arguments for the launch helper - parser.add_argument( - "--ips", + base_group.add_argument( + "--log_dir", type=str, - default="127.0.0.1", - help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") - parser.add_argument( + default="log", + help="The path for each process's log.If it's not set, the log will printed to default pipe." + ) + + base_group.add_argument( "--gpus", type=str, default=None, @@ -104,22 +106,7 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." ) - parser.add_argument( - "--servers", type=str, default="", help="User defined servers ip:port") - parser.add_argument( - "--workers", type=str, default="", help="User defined workers ip:port") - parser.add_argument("--worker_num", type=int, help="number of workers") - - parser.add_argument("--server_num", type=int, help="number of servers") - - parser.add_argument( - "--log_dir", - type=str, - default="log", - help="The path for each process's log.If it's not set, the log will printed to default pipe." 
- ) - # positional - parser.add_argument( + base_group.add_argument( "training_script", type=str, help="The full path to the single GPU training " @@ -127,8 +114,34 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra "followed by all the arguments for the " "training script") - # rest from the training program - parser.add_argument('training_script_args', nargs=REMAINDER) + base_group.add_argument('training_script_args', nargs=REMAINDER) + + # Optional arguments for the launch helper + # for collective + collective_group = parser.add_argument_group("Collective Parameters") + collective_group.add_argument( + "--ips", + type=str, + default="127.0.0.1", + help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") + + ps_group = parser.add_argument_group("Parameter-Server Parameters") + # for parameter server + ps_group.add_argument( + "--servers", type=str, default="", help="User defined servers ip:port") + ps_group.add_argument( + "--workers", type=str, default="", help="User defined workers ip:port") + ps_group.add_argument( + "--heter_workers", + type=str, + default="", + help="User defined heter workers ip:port") + + ps_group.add_argument("--worker_num", type=int, help="number of workers") + ps_group.add_argument("--server_num", type=int, help="number of servers") + ps_group.add_argument( + "--heter_worker_num", type=int, help="number of heter_workers") + return parser.parse_args() @@ -166,35 +179,6 @@ def get_cluster_from_args(args, gpus): return get_cluster(node_ips, node_ip, trainer_endpoints, gpus) -def get_gpus(gpus): - if gpus is None: - gpus_num = fluid.core.get_cuda_device_count() - res_gpus = [str(x) for x in range(0, gpus_num)] - else: - cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") - if cuda_visible_devices is None or cuda_visible_devices == "": - res_gpus = [x.strip() for x in gpus.split(',')] - else: - # change gpus into relative values - # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7; - # therefore gpus=0,1,2,3 - cuda_visible_devices_list = cuda_visible_devices.split(',') - for x in gpus.split(','): - assert x in cuda_visible_devices_list, "Can't find "\ - "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ - % (x, cuda_visible_devices) - res_gpus = [ - cuda_visible_devices_list.index(x.strip()) - for x in gpus.split(',') - ] - logger.info("Change selected_gpus into reletive values. --ips:{} " - "will change into relative_ips:{} according to your " - "CUDA_VISIBLE_DEVICES:{}".format( - gpus, res_gpus, cuda_visible_devices_list)) - - return res_gpus - - def launch_collective(args): # parse arguments, used for cloud-single-machine and local gpus = get_gpus(args.gpus) @@ -245,209 +229,37 @@ def launch_collective(args): shutil.rmtree(gloo_rendezvous_dir) -def launch_ps(args): - ports = None - start_port = 6170 - if args.server_num: - server_num = args.server_num - ports = get_ports(server_num, 0) - server_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - else: - assert args.servers != "", "The setting of CPU mode must be either server_num or servers." 
- server_endpoints = args.servers - server_endpoints_ips = [ - x.strip().split(":")[0] for x in server_endpoints.split(",") - ] - server_endpoints_port = [ - x.strip().split(":")[1] for x in server_endpoints.split(",") +def launch_ps(args, distribute_mode): + cloud_flag = cloud_utils.use_paddlecloud() + + # for ps-cpu on paddlecloud + if cloud_flag and distribute_mode == DistributeMode.PS: + direct_start(args) + return + elif cloud_flag and distribute_mode == DistributeMode.PS_HETER: + cloud_ps_heter_env_set(args) + args.workers = os.getenv("PADDLE_TRAINER_ENDPOINTS") + args.servers = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST") + args.heter_workers = os.getenv("PADDLE_HETER_TRAINER_IP_PORT_LIST") + + ps_launcher = ParameterServerLauncher(args, distribute_mode) + ps_launcher.start_ps() + return + + +def which_distributed_mode(args): + ps_args = [ + '--worker_num', + '--server_num', + '--heter_worker_num', + '--servers', + '--workers', + '--heter_workers', ] - server_num = len(server_endpoints_ips) - - if args.worker_num: - worker_num = args.worker_num - ports = get_ports(worker_num, server_num) - worker_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - else: - assert args.workers != "", "The setting of CPU mode must be either worker_num or workers." - worker_endpoints = args.workers - worker_endpoints_ips = [ - x.strip().split(":")[0] for x in worker_endpoints.split(",") - ] - worker_num = len(worker_endpoints_ips) - node_ips = list(set(server_endpoints_ips + worker_endpoints_ips)) - worker_endpoints_len = [ - len(x.strip().split(":")) for x in worker_endpoints.split(",") - ] - if 1 in worker_endpoints_len: - # if no port value in worker_endpoints, will set default port values. - worker_endpoints_port = range(start_port + server_num, - start_port + server_num + worker_num, 1) - else: - worker_endpoints_port = [ - x.strip().split(":")[1] for x in worker_endpoints.split(",") - ] - - # local train - if len(set(node_ips)) == 1: - current_node_ip = node_ips[0] - else: - _, current_node_ip = get_host_name_ip() - - assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ - % (current_node_ip, node_ips) - node_rank = node_ips.index(current_node_ip) - logger.debug( - "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}". 
- format(node_ips, current_node_ip, node_rank, server_endpoints_port)) - - cluster = Cluster(hdfs=None) - server_rank = 0 - worker_rank = 0 - for node_rank, ip in enumerate(node_ips): - pod = Pod() - pod.rank = node_rank - pod.addr = ip - for i in range(len(server_endpoints_ips)): - if ip == server_endpoints_ips[i]: - server = Trainer() - server.endpoint = "%s:%s" % (ip, server_endpoints_port[i]) - server.rank = server_rank - server_rank += 1 - pod.servers.append(server) - for j in range(len(worker_endpoints_ips)): - if ip == worker_endpoints_ips[j]: - worker = Trainer() - worker.endpoint = "%s:%s" % (ip, worker_endpoints_port[i]) - worker.rank = worker_rank - worker_rank += 1 - pod.workers.append(worker) - - cluster.pods.append(pod) - - pod_rank = node_ips.index(current_node_ip) - pod = cluster.pods[pod_rank] - - default_env = os.environ.copy() - current_env = copy.copy(default_env) - - gloo_rendezvous_dir = tempfile.mkdtemp() - # add gloo env - current_env["PADDLE_WITH_GLOO"] = "1" - current_env["PADDLE_GLOO_RENDEZVOUS"] = "3" - current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir - - current_env.pop("http_proxy", None) - current_env.pop("https_proxy", None) - procs = [] - cmds = [] - log_fns = [] - for idx, cur_server in enumerate(pod.servers): - proc_env = { - "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints, - "PADDLE_TRAINER_ENDPOINTS": worker_endpoints, - "PADDLE_PORT": cur_server.endpoint.split(":")[1], - "TRAINING_ROLE": "PSERVER", - "PADDLE_TRAINERS_NUM": str(worker_num), - "POD_IP": cur_server.endpoint.split(":")[0] - } - current_env.update(proc_env) - - cmd = [sys.executable, "-u", args.training_script - ] + args.training_script_args - cmds.append(cmd) - - if idx == 0: - logger.info( - "Local server start {} processes. First process distributed " - "environment info (Only For Debug): {}".format( - len(pod.servers), - pretty_print_envs(proc_env, ("Distributed Envs", "Value")))) - - if args.log_dir is not None: - os.system("mkdir -p {}".format(args.log_dir)) - fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w") - log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) - else: - proc = subprocess.Popen(cmd, env=current_env) - - tp = TrainerProc() - tp.proc = proc - tp.rank = cur_server.rank - tp.local_rank = idx - tp.log_fn = fn - tp.log_offset = fn.tell() if fn else None - tp.cmd = cmd - - procs.append(tp) - - for idx, cur_worker in enumerate(pod.workers): - proc_env = { - "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints, - "PADDLE_TRAINER_ENDPOINTS": worker_endpoints, - "PADDLE_TRAINERS_NUM": str(worker_num), - "TRAINING_ROLE": "TRAINER", - "PADDLE_TRAINER_ID": str(cur_worker.rank) - } - current_env.update(proc_env) - - cmd = [sys.executable, "-u", args.training_script - ] + args.training_script_args - cmds.append(cmd) - - if idx == 0: - logger.info( - "Local worker start {} processes. 
First process distributed " - "environment info (Only For Debug): {}".format( - len(pod.workers), - pretty_print_envs(proc_env, ("Distributed Envs", "Value")))) - - if args.log_dir is not None: - os.system("mkdir -p {}".format(args.log_dir)) - fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w") - log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) - else: - proc = subprocess.Popen(cmd, env=current_env) - - tp = TrainerProc() - tp.proc = proc - tp.rank = cur_worker.rank - tp.local_rank = idx - tp.log_fn = fn - tp.log_offset = fn.tell() if fn else None - tp.cmd = cmd - - procs.append(tp) - - logger.info( - "Please check servers and workers logs in {}/workerlog.* and {}/serverlog.*". - format(args.log_dir, args.log_dir)) - # only wait worker to finish here - for i, proc in enumerate(procs): - if i < len(pod.servers): - continue - procs[i].proc.wait() - if len(log_fns) > 0: - log_fns[i].close() - - print("all workers exit, going to finish parameter server", file=sys.stderr) - for i in range(len(pod.servers)): - if len(log_fns) > 0: - log_fns[i].close() - procs[i].proc.terminate() - print("all parameter server are killed", file=sys.stderr) - - if os.path.exists(gloo_rendezvous_dir): - shutil.rmtree(gloo_rendezvous_dir) + collective_args = ['--ips'] + ps_heter_args = ["--heter_worker_num", "--heter_workers"] -def launch(): - args = _parse_args() - logger = get_logger() - _print_arguments(args) - ps_args = ['--worker_num', '--server_num', '--servers', '--workers'] - collective_args = ['--ips', '--gpus'] has_ps_args = [ ps_arg for ps_arg in ps_args if ps_arg in " ".join(sys.argv[1:-1]) ] @@ -455,23 +267,46 @@ def launch(): co_arg for co_arg in collective_args if co_arg in " ".join(sys.argv[1:-1]) ] + + if len(has_ps_args) > 1 and len(has_collective_args) > 1: + raise ValueError( + "Only one mode(Collective or Parameter-Server) can be selected at the same time, but more than one configuration was received." + ) + if fluid.core.is_compiled_with_cuda(): cuda_device_num = fluid.core.get_cuda_device_count() else: cuda_device_num = 0 - if len(has_ps_args) > 0 or cuda_device_num == 0: - logger.info("Run parameter-sever cpu mode. pserver arguments:{}".format( - has_ps_args)) - launch_ps(args) + if len(has_ps_args) > 0: + logger.info( + "Run parameter-sever mode. pserver arguments:{}, cuda count:{}". + format(has_ps_args, cuda_device_num)) + has_ps_heter_args = list(set(has_ps_args) & set(ps_heter_args)) + if len(has_ps_heter_args) > 0: + return DistributeMode.PS_HETER + else: + return DistributeMode.PS elif len(has_collective_args) > 0: logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}". format(has_collective_args, cuda_device_num)) - launch_collective(args) + return DistributeMode.COLLECTIVE else: logger.warning( "Not found distinct arguments. 
Default use gpu collective mode") + return DistributeMode.COLLECTIVE + + +def launch(): + args = _parse_args() + logger = get_logger() + _print_arguments(args) + + distribute_mode = which_distributed_mode(args) + if distribute_mode == DistributeMode.COLLECTIVE: launch_collective(args) + else: + launch_ps(args, distribute_mode) if __name__ == "__main__": diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index 7540cd9f4c1f352804550561c6f75b63104f9381..35782e0b04c5a54599367351e5c9c76ab384fba7 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -21,13 +21,27 @@ import signal import copy import sys import subprocess +import tempfile +import shutil from contextlib import closing import socket +import warnings +import paddle +import paddle.fluid as fluid logger = logging.getLogger("root") logger.propagate = False +class DistributeMode: + """ + There are various mode for fleetrun, each of them is designed for different model. + """ + COLLECTIVE = 0 + PS = 1 + PS_HETER = 2 + + class Cluster(object): def __init__(self, hdfs): self.job_server = None @@ -144,14 +158,16 @@ class Pod(object): self.trainers = [] self.servers = [] self.workers = [] + self.heter_workers = [] self.gpus = [] def __str__(self): return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \ - workers:{}".format(self.rank, self.id, self.addr, self.port, - self.gpus, [str(t) for t in self.trainers], - [str(s) for s in self.servers], - [str(w) for w in self.workers]) + workers:{} heter_workers:{}".format( + self.rank, self.id, self.addr, self.port, self.gpus, [ + str(t) for t in self.trainers + ], [str(s) for s in self.servers], [str(w) for w in self.workers], + [str(h) for h in self.heter_workers]) def __eq__(self, pod): if self.rank != pod.rank or \ @@ -262,7 +278,7 @@ def terminate_local_procs(procs): p.log_fn.close() logger.debug("terminate process id:{}".format(p.proc.pid)) - #wait all process terminiated + # wait all process terminiated time.sleep(3) for step in range(0, 50): alive = False @@ -406,10 +422,10 @@ def start_local_trainers(cluster, else: current_env = copy.copy(envs) - #paddle broadcast ncclUniqueId use socket, and - #proxy maybe make trainers unreachable, so delete them. - #if we set them to "", grpc will log error message "bad uri" - #so just delete them. + # paddle broadcast ncclUniqueId use socket, and + # proxy maybe make trainers unreachable, so delete them. + # if we set them to "", grpc will log error message "bad uri" + # so just delete them. current_env.pop("http_proxy", None) current_env.pop("https_proxy", None) @@ -518,3 +534,524 @@ def watch_local_trainers(procs, nranks): raise return alive + + +def get_gpus(gpus): + if gpus is None: + gpus_num = fluid.core.get_cuda_device_count() + res_gpus = [str(x) for x in range(0, gpus_num)] + else: + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_visible_devices is None or cuda_visible_devices == "": + res_gpus = [x.strip() for x in gpus.split(',')] + else: + # change gpus into relative values + # e.g. 
CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7; + # therefore gpus=0,1,2,3 + cuda_visible_devices_list = cuda_visible_devices.split(',') + for x in gpus.split(','): + assert x in cuda_visible_devices_list, "Can't find "\ + "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ + % (x, cuda_visible_devices) + res_gpus = [ + cuda_visible_devices_list.index(x.strip()) + for x in gpus.split(',') + ] + logger.info("Change selected_gpus into reletive values. --ips:{} " + "will change into relative_ips:{} according to your " + "CUDA_VISIBLE_DEVICES:{}".format( + gpus, res_gpus, cuda_visible_devices_list)) + + return res_gpus + + +def direct_start(args): + # run ps-cpu mode on paddlecloud, using given envs + cmd = [sys.executable, "-u", args.training_script] + \ + args.training_script_args + proc = subprocess.Popen(cmd) + proc.wait() + return + + +def get_custom_endpoints(origin_endpoints, offset=0): + """ + origin_endpoint: ip:port + user_define_endpoint: ip:(port+offset) + """ + assert origin_endpoints != None + paddle_user_define_endpoints_list = [] + for ip_port in origin_endpoints.split(","): + ip = ip_port.split(":")[0] + port = ip_port.split(":")[1] + new_port = int(port) + offset + paddle_user_define_endpoints_list.append(":".join((ip, str(new_port)))) + paddle_user_define_endpoints = ",".join(paddle_user_define_endpoints_list) + return paddle_user_define_endpoints + + +def cloud_ps_heter_env_set(args): + environs = {} + + paddle_trainer_endpoints = os.getenv("TRAINER_IP_PORT_LIST", "") + assert paddle_trainer_endpoints != None + + paddle_pserver_endpoints = os.getenv("PSERVER_IP_PORT_LIST", "") + assert paddle_pserver_endpoints != None + + # hard code for paddlecloud custom-framework + avilable_ports = os.getenv("TRAINER_PORTS", "").split(",") + assert len( + avilable_ports + ) > 3, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit" + + # hard code for paddlecloud custom-framework + trainers_num = len(paddle_pserver_endpoints.split(",")) + assert trainers_num != 0 + environs["PADDLE_TRAINERS_NUM"] = trainers_num + environs["TRAINERS_NUM"] = trainers_num + + # hard code for paddlecloud custom-framework + environs["PADDLE_HETER_TRAINER_IP_PORT_LIST"] = paddle_trainer_endpoints + environs["PADDLE_PSERVERS_IP_PORT_LIST"] = paddle_pserver_endpoints + environs["PADDLE_TRAINER_ENDPOINTS"] = get_custom_endpoints( + paddle_pserver_endpoints, 1) + heter_worker_num = len(paddle_trainer_endpoints.split(",")) + if (args.heter_worker_num != None) and ( + heter_worker_num != args.heter_worker_num): + warnings.warn( + "Your fleetrun setting: heter_worker_num is {}, but we find {} device can be used, this setting has been changed.". 
+ format(args.heter_worker_num, heter_worker_num)) + args.heter_worker_num = heter_worker_num + + for k, v in environs.items(): + os.environ[k] = str(v) + logger.info("Set heter parameter server env: {}".format( + pretty_print_envs(environs))) + + +class ParameterServerLauncher(object): + def __init__(self, args, distribute_mode): + self.args = args + self.distribute_mode = distribute_mode + self.server_num = 0 + self.worker_num = 0 + self.heter_worker_num = 0 + + self.server_endpoints = "" + self.server_endpoints_ips = [] + self.server_endpoints_port = [] + + self.worker_endpoints = "" + self.worker_endpoints_ips = [] + self.worker_endpoints_port = [] + + self.heter_worker_endpoints = "" + self.heter_worker_endpoints_ips = [] + self.heter_worker_endpoints_port = [] + + self.is_local = True + self.current_node_ip = "" + + self.get_role_endpoints(args) + + def get_role_endpoints(self, args): + # get server envs + if args.server_num: + self.server_num = args.server_num + if args.servers: + assert len( + args.servers.split(",") + ) == self.server_num, "The server_num and servers don't match. Expect servers endpoints num equal to server_num, but received servers endpoint num: {} and server_num {}".format( + len(args.servers.split(",")), self.server_num) + self.server_endpoints = args.servers + else: + ports = get_ports(self.server_num, 0) + self.server_endpoints = ",".join( + ["127.0.0.1:" + str(x) for x in ports]) + else: + assert args.servers != "", "The setting of Parameter-Server must have server_num or servers." + self.server_endpoints = args.servers + self.server_num = len(self.server_endpoints.split(",")) + + # get worker envs + if args.worker_num: + self.worker_num = args.worker_num + if args.workers: + assert len( + args.workers.split(",") + ) == self.worker_num, "The worker_num and workers don't match. Expect workers endpoints num equal to worker_num, but received workers endpoint num: {} and worker_num {}".format( + len(args.workers.split(",")), self.worker_num) + + self.worker_endpoints = args.workers + else: + ports = get_ports(self.worker_num, self.server_num) + self.worker_endpoints = ",".join( + ["127.0.0.1:" + str(x) for x in ports]) + else: + assert args.workers != "", "The setting of Parameter-Server must have worker_num or workers." + worker_endpoints_ips = [ + x.strip().split(":")[0] for x in args.workers.split(",") + ] + self.worker_num = len(worker_endpoints_ips) + worker_endpoints_len = [ + len(x.strip().split(":")) for x in args.workers.split(",") + ] + + if 1 in worker_endpoints_len: + # if no port value in worker_endpoints, will set default port values. + start_port = 6170 + worker_endpoints_port = range( + start_port + self.server_num, + start_port + self.server_num + self.worker_num, 1) + # create endpoints str + worker_endpoints = [] + for i in range(self.worker_num): + worker_endpoints.append(":".join((worker_endpoints_ips[ + i], str(worker_endpoints_port[i])))) + self.worker_endpoints = ",".join(worker_endpoints) + else: + self.worker_endpoints = args.workers + + # get heter worker envs + if self.distribute_mode == DistributeMode.PS_HETER: + if args.heter_worker_num: + self.heter_worker_num = args.heter_worker_num + if args.heter_workers: + assert len( + args.heter_workers.split(",") + ) == self.heter_worker_num, "The heter_worker_num and heter_workers doesn't match.
Expect heter_workers endpoints num epual to heter_worker_num, but received heter_workers enpoint num: {} and heter_worker_num {}".format( + len(args.heter_workers.split(",")), + self.heter_worker_num) + self.heter_worker_endpoints = args.heter_workers + else: + ports = get_ports(self.heter_worker_num, + self.server_num + self.worker_num) + self.heter_worker_endpoints = ",".join( + ["127.0.0.1:" + str(x) for x in ports]) + else: + assert args.heter_workers != "", "The setting of Parameter-Server heter mode must has heter_worker_num or heter_workers." + self.heter_worker_endpoints = args.heter_workers + self.heter_worker_num = len( + self.heter_worker_endpoints.split(",")) + + # check local or user define + self.server_endpoints_ips = [ + x.strip().split(":")[0] for x in self.server_endpoints.split(",") + ] + self.worker_endpoints_ips = [ + x.strip().split(":")[0] for x in self.worker_endpoints.split(",") + ] + self.server_endpoints_port = [ + x.strip().split(":")[1] for x in self.server_endpoints.split(",") + ] + self.worker_endpoints_port = [ + x.strip().split(":")[1] for x in self.worker_endpoints.split(",") + ] + self.node_ips = list( + set(self.server_endpoints_ips + self.worker_endpoints_ips)) + if self.distribute_mode == DistributeMode.PS_HETER: + self.heter_worker_endpoints_ips = [ + x.strip().split(":")[0] + for x in self.heter_worker_endpoints.split(",") + ] + self.heter_worker_endpoints_port = [ + x.strip().split(":")[1] + for x in self.heter_worker_endpoints.split(",") + ] + self.node_ips = list( + set(self.node_ips + self.heter_worker_endpoints_ips)) + + if len(set(self.node_ips)) == 1: + self.is_local = True + self.current_node_ip = self.node_ips[0] + else: + self.is_local = False + pod_ip = os.getenv("POD_IP", None) + if pod_ip == None: + _, self.current_node_ip = get_host_name_ip() + else: + self.current_node_ip = pod_ip + assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ + % (self.current_node_ip, self.node_ips) + self.node_rank = self.node_ips.index(self.current_node_ip) + + logger.debug( + "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". + format(self.node_ips, self.current_node_ip, self.node_rank)) + + def start_ps(self): + cluster = Cluster(hdfs=None) + server_rank = 0 + worker_rank = 0 + heter_worker_rank = 0 + + for node_rank, ip in enumerate(self.node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + for i in range(len(self.server_endpoints_ips)): + if ip == self.server_endpoints_ips[i]: + server = Trainer() + server.endpoint = "%s:%s" % (ip, + self.server_endpoints_port[i]) + server.rank = server_rank + server_rank += 1 + pod.servers.append(server) + for j in range(len(self.worker_endpoints_ips)): + if ip == self.worker_endpoints_ips[j]: + worker = Trainer() + worker.endpoint = "%s:%s" % (ip, + self.worker_endpoints_port[j]) + worker.rank = worker_rank + worker_rank += 1 + pod.workers.append(worker) + for k in range(len(self.heter_worker_endpoints_ips)): + if ip == self.heter_worker_endpoints_ips[k]: + heter_worker = Trainer() + heter_worker.endpoint = "%s:%s" % ( + ip, self.heter_worker_endpoints_port[k]) + heter_worker.rank = heter_worker_rank + heter_worker_rank += 1 + pod.heter_workers.append(heter_worker) + + cluster.pods.append(pod) + + pod = cluster.pods[self.node_rank] + self.gloo_rendezvous_dir = tempfile.mkdtemp() + + # 3. 
subproces start + self.procs = {"worker": [], "server": [], "heter_worker": []} + self.cmds = {"worker": [], "server": [], "heter_worker": []} + self.log_fns = {"worker": [], "server": [], "heter_worker": []} + + self.start_pod_server(self.args, pod) + self.start_pod_worker(self.args, pod) + self.start_pod_heter_worker(self.args, pod) + + logger.info( + "Please check servers, workers and heter_worker logs in {}/workerlog.*, {}/serverlog.* and {}/heterlog.*". + format(self.args.log_dir, self.args.log_dir, self.args.log_dir)) + + # 4. wait for finish training + if len(self.procs["worker"]) > 0: + # if node has worker procs + # only wait worker to finish here + for i, proc in enumerate(self.procs["worker"]): + self.procs["worker"][i].proc.wait() + if len(self.log_fns["worker"]) > 0: + self.log_fns["worker"][i].close() + logger.info( + "all workers exit, going to finish parameter server and heter_worker." + ) + if len(self.procs["heter_worker"]) > 0: + for i, proc in enumerate(self.procs["heter_worker"]): + self.log_fns["heter_worker"][i].close() + self.procs["heter_worker"][i].proc.terminate() + logger.info("all heter_worker are killed") + + if len(self.procs["server"]) > 0: + for i, proc in enumerate(self.procs["server"]): + self.log_fns["server"][i].close() + self.procs["server"][i].proc.terminate() + logger.info("all parameter server are killed") + + else: + # if node has not worker procs + # blocking training process + if len(self.procs["server"]) > 0: + for i, proc in enumerate(self.procs["server"]): + self.procs["server"][i].proc.wait() + + if len(self.procs["heter_worker"]) > 0: + for i, proc in enumerate(self.procs["heter_worker"]): + self.procs["heter_worker"][i].proc.wait() + + if os.path.exists(self.gloo_rendezvous_dir): + shutil.rmtree(self.gloo_rendezvous_dir) + + def start_pod_server(self, args, pod): + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + for idx, cur_server in enumerate(pod.servers): + proc_env = { + "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints, + "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints, + "PADDLE_HETER_TRAINER_IP_PORT_LIST": + self.heter_worker_endpoints, + "PADDLE_PORT": cur_server.endpoint.split(":")[1], + "TRAINING_ROLE": "PSERVER", + "PADDLE_TRAINERS_NUM": str(self.worker_num), + "POD_IP": cur_server.endpoint.split(":")[0], + "PADDLE_WITH_GLOO": "1", + "PADDLE_GLOO_RENDEZVOUS": "2", + "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir + } + current_env.update(proc_env) + + cmd = [sys.executable, "-u", args.training_script + ] + args.training_script_args + self.cmds["server"].append(cmd) + + if idx == 0: + logger.info( + "Local server start {} processes. 
First process distributed " + "environment info (Only For Debug): {}".format( + len(pod.servers), + pretty_print_envs(proc_env, ("Distributed Envs", "Value" + )))) + + if args.log_dir is not None: + os.system("mkdir -p {}".format(args.log_dir)) + fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w") + self.log_fns["server"].append(fn) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = cur_server.rank + tp.local_rank = idx + tp.log_fn = fn + tp.log_offset = fn.tell() if fn else None + tp.cmd = cmd + + self.procs["server"].append(tp) + + def start_pod_worker(self, args, pod): + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + heter_device_num = 0 + device_list = [] + if fluid.core.is_compiled_with_cuda(): + device_list = get_gpus(args.gpus) + heter_device_num = len(device_list) + elif fluid.core.is_compiled_with_xpu(): + heter_device_num = fluid.core.get_xpu_device_count() + device_list = [str(x) for x in range(0, heter_device_num)] + + for idx, cur_worker in enumerate(pod.workers): + device_id = str(device_list[idx % heter_device_num]) + proc_env = { + "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints, + "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints, + "PADDLE_TRAINERS_NUM": str(self.worker_num), + "PADDLE_HETER_TRAINER_IP_PORT_LIST": + self.heter_worker_endpoints, + "TRAINING_ROLE": "TRAINER", + "PADDLE_TRAINER_ID": str(cur_worker.rank), + "PADDLE_WITH_GLOO": "1", + "PADDLE_GLOO_RENDEZVOUS": "2", + "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir, + "FLAGS_selected_gpus": "0", + "FLAGS_selected_xpus": "0", + "CUDA_VISIBLE_DEVICES": device_id, + "XPU_VISIBLE_DEVICES": device_id, + } + current_env.update(proc_env) + + cmd = [sys.executable, "-u", args.training_script + ] + args.training_script_args + self.cmds["worker"].append(cmd) + + if idx == 0: + logger.info( + "Local worker start {} processes. 
First process distributed " + "environment info (Only For Debug): {}".format( + len(pod.workers), + pretty_print_envs(proc_env, ("Distributed Envs", "Value" + )))) + + if args.log_dir is not None: + os.system("mkdir -p {}".format(args.log_dir)) + fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w") + self.log_fns["worker"].append(fn) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = cur_worker.rank + tp.local_rank = idx + tp.log_fn = fn + tp.log_offset = fn.tell() if fn else None + tp.cmd = cmd + + self.procs["worker"].append(tp) + + def start_pod_heter_worker(self, args, pod): + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + heter_device_num = 0 + device_list = [] + if fluid.core.is_compiled_with_cuda(): + device_list = get_gpus(args.gpus) + heter_device_num = len(device_list) + elif fluid.core.is_compiled_with_xpu(): + heter_device_num = fluid.core.get_xpu_device_count() + device_list = [str(x) for x in range(0, heter_device_num)] + assert heter_device_num != 0 + + for idx, cur_heter_worker in enumerate(pod.heter_workers): + device_id = str(device_list[idx % heter_device_num]) + proc_env = { + "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints, + "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints, + "PADDLE_HETER_TRAINER_IP_PORT_LIST": + self.heter_worker_endpoints, + "PADDLE_PORT": cur_heter_worker.endpoint.split(":")[1], + "TRAINING_ROLE": "HETER_TRAINER", + "PADDLE_TRAINERS_NUM": str(self.worker_num), + "POD_IP": cur_heter_worker.endpoint.split(":")[0], + "PADDLE_WITH_GLOO": "1", + "PADDLE_GLOO_RENDEZVOUS": "2", + "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir, + "FLAGS_selected_gpus": "0", + "FLAGS_selected_xpus": "0", + "CUDA_VISIBLE_DEVICES": device_id, + "XPU_VISIBLE_DEVICES": device_id, + } + current_env.update(proc_env) + + cmd = [sys.executable, "-u", args.training_script + ] + args.training_script_args + self.cmds["heter_worker"].append(cmd) + + if idx == 0: + logger.info( + "Local heter_worker start {} processes. 
First process distributed " + "environment info (Only For Debug): {}".format( + len(pod.heter_workers), + pretty_print_envs(proc_env, ("Distributed Envs", "Value" + )))) + + if args.log_dir is not None: + os.system("mkdir -p {}".format(args.log_dir)) + fn = open("%s/heterlog.%d" % (args.log_dir, idx), "w") + self.log_fns["heter_worker"].append(fn) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = cur_heter_worker.rank + tp.local_rank = idx + tp.log_fn = fn + tp.log_offset = fn.tell() if fn else None + tp.cmd = cmd + + self.procs["heter_worker"].append(tp) diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py index 38ad41f8836b4e8c3b304dbf539b47d5293a8221..83345cb6f623e42b06c0ab1dacac2bbce8d7f27b 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py @@ -74,6 +74,8 @@ class ParameterServerOptimizer(MetaOptimizerBase): _startup = worker.delet_extra_optimizes_pass(_startup, compiled_config) + compiled_config.set_origin_ps_main_program(_main) + compiled_config.set_origin_ps_startup_program(_startup) # for heter program if self.role_maker._is_heter_parameter_server_mode: from paddle.fluid.incubate.fleet.parameter_server.ir import heter_trainer_pass as heter_worker @@ -91,6 +93,8 @@ class ParameterServerOptimizer(MetaOptimizerBase): else: _main = worker.append_send_ops_pass(_main, compiled_config) _startup = _startup + compiled_config.set_origin_ps_main_program(_main) + compiled_config.set_origin_ps_startup_program(_startup) return _main, _startup diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py index 42be7e869d9a7c6394152167ac2cbce9b0986de0..266c7d0f405bfd4d1cd24fcf523d94819db4cc47 100644 --- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py +++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py @@ -210,18 +210,23 @@ class ParameterServerRuntime(RuntimeBase): warnings.warn("communicator has been initialized, skip") def _get_executor(self): - if self.role_maker._is_heter_worker(): - if self.role_maker._get_heter_worker_device() == "GPU": - gpu_id = int(os.getenv("FLAGS_selected_gpus", "0")) - executor = Executor(fluid.CUDAPlace(gpu_id)) - elif self.role_maker._get_heter_worker_device() == "XPU": - xpu_id = int(os.getenv("FLAGS_selected_xpus", "0")) - executor = Executor(fluid.XPUPlace(xpu_id)) - else: - raise ValueError("Not Support Device {}".format( - self.role_maker._get_heter_worker_device())) - else: - executor = fluid.Executor(fluid.CPUPlace()) + executor = fluid.Executor(fluid.CPUPlace()) + if self.role_maker._is_heter_parameter_server_mode: + heter_worker_device_guard = self.context[ + "valid_strategy"].a_sync_configs[ + "heter_worker_device_guard"].upper() + if heter_worker_device_guard not in ["GPU", "XPU", "CPU"]: + raise ValueError("Heter Worker Not Support Device {}".format( + heter_worker_device_guard)) + if self.role_maker._is_heter_worker(): + if heter_worker_device_guard == "GPU": + executor = Executor( + fluid.CUDAPlace( + int(os.getenv("FLAGS_selected_gpus", "0")))) + elif heter_worker_device_guard == "XPU": + executor = Executor( + fluid.XPUPlace( + 
int(os.getenv("FLAGS_selected_xpus", "0")))) return executor def _init_server(self, *args, **kwargs): @@ -233,12 +238,14 @@ class ParameterServerRuntime(RuntimeBase): model_dirname = None executor = self._get_executor() + if self.role_maker._is_heter_worker() and self.context[ + "valid_strategy"].a_sync_configs["launch_barrier"]: + # for heter trainer wait server ready + wait_server_ready(self.role_maker._get_pserver_endpoints()) executor.run(fluid.default_startup_program()) if self.role_maker._is_heter_worker(): self._init_worker() - - if self.role_maker._is_heter_worker(): return if not model_dirname: @@ -470,13 +477,13 @@ class ParameterServerRuntime(RuntimeBase): def _save_distributed_persistables(self, executor, dirname, main_program): dense_ctx = self.compiled_strategy.get_communicator_recv_context( - recv_type=1) + recv_type=1, use_origin_program=True) sparse_ctx = self.compiled_strategy.get_communicator_recv_context( - recv_type=2) + recv_type=2, use_origin_program=True) distributed_ctx = self.compiled_strategy.get_communicator_recv_context( - recv_type=3) + recv_type=3, use_origin_program=True) recv_dense_varnames = self._save_dense_params(executor, dirname, dense_ctx, main_program) @@ -528,7 +535,7 @@ class ParameterServerRuntime(RuntimeBase): ) if main_program is None: - main_program = fluid.default_main_program() + main_program = self.compiled_strategy.get_origin_ps_main_program() if isinstance(main_program, CompiledProgram): raise TypeError( diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py index e348c67ae0461674358fa6d34ee8a73648862a6d..90847382c86e1c0bfd2cd9fae33342cbdb38e5ce 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py @@ -133,6 +133,8 @@ class CompileTimeStrategy(object): self.origin_main_program = main_program self.origin_startup_program = startup_program + self.origin_ps_main_program = main_program + self.origin_ps_startup_program = startup_program self.strategy = strategy self.role_maker = role_maker @@ -153,6 +155,11 @@ class CompileTimeStrategy(object): self._build_var_distributed() + # for heter-ps save variables + self.origin_merged_variables_pairs = list(self.merged_variables_pairs) + self.origin_merged_dense_pairs = list(self.merged_dense_pairs) + self.origin_merged_sparse_pairs = list(self.merged_sparse_pairs) + def get_distributed_mode(self): trainer = self.strategy.get_trainer_runtime_config() return trainer.mode @@ -214,6 +221,18 @@ class CompileTimeStrategy(object): def get_origin_startup_program(self): return self.origin_startup_program + def set_origin_ps_main_program(self, program): + self.origin_ps_main_program = program + + def set_origin_ps_startup_program(self, program): + self.origin_ps_startup_program = program + + def get_origin_ps_main_program(self): + return self.origin_ps_main_program + + def get_origin_ps_startup_program(self): + return self.origin_ps_startup_program + def get_sparse_varname_on_ps(self, is_distributed, endpoint=None): if not endpoint: endpoint = self.get_ps_endpoint() @@ -378,7 +397,9 @@ class CompileTimeStrategy(object): send_ctx[name] = ctx return send_ctx - def get_communicator_recv_context(self, recv_type=1): + def get_communicator_recv_context(self, + recv_type=1, + use_origin_program=False): # recv_type # 1 : DENSE 2. SPARSE 3. DISTRIBUTED 4. 
ALL distibuted_varnames = get_sparse_tablenames(self.origin_main_program, @@ -392,7 +413,8 @@ class CompileTimeStrategy(object): sparse_recv_ctx = {} distributed_recv_ctx = {} - for merged in self.merged_variables_pairs: + variables_pairs = self.merged_variables_pairs if not use_origin_program else self.origin_merged_variables_pairs + for merged in variables_pairs: params = merged[0] if params.merged_var.name in sparse_varnames: continue diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index fefaecd3b8979b47cf7c0c4f7aa058e9ffcaae42..7fc66e8e8496157ca6e74522ec744d606c043d63 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -169,6 +169,10 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): except fluid.core.EOFException: self.reader.reset() + if fleet.is_first_worker(): + model_path = tempfile.mkdtemp() + fleet.save_persistables(executor=exe, dirname=model_path) + shutil.rmtree(model_path) fleet.stop_worker() def do_dataset_training(self, fleet): diff --git a/python/paddle/fluid/tests/unittests/fleet_ps_training.py b/python/paddle/fluid/tests/unittests/fleet_ps_training.py index a9e9675a611601d612219b6b8971d0df9fc21de6..65fa1ef935ef1e7a6082e52d8cbfdbe7745114ab 100644 --- a/python/paddle/fluid/tests/unittests/fleet_ps_training.py +++ b/python/paddle/fluid/tests/unittests/fleet_ps_training.py @@ -20,8 +20,12 @@ from paddle.fluid.incubate.fleet.base import role_maker input_x = fluid.layers.data(name="x", shape=[32], dtype='float32') input_y = fluid.layers.data(name="y", shape=[1], dtype='int64') +input_y = fluid.layers.cast(input_y, dtype="float32") + +with fluid.device_guard("gpu"): + input_y = fluid.layers.cast(input_y, dtype="int64") + cost = mlp(input_x, input_y) -cost = mlp(input_x, input_y) optimizer = fluid.optimizer.Adagrad(learning_rate=0.01) role = role_maker.PaddleCloudRoleMaker() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py index 6c5a1d6e36c2549d5e3549f81f26d5ffcca3a247..071b68bf9e856df3b27bc1c1e23369320ffc59ce 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_base.py @@ -288,7 +288,7 @@ class TestFleetHeterBase(unittest.TestCase): print("tr end communicate") tr0_ret = tr0.returncode - tr1_ret = tr0.returncode + tr1_ret = tr1.returncode # close trainer file tr0_pipe.close() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_program.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_program.py index 7f4e5d99e02084f363b71dbb73b80e59d704aa15..eed8d5f1a496ead4712cf792dec879612d167825 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_program.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_heter_program.py @@ -50,6 +50,10 @@ class TestDistFleetHeterProgram(unittest.TestCase): def build_strategy(self): self.strategy = paddle.distributed.fleet.DistributedStrategy() self.strategy.a_sync = True + self.strategy.a_sync_configs = { + "launch_barrier": False, + "heter_worker_device_guard": "gpu" + } return self.strategy def build_input(self): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh index e717962ead2e2da30092b12379bf36f368e8a735..4cd8dc3d945e16bb43ebcef627b98b3531556a7e 100644 
--- a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh @@ -28,13 +28,27 @@ function test_launch_ps(){ fi } +function test_launch_ps_heter(){ + fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test heter pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi +} + if [[ ${WITH_GPU} == "OFF" ]]; then + echo "in cpu test mode" test_launch_ps exit 0 fi +echo "No.1 unittest" test_launch_ps +test_launch_ps_heter # use default values +echo "No.2 unittest" fleetrun multi_process.py fleetrun # use paddlecloud @@ -48,6 +62,7 @@ export PADDLE_TRAINER_ID=0 export PADDLE_PORT=35789 export TRAINER_PORTS_NUM=2 +echo "No.3 unittest" distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog" CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun @@ -83,7 +98,7 @@ fi unset PADDLE_PORT export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 -echo "" +echo "No.4 unittest" echo "paddle.distributed.launch async poll process test" if ! CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun abort; then echo "train abort as planned" @@ -112,5 +127,6 @@ rm -rf $file_0_0 $file_0_1 distributed_args="--gpus=0,1 --log_dir=testlog" export PADDLE_LAUNCH_LOG="test_launch_filelock_0" +echo "No.5 unittest" CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} find_ports.py str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071" diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_init.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_init.py new file mode 100644 index 0000000000000000000000000000000000000000..9f8ee1b46e827be2d679884b8436819126971bb8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_init.py @@ -0,0 +1,149 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test cloud role maker.""" + +from __future__ import print_function +import os +import platform +import shutil +import tempfile +import unittest +import paddle +import paddle.distributed.fleet.base.role_maker as role_maker + + +class TestPSCloudRoleMakerCase1(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + + def test_paddle_trainers_num(self): + # PADDLE_TRAINERS_NUM + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase2(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. 
+ """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + + def test_training_role(self): + # TRAINING_ROLE + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase3(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + os.environ["TRAINING_ROLE"] = 'TRAINER' + + def test_trainer_id(self): + # PADDLE_TRAINER_ID + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase4(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + os.environ["TRAINING_ROLE"] = 'PSERVER' + + def test_ps_port(self): + # PADDLE_PORT + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase5(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + os.environ["TRAINING_ROLE"] = 'PSERVER' + os.environ["PADDLE_PORT"] = str(4001) + + def test_ps_ip(self): + # POD_IP + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase6(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ[ + "PADDLE_HETER_TRAINER_IP_PORT_LIST"] = "127.0.0.1:4003,127.0.0.1:4004" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + os.environ["TRAINING_ROLE"] = 'HETER_TRAINER' + + def test_heter_port(self): + # PADDLE_PORT + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +class TestPSCloudRoleMakerCase7(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMake Parameter Server. + """ + + def setUp(self): + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:4001,127.0.0.1:4002" + os.environ[ + "PADDLE_HETER_TRAINER_IP_PORT_LIST"] = "127.0.0.1:4003,127.0.0.1:4004" + os.environ["PADDLE_TRAINERS_NUM"] = str(2) + os.environ["TRAINING_ROLE"] = 'HETER_TRAINER' + os.environ["PADDLE_PORT"] = str(4003) + + def test_heter_ip(self): + # POD_IP + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertRaises(ValueError, ro._generate_role) + + +if __name__ == "__main__": + unittest.main()