From 506e79d1833066f5ce72c478383b352eb5d3e1d7 Mon Sep 17 00:00:00 2001 From: Yulong Ao Date: Tue, 7 Dec 2021 12:13:14 +0800 Subject: [PATCH] [Auto para] Relaunch with auto mapping function (#37326) * [Auto Parallel] Add the unified cluster representation * [Auto Parallel] Add the graph class for physical mapping * [Auto Parallel] Add the simple physical mapper * Set the timeout of the mapper * Merge the upstream develop unittests cmake files * Fix a bug of the process group * Remove mapper unittest from platforms which is not GPU * Move the instantiation of process group after resharding * Add the local id for devices * Update the rank mapping format * [Auto Parallel] Relaunch with the rank mapping file * Remove the unnecessary json file * Avoid entering get_device_proc_info for auto mapping * Correct the mapper unit test * Add some comments * Remove the related files about mapping * Update the unittest for auto mapping * Remove unused rank_mapping unittest * Improve the unittest coverage * Improve the unittest coverage * Improve the unittest of relaunch * Fix the unittest problem in CI * Improve the unittest of relaunch * Remove unnecessary statements * Update the unittest cmakefile * Correct the cmakefile of auto parallel unittests * Modify codes based on the new elastic change * Use the GPUs exclusively in the unittest * Correct the cmakefile * Set the timeout of the unittest --- .../distributed/auto_parallel/mapper.py | 14 +- .../distributed/auto_parallel/parallelizer.py | 158 +++++++++---- .../auto_parallel/process_group.py | 12 +- python/paddle/distributed/fleet/launch.py | 73 ++++-- .../paddle/distributed/fleet/launch_utils.py | 215 +++++++++++++++--- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../unittests/auto_parallel/CMakeLists.txt | 6 + .../auto_parallel_relaunch_model.py | 162 +++++++++++++ .../tests/unittests/auto_parallel/launch.py | 22 ++ .../test_auto_parallel_relaunch.py | 118 ++++++++++ .../unittests/test_auto_parallel_mapper.py | 3 + 11 files changed, 675 insertions(+), 110 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/auto_parallel_relaunch_model.py create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/launch.py create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py diff --git a/python/paddle/distributed/auto_parallel/mapper.py b/python/paddle/distributed/auto_parallel/mapper.py index f015cf44771..543fa2d9681 100644 --- a/python/paddle/distributed/auto_parallel/mapper.py +++ b/python/paddle/distributed/auto_parallel/mapper.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License +import os import operator import functools import json @@ -175,9 +176,19 @@ def build_process_graph(distributed_program): def build_cluster_graph(cluster): graph = Graph() + cuda_visible_devices_env = os.getenv("CUDA_VISIBLE_DEVICES") + cuda_visible_devices = [] + if cuda_visible_devices_env is not None and cuda_visible_devices_env != "": + cuda_visible_devices = [ + int(d.strip()) for d in cuda_visible_devices_env.split(",") + ] for machine in cluster.machines.values(): for device in machine.devices.values(): graph.add_node(device.global_id, device=device) + if cuda_visible_devices and device.local_id not in cuda_visible_devices: + graph.nodes[device.global_id]["occupied"] = True + else: + graph.nodes[device.global_id]["occupied"] = False for link 
in machine.links.values(): graph.add_edge( link.source.global_id, link.target.global_id, link=link) @@ -195,9 +206,6 @@ def mapping(distributed_program, cluster): for cur_rank_node in process_graph: cur_rank_node["visited"] = False - for cur_device_node in cluster_graph: - cur_device_node["occupied"] = False - def sort_by_comm_volume(rank_edge): return rank_edge["comm_requirements"]["comm_volume"] diff --git a/python/paddle/distributed/auto_parallel/parallelizer.py b/python/paddle/distributed/auto_parallel/parallelizer.py index 14556ff6ef4..affb27317da 100644 --- a/python/paddle/distributed/auto_parallel/parallelizer.py +++ b/python/paddle/distributed/auto_parallel/parallelizer.py @@ -12,6 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import sys +import json +import shlex +import copy +import pathlib +import subprocess import logging import paddle from paddle.distributed.utils import get_logger @@ -23,9 +30,12 @@ from .dist_context import set_default_distributed_context from .completion import complete_annotation, complete_backward_annotation from .partitioner import Partitioner from .process_group import get_all_process_groups +from .process_group import get_world_process_groups from .utils import make_data_unshard from .utils import set_grad_var_shape from .reshard import reshard +from .cluster import Cluster +from .mapper import mapping # from .auto_search import auto_search _logger = get_logger(logging.INFO) @@ -46,6 +56,21 @@ class AutoParallelizer: self._optimizer = self._fleet.user_defined_optimizer self._dist_strategy = self._fleet._user_defined_strategy self._dist_context = DistributedContext() + self._cluster = None + self._cluster_topo_path = os.getenv("PADDLE_CLUSTER_TOPO_PATH", None) + if self._cluster_topo_path is not None: + self._cluster = Cluster() + self._cluster.build_from_file(self._cluster_topo_path) + # Prepare information for auto mapping + self._rank_mapping_path = os.getenv("PADDLE_RANK_MAPPING_PATH", None) + enable_auto_mapping_env = os.getenv("PADDLE_ENABLE_AUTO_MAPPING", None) + if enable_auto_mapping_env is None: + self._enable_auto_mapping = False + else: + self._enable_auto_mapping = True + self._need_rank_mapping = os.getenv("PADDLE_NEED_RANK_MAPPING") + self._need_rank_mapping = True if self._need_rank_mapping and \ + self._need_rank_mapping.lower() == 'true' else False def _remove_distributed_attrs(self, main_program): suffix = core.kAutoParallelSuffix() @@ -57,60 +82,103 @@ class AutoParallelizer: if suffix in attr_name: op._remove_attr(attr_name) + def _get_dist_program(self, dist_context, rank): + # Annotation completion + completed_main_program = complete_annotation(self._main_program, + dist_context) + # Logical partition + partitioner = Partitioner(self._dist_strategy, dist_context, rank) + dist_main_prog, dist_startup_prog = partitioner.transpile_forward( + completed_main_program, self._startup_program) + dist_params_grads = partitioner.apply_backward( + self._loss, completed_main_program, self._startup_program, + dist_main_prog, dist_startup_prog) + dist_optimize_ops = partitioner.apply_optimize( + copy.deepcopy(self._optimizer), dist_params_grads, dist_main_prog, + dist_startup_prog) + + make_data_unshard(dist_main_prog, dist_startup_prog, dist_context) + + reshard(dist_main_prog, dist_startup_prog, rank, dist_context) + + return dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog + def parallelize(self, loss, startup_program, parameter_list=None, 
no_grad_set=None): assert startup_program is not None - main_program = loss.block.program - - if self._dist_strategy.auto_search: - # auto search - _logger.info("Start search dist attr.") - # self._dist_context, _ = auto_search(main_program, startup_program, - # loss, self._optimizer) - # completed_main_program = main_program - raise NotImplementedError("Auto search has not implemented") - else: - # Annotation completion - _logger.info("Start annotation dist attr.") - completed_main_program = complete_annotation(main_program, - self._dist_context) - - # Logical partition - rank = paddle.distributed.get_rank() - partitioner = Partitioner(self._dist_strategy, self._dist_context, rank) - partitioned_main_prog, partitioned_startup_prog = partitioner.transpile_forward( - completed_main_program, startup_program) - dist_params_grads = partitioner.apply_backward( - loss, completed_main_program, startup_program, - partitioned_main_prog, partitioned_startup_prog) - dist_optimize_ops = partitioner.apply_optimize( - self._optimizer, dist_params_grads, partitioned_main_prog, - partitioned_startup_prog) + self._loss = loss + self._startup_program = startup_program + self._main_program = loss.block.program + self._parameter_list = parameter_list + self._no_grad_set = no_grad_set + + if self._enable_auto_mapping and self._need_rank_mapping: + # Do the mapping pass before parallelization + assert self._cluster is not None, \ + "The cluster must not be none when using auto mapping." + dist_programs = {} + world_process_group = get_world_process_groups() + for rank in world_process_group.ranks: + dist_context = DistributedContext() + dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog = self._get_dist_program( + dist_context, rank) + dist_programs[rank] = dist_main_prog + + # Do the mapping between the distributed program graph and the cluster graph + rank_mapping_dict = mapping(dist_programs, self._cluster) + rank_mapping = list(rank_mapping_dict.values()) - # set the grad var shape - set_grad_var_shape(partitioned_main_prog, self._dist_context) + # Relaunch the training by using the rank mapping file + with open(self._rank_mapping_path, "w") as rank_mapping_file: + json.dump(rank_mapping, rank_mapping_file) + + enable_elastic = os.getenv("PADDLE_ENABLE_ELASTIC") + enable_elastic = True if enable_elastic and enable_elastic.lower( + ) == 'true' else False + if enable_elastic: + print("Auto mapping finished, now do elastic re-launch") + sys.exit(paddle.distributed.fleet.elastic.manager. + ELASTIC_AUTO_PARALLEL_EXIT_CODE) + + original_cmd_args = os.getenv("PADDLE_ORIGINAL_CMD_ARGS") + rank_mapping_args = " ".join( + ["--rank_mapping_path", self._rank_mapping_path]) + if os.environ.get("WITH_COVERAGE", "OFF") == "ON": + coverage_args = ["-m", "coverage", "run", "--branch", "-p"] + else: + coverage_args = [] + new_cmd_args = "-m paddle.distributed.fleet.launch" + " " + rank_mapping_args + " " + original_cmd_args + new_cmd = [sys.executable, "-u"] + coverage_args + shlex.split( + new_cmd_args) + new_process = subprocess.Popen(new_cmd) + new_process.wait() + assert new_process.returncode == 0, \ + "Launch failed with rank mapping" + print("Successfully do the second launch for auto mapping!") + sys.exit(0) + else: + # Parallelization after the mapping pass + rank = paddle.distributed.get_rank() - # The last step: remove all distributed attributes to be compatiable - # with inference. 
- self._remove_distributed_attrs(partitioned_main_prog) - make_data_unshard(partitioned_main_prog, partitioned_startup_prog, - self._dist_context) + dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog = self._get_dist_program( + self._dist_context, rank) - reshard(partitioned_main_prog, partitioned_startup_prog, rank, - self._dist_context) + # Traverse different rank programs and traverse each op of them, + # instantiate communication by process_mapping. + all_process_groups = get_all_process_groups() + for process_group in all_process_groups: + if rank not in process_group.ranks: + continue + process_group.instantiate() - # Traverse different rank programs and traverse each op of them, - # instantiate communication by process_mapping. - all_process_groups = get_all_process_groups() - for process_group in all_process_groups: - if rank not in process_group.ranks: - continue - process_group.instantiate() + # Copy distributed info to the default context + set_default_distributed_context(self._dist_context) - # Copy distributed info to the default context - set_default_distributed_context(self._dist_context) + # The last step: remove all distributed attributes to be compatible + # with inference. + self._remove_distributed_attrs(dist_main_prog) - return dist_optimize_ops, dist_params_grads, partitioned_startup_prog, partitioned_main_prog + return dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog diff --git a/python/paddle/distributed/auto_parallel/process_group.py b/python/paddle/distributed/auto_parallel/process_group.py index 70a19f6c538..2e4d370b394 100644 --- a/python/paddle/distributed/auto_parallel/process_group.py +++ b/python/paddle/distributed/auto_parallel/process_group.py @@ -19,10 +19,6 @@ from ..collective import _new_ring_id from ...fluid.framework import in_dygraph_mode from ...fluid.layers.tensor import fill_constant -# Note that Process group 0 is reserved for representing all ranks. -# At the begining, group 0 is empty and new ranks will be added automatically. -_g_process_group_map = {} - def get_all_process_groups(): global _g_process_group_map @@ -34,6 +30,11 @@ def get_process_group(group_id): return _g_process_group_map.get(group_id, None) +def get_world_process_groups(): + global _g_process_group_map + return _g_process_group_map[0] + + def new_process_group(ranks): global _g_process_group_map # A key constructed from ranks is used for avoiding duplication @@ -151,4 +152,7 @@ class ProcessGroup: return string +# Note that Process group 0 is reserved for representing all ranks. +# At the begining, group 0 is empty and new ranks will be added automatically. +_g_process_group_map = {} _g_process_group_map[0] = ProcessGroup(0, []) diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index 708ba281607..19306d3da99 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -175,25 +175,17 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra default="127.0.0.1", help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") collective_group.add_argument( - "--rank_mapping_file", - type=argparse.FileType('r'), - default=sys.stdin, - help="This rank mapping information in json format is used specifically " - "for lazy launch for auto parallel. Some of the ranks in each node " - "may not be used, and the indices of rank should be kept the same " - "as the indices of sub-task splited by auto parallel. 
" - " { " - " \"ip_ranks\": [ " - " { " - " \"ip\": \"127.0.0.1\", " - " \"ranks\": [0,1] " - " }, " - " { " - " \"ip\": \"127.0.0.2\", " - " \"ranks\": [2,3,4] " - " } " - " ] " - " } ") + "--cluster_topo_path", + type=str, + default=None, + help="A json format file will be stored in this path which is used" + "to represent the cluster topology information for auto parallel.") + collective_group.add_argument( + "--rank_mapping_path", + type=str, + default=None, + help="A json format file will be stored in this path which is used" + "to map processes to machines for auto parallel.") collective_group.add_argument( "--enable_auto_mapping", type=bool, @@ -297,20 +289,56 @@ def cpuonly_check(args): def get_cluster_info(args): # parse arguments, used for cloud-single-machine and local if args.backend == 'gloo': cpuonly_check(args) - (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) + if args.enable_auto_mapping: + (device_mode, devices_per_proc) = (DeviceMode.GPU, []) + else: + (device_mode, + devices_per_proc) = launch_utils.get_device_proc_info(args) trainers_num = cloud_utils.get_trainers_num() logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format( trainers_num, device_mode, devices_per_proc)) + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + cluster = None pod = None start_port = 6170 if os.environ.get('FLAGS_START_PORT') is not None: start_port = os.environ.get('FLAGS_START_PORT') - # lazy launch for auto-parallel + # auto mapping between processes and devices for auto-parallel if args.enable_auto_mapping == True: - cluster, pod = get_mapped_cluster_from_args(args, device_mode) + assert args.cluster_topo_path is not None, \ + "The cluster topology must be provied when enabling auto mapping." + rank_mapping_path = args.rank_mapping_path or os.getenv( + "PADDLE_RANK_MAPPING_PATH") + if not rank_mapping_path: + os.environ["PADDLE_NEED_RANK_MAPPING"] = str(True) + os.environ["PADDLE_ENABLE_ELASTIC"] = str( + enable_elastic(args, device_mode)) + cwd = pathlib.Path().resolve() + rank_mapping_path = os.path.join(cwd, + "auto_parallel_rank_mapping.json") + os.environ["PADDLE_RANK_MAPPING_PATH"] = str(rank_mapping_path) + + original_args = sys.argv[1:] + os.environ["PADDLE_ORIGINAL_CMD_ARGS"] = " ".join(original_args) + os.environ["PADDLE_CLUSTER_TOPO_PATH"] = str(args.cluster_topo_path) + os.environ["PADDLE_ENABLE_AUTO_MAPPING"] = str( + args.enable_auto_mapping) + cluster, pod = launch_utils.get_mapped_cluster_from_args_without_rank_mapping( + args, device_mode) + else: + os.environ["PADDLE_NEED_RANK_MAPPING"] = str(False) + os.environ["PADDLE_ENABLE_ELASTIC"] = str( + enable_elastic(args, device_mode)) + + os.environ["PADDLE_CLUSTER_TOPO_PATH"] = str(args.cluster_topo_path) + os.environ["PADDLE_RANK_MAPPING_PATH"] = str(rank_mapping_path) + os.environ["PADDLE_ENABLE_AUTO_MAPPING"] = str( + args.enable_auto_mapping) + cluster, pod = launch_utils.get_mapped_cluster_from_args_with_rank_mapping( + args, device_mode) elif cloud_utils.use_paddlecloud() and trainers_num != 1: cluster, pod = cloud_utils.get_cloud_cluster( args.ips, device_mode, devices_per_proc, start_port) @@ -328,6 +356,7 @@ def get_cluster_info(args): logger.debug("get cluster from args:{}".format(cluster)) return cluster, pod + def get_global_envs(args, tmp_dir): global_envs = copy.copy(os.environ.copy()) # add gloo env diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index 569f64c18bf..c20c209d601 100644 --- 
a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -511,6 +511,17 @@ def start_local_trainers(cluster, "PADDLE_WORLD_DEVICE_IDS": ",".join(res), } + # The following three environnement variables are used for auto mapping + if current_env.get("PADDLE_CLUSTER_TOPO_PATH", None) is not None: + proc_env["PADDLE_CLUSTER_TOPO_PATH"] = current_env[ + "PADDLE_CLUSTER_TOPO_PATH"] + if current_env.get("PADDLE_RANK_MAPPING_PATH", None) is not None: + proc_env["PADDLE_RANK_MAPPING_PATH"] = current_env[ + "PADDLE_RANK_MAPPING_PATH"] + if current_env.get("PADDLE_ENABLE_AUTO_MAPPING", None) is not None: + proc_env["PADDLE_ENABLE_AUTO_MAPPING"] = current_env[ + "PADDLE_ENABLE_AUTO_MAPPING"] + if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU: proc_env["FLAGS_selected_gpus"] = "%s" % ",".join( [str(g) for g in t.accelerators]) @@ -531,7 +542,8 @@ def start_local_trainers(cluster, current_env.update(proc_env) coverage_args = [] - if run_with_coverage(): + if run_with_coverage() or os.environ.get("WITH_COVERAGE", + "OFF") == "ON": coverage_args = ["-m", "coverage", "run", "--branch", "-p"] cmd = [sys.executable, "-u"] + coverage_args + [training_script ] + training_script_args @@ -557,7 +569,11 @@ def start_local_trainers(cluster, with open("%s/endpoints.log" % log_dir, "w") as f: f.write("PADDLE_TRAINER_ENDPOINTS: \n") f.write("\n".join(cluster.trainers_endpoints())) - fn = open("%s/workerlog.%d" % (log_dir, idx), "a") + if current_env.get("PADDLE_ENABLE_AUTO_MAPPING") is not None \ + and current_env.get("PADDLE_NEED_RANK_MAPPING").lower() == "true": + fn = open("%s/prelaunchlog.%d" % (log_dir, idx), "a") + else: + fn = open("%s/workerlog.%d" % (log_dir, idx), "a") proc = subprocess.Popen( cmd, env=current_env, stdout=fn, stderr=fn, preexec_fn=pre_fn) else: @@ -876,8 +892,8 @@ def get_custom_endpoints(origin_endpoints, offset=0): # pretty_print_envs(environs))) -def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, - node_mapping_ranks): +def get_mapped_cluster_without_rank_mapping( + node_ips, node_ip, trainer_endpoints, device_mode, node_ranks): assert type(trainer_endpoints) is list, "trainer_endpoints must be list" assert device_mode == DeviceMode.GPU, \ "Only support get mapped cluster for gpu now." @@ -890,17 +906,121 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, cur_node_endpoints = trainer_endpoints[node_rank] # choose rank from global mapped ranks and set it to the trainer. - ranks_per_node = node_mapping_ranks[node_rank] + ranks_per_node = node_ranks[node_rank] + assert len(ranks_per_node) == 1 for i in range(len(ranks_per_node)): trainer = Trainer() - # change global rank(mapped) to local rank within each node. - # e.g. mapped ranks of node: 3,4,7 -> 0,1,2 - local_rank = ranks_per_node.index(ranks_per_node[i]) - trainer.accelerators.append(local_rank) trainer.endpoint = "%s" % (cur_node_endpoints[i]) - # global mapped ranks trainer.rank = ranks_per_node[i] + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def get_mapped_cluster_from_args_without_rank_mapping(args, device_mode): + assert device_mode == DeviceMode.GPU, \ + "Only support get mapped cluster for gpu now." 
+ gpus_num = fluid.core.get_cuda_device_count() + + # parse ip-ranks json file + cluster_topo = None + with open(args.cluster_topo_path, "r") as json_file: + cluster_topo = json.load(json_file) + + node_ips = [] + node_ranks = [] + for idx, cur_cluster_topo in enumerate(cluster_topo["machines"]): + node_ips.append(cur_cluster_topo['addr']) + node_ranks.append([idx]) + + if len(node_ips) == 1: + node_ip = node_ips[0] + else: + if args.host: + node_ip = args.host + else: + _, node_ip = get_host_name_ip() + + assert node_ip in node_ips, \ + "Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips) + node_rank = node_ips.index(node_ip) + + assert len(node_ranks) == len(node_ips), \ + "ranks length should be equal to ips length." + + logger.debug("parsed from args: node_ips:{} node_ip:{} " + "node_rank:{} node_ranks:{}".format( + node_ips, node_ip, node_rank, node_ranks[node_rank])) + + # NOTE: there are different number of global mapped ranks on each node. + free_ports = [] + trainer_endpoints = [] + for ip in node_ips: + node_rank = node_ips.index(ip) + if os.environ.get('PADDLE_PORT') is not None: + start_port = int(os.getenv("PADDLE_PORT", "")) + free_ports = [ + x + for x in range(start_port, start_port + len(node_ranks[ + node_rank])) + ] + elif os.environ.get('FLAGS_START_PORT') is not None: + start_port = int(os.environ.get('FLAGS_START_PORT')) + free_ports = [ + x + for x in range(start_port, start_port + len(node_ranks[ + node_rank])) + ] + else: + free_ports = find_free_ports(len(node_ranks[node_rank])) + trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) + + return get_mapped_cluster_without_rank_mapping( + node_ips, node_ip, trainer_endpoints, device_mode, node_ranks) + + +def get_mapped_cluster_with_rank_mapping(node_ips, node_ip, trainer_endpoints, + device_mode, node_ranks, + node_rank_mappings): + assert type(trainer_endpoints) is list, "trainer_endpoints must be list" + assert device_mode == DeviceMode.GPU, \ + "Only support get mapped cluster for gpu now." + + def get_relative_gpu_id(gpu_id): + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_visible_devices is None or cuda_visible_devices == "": + return gpu_id + else: + cuda_visible_devices_list = cuda_visible_devices.split(',') + relative_id = cuda_visible_devices_list.index(str(gpu_id)) + logger.info( + "Change gpu id from {} to {} based on CUDA_VISIBLE_DEVICES {}". + format(gpu_id, relative_id, cuda_visible_devices_list)) + return relative_id + + cluster = Cluster(hdfs=None) + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + pod.device_mode = device_mode + cur_node_endpoints = trainer_endpoints[node_rank] + # choose rank from global mapped ranks and set it to the trainer. 
+ ranks_per_node = node_ranks[node_rank] + cur_node_rank_mapping = node_rank_mappings[node_rank] + for i in range(len(ranks_per_node)): + trainer = Trainer() + local_device_ids = cur_node_rank_mapping["ranks"][str( + ranks_per_node[i])] + assert len(local_device_ids) == 1, \ + "Only support one process to one device mapping" + trainer.accelerators.append( + get_relative_gpu_id(local_device_ids[0])) + trainer.endpoint = "%s" % (cur_node_endpoints[i]) + trainer.rank = ranks_per_node[i] pod.trainers.append(trainer) cluster.pods.append(pod) @@ -908,22 +1028,31 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, return cluster, cluster.pods[pod_rank] -def get_mapped_cluster_from_args(args, device_mode): +def get_mapped_cluster_from_args_with_rank_mapping(args, device_mode): assert device_mode == DeviceMode.GPU, \ "Only support get mapped cluster for gpu now." gpus_num = fluid.core.get_cuda_device_count() # parse ip-ranks json file - json_data = None - with args.rank_mapping_file as json_file: - json_data = json.load(json_file) + rank_mapping_path = args.rank_mapping_path or os.getenv( + "PADDLE_RANK_MAPPING_PATH") + rank_mapping = None + with open(rank_mapping_path, "r") as json_file: + rank_mapping = json.load(json_file) + # reset PADDLE_RANK_MAPPING_PATH env + os.environ["PADDLE_RANK_MAPPING_PATH"] = "" node_ips = [] - node_ranks_mapping = [] - ip_ranks_list = json_data['ip_ranks'] - for ip_ranks in ip_ranks_list: - node_ips.append(ip_ranks['ip']) - node_ranks_mapping.append(ip_ranks['ranks']) + node_ranks = [] + node_rank_mappings = [] + for cur_rank_mapping in rank_mapping: + node_ips.append(cur_rank_mapping['addr']) + cur_node_rank_list = [ + int(i) for i in list(cur_rank_mapping['ranks'].keys()) + ] + cur_node_rank_list.sort() + node_ranks.append(cur_node_rank_list) + node_rank_mappings.append(cur_rank_mapping) if len(node_ips) == 1: node_ip = node_ips[0] @@ -937,31 +1066,41 @@ def get_mapped_cluster_from_args(args, device_mode): "Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips) node_rank = node_ips.index(node_ip) - assert len(node_ranks_mapping[node_rank]) <= gpus_num, \ + assert len(node_ranks[node_rank]) <= gpus_num, \ "number of ranks mapped to one node should not exceed the avaiable ones." - assert len(node_ranks_mapping) == len(node_ips), \ + assert len(node_ranks) == len(node_ips), \ "ranks length should be equal to ips length." logger.debug("parsed from args: node_ips:{} node_ip:{} " - "node_rank:{} node_ranks_mapping:{}".format( - node_ips, node_ip, node_rank, node_ranks_mapping[ - node_rank])) + "node_rank:{} node_ranks:{}".format( + node_ips, node_ip, node_rank, node_ranks[node_rank])) # NOTE: there are different number of global mapped ranks on each node. 
free_ports = [] trainer_endpoints = [] for ip in node_ips: node_rank = node_ips.index(ip) - if os.environ.get('FLAGS_START_PORT') is not None: + if os.environ.get('PADDLE_PORT') is not None: + start_port = int(os.getenv("PADDLE_PORT", "")) + free_ports = [ + x + for x in range(start_port, start_port + len(node_ranks[ + node_rank])) + ] + elif os.environ.get('FLAGS_START_PORT') is not None: start_port = int(os.environ.get('FLAGS_START_PORT')) - end_port = start_port + len(node_ranks_mapping[node_rank]) - free_ports = [x for x in range(start_port, end_port)] + free_ports = [ + x + for x in range(start_port, start_port + len(node_ranks[ + node_rank])) + ] else: - free_ports = find_free_ports(len(node_ranks_mapping[node_rank])) + free_ports = find_free_ports(len(node_ranks[node_rank])) trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) - return get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, - node_ranks_mapping) + return get_mapped_cluster_with_rank_mapping(node_ips, node_ip, + trainer_endpoints, device_mode, + node_ranks, node_rank_mappings) class ParameterServerLauncher(object): @@ -1229,14 +1368,18 @@ class ParameterServerLauncher(object): _, self.current_node_ip = get_host_name_ip() else: self.current_node_ip = pod_ip - assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ - % (self.current_node_ip, self.node_ips) - self.node_rank = self.node_ips.index(self.current_node_ip) - logger.debug( - "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". - format(self.node_ips, self.current_node_ip, self.node_rank)) + if not self.distribute_mode == DistributeMode.PS_HETER: + assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ + % (self.current_node_ip, self.node_ips) + if self.current_node_ip in self.node_ips: + self.node_rank = self.node_ips.index(self.current_node_ip) + logger.debug( + "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". 
+ format(self.node_ips, self.current_node_ip, self.node_rank)) def start_ps(self): + if not self.current_node_ip in self.node_ips: + return cluster = Cluster(hdfs=None) server_rank = 0 worker_rank = 0 diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 15f857f6087..4162f697d27 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -589,6 +589,8 @@ set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") if(WITH_DISTRIBUTE) add_subdirectory(distributed_passes) + add_subdirectory(auto_parallel) + # FIXME(typhoonzero): add these tests back list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transformer") list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transpiler") diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt new file mode 100644 index 00000000000..4244fda0c51 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt @@ -0,0 +1,6 @@ +# file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") +# string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") +if(WITH_DISTRIBUTE AND WITH_GPU) + py_test_modules(test_auto_parallel_relaunch MODULES test_auto_parallel_relaunch ENVS ${dist_ENVS}) + set_tests_properties(test_auto_parallel_relaunch PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120) +endif() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/auto_parallel_relaunch_model.py b/python/paddle/fluid/tests/unittests/auto_parallel/auto_parallel_relaunch_model.py new file mode 100644 index 00000000000..8e5221ed5ff --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/auto_parallel_relaunch_model.py @@ -0,0 +1,162 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import time +import paddle.fluid as fluid +import copy +import os +import numpy as np +import subprocess +import paddle +import paddle.nn as nn +import paddle.fluid as fluid +import paddle.static as static +import paddle.nn.functional as F +import paddle.utils as utils +from paddle.fluid import layers +from paddle.io import IterableDataset, DataLoader +from paddle.distributed import fleet +import paddle.distributed.auto_parallel as auto + +paddle.enable_static() +_global_parallel_strategy = None +_global_process_mesh = None +batch_size = 4 +hidden_size = 1024 +sequence_len = 512 + + +def get_random_inputs_and_labels(input_shape, label_shape): + input = np.random.random(size=input_shape).astype('float32') + label = np.random.random(size=label_shape).astype('float32') + return input, label + + +def batch_generator_creator(): + def __reader__(): + for _ in range(batch_size): + batch_input, batch_label = get_random_inputs_and_labels( + [batch_size, sequence_len, hidden_size], + [batch_size, sequence_len, 1]) + yield batch_input, batch_label + + return __reader__ + + +class MLPLayer(nn.Layer): + def __init__(self, + hidden_size=1024, + intermediate_size=4 * 1024, + dropout_ratio=0.1, + initializer_range=0.02): + super(MLPLayer, self).__init__() + d_model = hidden_size + dim_feedforward = intermediate_size + weight_attr = paddle.ParamAttr(initializer=nn.initializer.Normal( + mean=0.0, std=initializer_range)) + bias_attr = None + + self.linear0 = nn.Linear( + d_model, dim_feedforward, weight_attr, bias_attr=bias_attr) + self.linear1 = nn.Linear( + dim_feedforward, d_model, weight_attr, bias_attr=bias_attr) + self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr) + self.norm = nn.LayerNorm(d_model, epsilon=1e-5) + self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train") + + def forward(self, input): + out = self.norm(input) + out = self.linear0(out) + out = F.gelu(out, approximate=True) + out = self.linear1(out) + out = self.dropout(out) + out = self.linear2(out) + + return out + + +def mlp_pretrain_forward(train_program, start_program): + with static.program_guard(train_program, + start_program), utils.unique_name.guard(): + input = static.data( + name="input", + shape=[batch_size, sequence_len, hidden_size], + dtype='float32') + label = static.data( + name="label", shape=[batch_size, sequence_len, 1], dtype='float32') + + auto.shard_tensor( + input, + dist_attr={ + "process_mesh": _global_process_mesh, + "dims_mappig": [-1, -1, -1] + }) + + mlp = MLPLayer( + hidden_size=hidden_size, + intermediate_size=4 * hidden_size, + dropout_ratio=0.1, + initializer_range=0.02) + + predict = mlp(input) + error_cost = paddle.nn.functional.square_error_cost(predict, label) + loss = paddle.mean(error_cost) + + loader = paddle.io.DataLoader.from_generator( + feed_list=[input, label], capacity=4 * batch_size, iterable=True) + + return loss, train_program, start_program, loader + + +def train(): + global _global_process_mesh + _global_process_mesh = auto.ProcessMesh(mesh=[0, 1]) + + dist_strategy = fleet.DistributedStrategy() + dist_strategy.amp = False + dist_strategy.pipeline = False + dist_strategy.recompute = False + # init parallel optimizer + dist_strategy.semi_auto = True + + fleet.init(is_collective=True, strategy=dist_strategy) + + train_program = static.Program() + start_program = static.Program() + loss, train_program, start_program, loader = mlp_pretrain_forward( + train_program, start_program) + + optimizer = paddle.fluid.optimizer.AdamOptimizer( + 
learning_rate=0.00001, + beta1=0.9, + beta2=0.999, + epsilon=1e-08, + grad_clip=None) + + optimizer = fleet.distributed_optimizer(optimizer) + _, _, distributed_startup_program, distributed_main_program = optimizer.minimize( + loss, start_program) + + places = static.cuda_places() + loader.set_batch_generator(batch_generator_creator(), places=places) + exe = paddle.static.Executor(places[0]) + exe.run(distributed_startup_program) + + for data in loader(): + exe.run(distributed_main_program, feed=data, fetch_list=[loss]) + + +if __name__ == "__main__": + train() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/launch.py b/python/paddle/fluid/tests/unittests/auto_parallel/launch.py new file mode 100644 index 00000000000..c225fe85cd8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/launch.py @@ -0,0 +1,22 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from paddle.distributed.fleet import launch +from paddle.distributed.fleet.launch_utils import run_with_coverage + +if __name__ == "__main__": + if os.environ.get("WITH_COVERAGE", "OFF") == "ON": + run_with_coverage(True) + launch.launch() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py new file mode 100644 index 00000000000..321b2622862 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py @@ -0,0 +1,118 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import os +import sys +import json +import shutil +import subprocess +from paddle.distributed.fleet.launch_utils import run_with_coverage + +cluster_json = """ +{ + "machines": [ + { + "hostname": "machine1", + "addr": "127.0.0.1", + "port": "768", + "devices": [ + { + "global_id": 0, + "local_id": 0, + "type": "GPU", + "model": "Tesla V100-SXM2-32GB", + "sp_gflops": 15700, + "dp_gflops": 7800, + "memory": 32 + }, + { + "global_id": 1, + "local_id": 1, + "type": "GPU", + "model": "Tesla V100-SXM2-32GB", + "sp_gflops": 15700, + "dp_gflops": 7800, + "memory": 32 + }, + { + "global_id": 2, + "local_id": 0, + "type": "CPU", + "model": "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G", + "arch": "x86_64", + "vendor": "GenuineIntel", + "sp_gflops": 150, + "dp_gflops": 75, + "memory": "503" + } + ], + "links": [ + { + "source_global_id": 0, + "target_global_id": 1, + "type": "NVL", + "bandwidth": 42 + }, + { + "source_global_id": 1, + "target_global_id": 0, + "type": "PHB", + "bandwidth": 12 + } + ] + } + ] +} +""" + + +class TestAutoParallelReLaunch(unittest.TestCase): + def test_relaunch(self): + file_dir = os.path.dirname(os.path.abspath(__file__)) + cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_object = json.loads(cluster_json) + with open(cluster_json_path, "w") as cluster_json_file: + json.dump(cluster_json_object, cluster_json_file) + + launch_model_path = os.path.join(file_dir, + "auto_parallel_relaunch_model.py") + + if os.environ.get("WITH_COVERAGE", "OFF") == "ON": + coverage_args = ["-m", "coverage", "run", "--branch", "-p"] + else: + coverage_args = [] + + cmd = [sys.executable, "-u"] + coverage_args + [ + "-m", "launch", "--cluster_topo_path", cluster_json_path, + "--enable_auto_mapping", "True", launch_model_path + ] + process = subprocess.Popen(cmd) + process.wait() + self.assertEqual(process.returncode, 0) + + # Remove unnecessary files + if os.path.exists(cluster_json_path): + os.remove(cluster_json_path) + rank_mapping_json_path = os.path.join(file_dir, + "auto_parallel_rank_mapping.json") + if os.path.exists(rank_mapping_json_path): + os.remove(rank_mapping_json_path) + log_path = os.path.join(file_dir, "log") + if os.path.exists(log_path): + shutil.rmtree(log_path) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py index 7b60a9753bd..de37ac56bfb 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py +++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py @@ -52,6 +52,9 @@ from paddle.distributed.auto_parallel.mapper import mapping from paddle.distributed.auto_parallel.mapper import get_dtype_bytes from paddle.distributed.auto_parallel.mapper import get_comm_volume +if os.getenv("CUDA_VISIBLE_DEVICES") is not None: + os.environ["CUDA_VISIBLE_DEVICES"] = "" + paddle.enable_static() _global_parallel_strategy = None _global_process_mesh = None -- GitLab
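
Usage sketch (an editor's illustration, not part of the patch): the snippet below mirrors the flow exercised by test_auto_parallel_relaunch.py, writing a cluster topology file and then starting fleet.launch with the two flags added in this change. The script name train_script.py is a placeholder for any training program that, like auto_parallel_relaunch_model.py above, sets dist_strategy.semi_auto = True and calls fleet.init with that strategy; the single-machine, two-GPU topology is trimmed from the unit-test JSON. Treat both as assumptions rather than a prescribed configuration.

    # Minimal sketch: the first launch computes the rank mapping, writes
    # auto_parallel_rank_mapping.json into the working directory, and then
    # relaunches paddle.distributed.fleet.launch with --rank_mapping_path
    # for the actual training run.
    import json
    import subprocess
    import sys

    # Single machine with two V100s; fields follow the unit-test cluster_json
    # (the CPU device entry from the test is omitted here for brevity).
    cluster_topo = {
        "machines": [{
            "hostname": "machine1",
            "addr": "127.0.0.1",
            "port": "768",
            "devices": [
                {"global_id": 0, "local_id": 0, "type": "GPU",
                 "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700,
                 "dp_gflops": 7800, "memory": 32},
                {"global_id": 1, "local_id": 1, "type": "GPU",
                 "model": "Tesla V100-SXM2-32GB", "sp_gflops": 15700,
                 "dp_gflops": 7800, "memory": 32},
            ],
            "links": [
                {"source_global_id": 0, "target_global_id": 1,
                 "type": "NVL", "bandwidth": 42},
                {"source_global_id": 1, "target_global_id": 0,
                 "type": "PHB", "bandwidth": 12},
            ],
        }]
    }
    with open("cluster.json", "w") as f:
        json.dump(cluster_topo, f)

    # train_script.py is a hypothetical user script; replace it with a
    # program that enables semi-auto parallelism as shown in the test model.
    cmd = [
        sys.executable, "-u", "-m", "paddle.distributed.fleet.launch",
        "--cluster_topo_path", "cluster.json",
        "--enable_auto_mapping", "True",
        "train_script.py",
    ]
    subprocess.check_call(cmd)

On the first pass PADDLE_NEED_RANK_MAPPING is true, so each rank's distributed program is built only to feed the mapper; training itself happens in the second, relaunched process, whose logs go to workerlog.* while the mapping pass logs go to prelaunchlog.*.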