未验证 提交 506e79d1 编写于 作者: Y Yulong Ao 提交者: GitHub

[Auto para] Relaunch with auto mapping function (#37326)

* [Auto Parallel]  Add the unified cluster representation

* [Auto Parallel] Add the graph class for physical mapping

* [Auto Parallel] Add the simple physical mapper

* Set the timeout of the mapper

* Merge the upstream develop unittests cmake files

* Fix a bug of the process group

* Remove mapper unittest from platforms which is not GPU

* Move the instantiation of process group after resharding

* Add the local id for devices

* Update the rank mapping format

* [Auto Parallel] Relaunch with the rank mapping file

* Remove the unnecessary json file

* Avoid entering get_device_proc_info for auto mapping

* Correct the mapper unit test

* Add some comments

* Remove the related files about mapping

* Update the unittest for auto mapping

* Remove unused rank_mapping unittest

* Improve the unittest coverage

* Improve the unittest coverage

* Improve the unittest of relaunch

* Fix the unittest problem in CI

* Improve the unittest of relaunch

* Remove unnecessary statements

* Update the unittest cmakefile

* Correct the cmakefile of auto parallel unittests

* Modify codes based on the new elastic change

* Use the GPUs exclusively in the unittest

* Correct the cmakefile

* Set the timeout of the unittest
上级 bfa0d7f3
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License # limitations under the License
import os
import operator import operator
import functools import functools
import json import json
...@@ -175,9 +176,19 @@ def build_process_graph(distributed_program): ...@@ -175,9 +176,19 @@ def build_process_graph(distributed_program):
def build_cluster_graph(cluster): def build_cluster_graph(cluster):
graph = Graph() graph = Graph()
cuda_visible_devices_env = os.getenv("CUDA_VISIBLE_DEVICES")
cuda_visible_devices = []
if cuda_visible_devices_env is not None and cuda_visible_devices_env != "":
cuda_visible_devices = [
int(d.strip()) for d in cuda_visible_devices_env.split(",")
]
for machine in cluster.machines.values(): for machine in cluster.machines.values():
for device in machine.devices.values(): for device in machine.devices.values():
graph.add_node(device.global_id, device=device) graph.add_node(device.global_id, device=device)
if cuda_visible_devices and device.local_id not in cuda_visible_devices:
graph.nodes[device.global_id]["occupied"] = True
else:
graph.nodes[device.global_id]["occupied"] = False
for link in machine.links.values(): for link in machine.links.values():
graph.add_edge( graph.add_edge(
link.source.global_id, link.target.global_id, link=link) link.source.global_id, link.target.global_id, link=link)
...@@ -195,9 +206,6 @@ def mapping(distributed_program, cluster): ...@@ -195,9 +206,6 @@ def mapping(distributed_program, cluster):
for cur_rank_node in process_graph: for cur_rank_node in process_graph:
cur_rank_node["visited"] = False cur_rank_node["visited"] = False
for cur_device_node in cluster_graph:
cur_device_node["occupied"] = False
def sort_by_comm_volume(rank_edge): def sort_by_comm_volume(rank_edge):
return rank_edge["comm_requirements"]["comm_volume"] return rank_edge["comm_requirements"]["comm_volume"]
......
...@@ -12,6 +12,13 @@ ...@@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import sys
import json
import shlex
import copy
import pathlib
import subprocess
import logging import logging
import paddle import paddle
from paddle.distributed.utils import get_logger from paddle.distributed.utils import get_logger
...@@ -23,9 +30,12 @@ from .dist_context import set_default_distributed_context ...@@ -23,9 +30,12 @@ from .dist_context import set_default_distributed_context
from .completion import complete_annotation, complete_backward_annotation from .completion import complete_annotation, complete_backward_annotation
from .partitioner import Partitioner from .partitioner import Partitioner
from .process_group import get_all_process_groups from .process_group import get_all_process_groups
from .process_group import get_world_process_groups
from .utils import make_data_unshard from .utils import make_data_unshard
from .utils import set_grad_var_shape from .utils import set_grad_var_shape
from .reshard import reshard from .reshard import reshard
from .cluster import Cluster
from .mapper import mapping
# from .auto_search import auto_search # from .auto_search import auto_search
_logger = get_logger(logging.INFO) _logger = get_logger(logging.INFO)
...@@ -46,6 +56,21 @@ class AutoParallelizer: ...@@ -46,6 +56,21 @@ class AutoParallelizer:
self._optimizer = self._fleet.user_defined_optimizer self._optimizer = self._fleet.user_defined_optimizer
self._dist_strategy = self._fleet._user_defined_strategy self._dist_strategy = self._fleet._user_defined_strategy
self._dist_context = DistributedContext() self._dist_context = DistributedContext()
self._cluster = None
self._cluster_topo_path = os.getenv("PADDLE_CLUSTER_TOPO_PATH", None)
if self._cluster_topo_path is not None:
self._cluster = Cluster()
self._cluster.build_from_file(self._cluster_topo_path)
# Prepare information for auto mapping
self._rank_mapping_path = os.getenv("PADDLE_RANK_MAPPING_PATH", None)
enable_auto_mapping_env = os.getenv("PADDLE_ENABLE_AUTO_MAPPING", None)
if enable_auto_mapping_env is None:
self._enable_auto_mapping = False
else:
self._enable_auto_mapping = True
self._need_rank_mapping = os.getenv("PADDLE_NEED_RANK_MAPPING")
self._need_rank_mapping = True if self._need_rank_mapping and \
self._need_rank_mapping.lower() == 'true' else False
def _remove_distributed_attrs(self, main_program): def _remove_distributed_attrs(self, main_program):
suffix = core.kAutoParallelSuffix() suffix = core.kAutoParallelSuffix()
...@@ -57,60 +82,103 @@ class AutoParallelizer: ...@@ -57,60 +82,103 @@ class AutoParallelizer:
if suffix in attr_name: if suffix in attr_name:
op._remove_attr(attr_name) op._remove_attr(attr_name)
def _get_dist_program(self, dist_context, rank):
# Annotation completion
completed_main_program = complete_annotation(self._main_program,
dist_context)
# Logical partition
partitioner = Partitioner(self._dist_strategy, dist_context, rank)
dist_main_prog, dist_startup_prog = partitioner.transpile_forward(
completed_main_program, self._startup_program)
dist_params_grads = partitioner.apply_backward(
self._loss, completed_main_program, self._startup_program,
dist_main_prog, dist_startup_prog)
dist_optimize_ops = partitioner.apply_optimize(
copy.deepcopy(self._optimizer), dist_params_grads, dist_main_prog,
dist_startup_prog)
make_data_unshard(dist_main_prog, dist_startup_prog, dist_context)
reshard(dist_main_prog, dist_startup_prog, rank, dist_context)
return dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog
def parallelize(self, def parallelize(self,
loss, loss,
startup_program, startup_program,
parameter_list=None, parameter_list=None,
no_grad_set=None): no_grad_set=None):
assert startup_program is not None assert startup_program is not None
main_program = loss.block.program self._loss = loss
self._startup_program = startup_program
if self._dist_strategy.auto_search: self._main_program = loss.block.program
# auto search self._parameter_list = parameter_list
_logger.info("Start search dist attr.") self._no_grad_set = no_grad_set
# self._dist_context, _ = auto_search(main_program, startup_program,
# loss, self._optimizer) if self._enable_auto_mapping and self._need_rank_mapping:
# completed_main_program = main_program # Do the mapping pass before parallelization
raise NotImplementedError("Auto search has not implemented") assert self._cluster is not None, \
else: "The cluster must not be none when using auto mapping."
# Annotation completion dist_programs = {}
_logger.info("Start annotation dist attr.") world_process_group = get_world_process_groups()
completed_main_program = complete_annotation(main_program, for rank in world_process_group.ranks:
self._dist_context) dist_context = DistributedContext()
dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog = self._get_dist_program(
# Logical partition dist_context, rank)
rank = paddle.distributed.get_rank() dist_programs[rank] = dist_main_prog
partitioner = Partitioner(self._dist_strategy, self._dist_context, rank)
partitioned_main_prog, partitioned_startup_prog = partitioner.transpile_forward( # Do the mapping between the distributed program graph and the cluster graph
completed_main_program, startup_program) rank_mapping_dict = mapping(dist_programs, self._cluster)
dist_params_grads = partitioner.apply_backward( rank_mapping = list(rank_mapping_dict.values())
loss, completed_main_program, startup_program,
partitioned_main_prog, partitioned_startup_prog)
dist_optimize_ops = partitioner.apply_optimize(
self._optimizer, dist_params_grads, partitioned_main_prog,
partitioned_startup_prog)
# set the grad var shape # Relaunch the training by using the rank mapping file
set_grad_var_shape(partitioned_main_prog, self._dist_context) with open(self._rank_mapping_path, "w") as rank_mapping_file:
json.dump(rank_mapping, rank_mapping_file)
enable_elastic = os.getenv("PADDLE_ENABLE_ELASTIC")
enable_elastic = True if enable_elastic and enable_elastic.lower(
) == 'true' else False
if enable_elastic:
print("Auto mapping finished, now do elastic re-launch")
sys.exit(paddle.distributed.fleet.elastic.manager.
ELASTIC_AUTO_PARALLEL_EXIT_CODE)
original_cmd_args = os.getenv("PADDLE_ORIGINAL_CMD_ARGS")
rank_mapping_args = " ".join(
["--rank_mapping_path", self._rank_mapping_path])
if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
else:
coverage_args = []
new_cmd_args = "-m paddle.distributed.fleet.launch" + " " + rank_mapping_args + " " + original_cmd_args
new_cmd = [sys.executable, "-u"] + coverage_args + shlex.split(
new_cmd_args)
new_process = subprocess.Popen(new_cmd)
new_process.wait()
assert new_process.returncode == 0, \
"Launch failed with rank mapping"
print("Successfully do the second launch for auto mapping!")
sys.exit(0)
else:
# Parallelization after the mapping pass
rank = paddle.distributed.get_rank()
# The last step: remove all distributed attributes to be compatiable dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog = self._get_dist_program(
# with inference. self._dist_context, rank)
self._remove_distributed_attrs(partitioned_main_prog)
make_data_unshard(partitioned_main_prog, partitioned_startup_prog,
self._dist_context)
reshard(partitioned_main_prog, partitioned_startup_prog, rank, # Traverse different rank programs and traverse each op of them,
self._dist_context) # instantiate communication by process_mapping.
all_process_groups = get_all_process_groups()
for process_group in all_process_groups:
if rank not in process_group.ranks:
continue
process_group.instantiate()
# Traverse different rank programs and traverse each op of them, # Copy distributed info to the default context
# instantiate communication by process_mapping. set_default_distributed_context(self._dist_context)
all_process_groups = get_all_process_groups()
for process_group in all_process_groups:
if rank not in process_group.ranks:
continue
process_group.instantiate()
# Copy distributed info to the default context # The last step: remove all distributed attributes to be compatible
set_default_distributed_context(self._dist_context) # with inference.
self._remove_distributed_attrs(dist_main_prog)
return dist_optimize_ops, dist_params_grads, partitioned_startup_prog, partitioned_main_prog return dist_optimize_ops, dist_params_grads, dist_startup_prog, dist_main_prog
...@@ -19,10 +19,6 @@ from ..collective import _new_ring_id ...@@ -19,10 +19,6 @@ from ..collective import _new_ring_id
from ...fluid.framework import in_dygraph_mode from ...fluid.framework import in_dygraph_mode
from ...fluid.layers.tensor import fill_constant from ...fluid.layers.tensor import fill_constant
# Note that Process group 0 is reserved for representing all ranks.
# At the begining, group 0 is empty and new ranks will be added automatically.
_g_process_group_map = {}
def get_all_process_groups(): def get_all_process_groups():
global _g_process_group_map global _g_process_group_map
...@@ -34,6 +30,11 @@ def get_process_group(group_id): ...@@ -34,6 +30,11 @@ def get_process_group(group_id):
return _g_process_group_map.get(group_id, None) return _g_process_group_map.get(group_id, None)
def get_world_process_groups():
global _g_process_group_map
return _g_process_group_map[0]
def new_process_group(ranks): def new_process_group(ranks):
global _g_process_group_map global _g_process_group_map
# A key constructed from ranks is used for avoiding duplication # A key constructed from ranks is used for avoiding duplication
...@@ -151,4 +152,7 @@ class ProcessGroup: ...@@ -151,4 +152,7 @@ class ProcessGroup:
return string return string
# Note that Process group 0 is reserved for representing all ranks.
# At the begining, group 0 is empty and new ranks will be added automatically.
_g_process_group_map = {}
_g_process_group_map[0] = ProcessGroup(0, []) _g_process_group_map[0] = ProcessGroup(0, [])
...@@ -175,25 +175,17 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra ...@@ -175,25 +175,17 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
default="127.0.0.1", default="127.0.0.1",
help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
collective_group.add_argument( collective_group.add_argument(
"--rank_mapping_file", "--cluster_topo_path",
type=argparse.FileType('r'), type=str,
default=sys.stdin, default=None,
help="This rank mapping information in json format is used specifically " help="A json format file will be stored in this path which is used"
"for lazy launch for auto parallel. Some of the ranks in each node " "to represent the cluster topology information for auto parallel.")
"may not be used, and the indices of rank should be kept the same " collective_group.add_argument(
"as the indices of sub-task splited by auto parallel. " "--rank_mapping_path",
" { " type=str,
" \"ip_ranks\": [ " default=None,
" { " help="A json format file will be stored in this path which is used"
" \"ip\": \"127.0.0.1\", " "to map processes to machines for auto parallel.")
" \"ranks\": [0,1] "
" }, "
" { "
" \"ip\": \"127.0.0.2\", "
" \"ranks\": [2,3,4] "
" } "
" ] "
" } ")
collective_group.add_argument( collective_group.add_argument(
"--enable_auto_mapping", "--enable_auto_mapping",
type=bool, type=bool,
...@@ -297,20 +289,56 @@ def cpuonly_check(args): ...@@ -297,20 +289,56 @@ def cpuonly_check(args):
def get_cluster_info(args): def get_cluster_info(args):
# parse arguments, used for cloud-single-machine and local # parse arguments, used for cloud-single-machine and local
if args.backend == 'gloo': cpuonly_check(args) if args.backend == 'gloo': cpuonly_check(args)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) if args.enable_auto_mapping:
(device_mode, devices_per_proc) = (DeviceMode.GPU, [])
else:
(device_mode,
devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num() trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format( logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
trainers_num, device_mode, devices_per_proc)) trainers_num, device_mode, devices_per_proc))
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
cluster = None cluster = None
pod = None pod = None
start_port = 6170 start_port = 6170
if os.environ.get('FLAGS_START_PORT') is not None: if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT') start_port = os.environ.get('FLAGS_START_PORT')
# lazy launch for auto-parallel # auto mapping between processes and devices for auto-parallel
if args.enable_auto_mapping == True: if args.enable_auto_mapping == True:
cluster, pod = get_mapped_cluster_from_args(args, device_mode) assert args.cluster_topo_path is not None, \
"The cluster topology must be provied when enabling auto mapping."
rank_mapping_path = args.rank_mapping_path or os.getenv(
"PADDLE_RANK_MAPPING_PATH")
if not rank_mapping_path:
os.environ["PADDLE_NEED_RANK_MAPPING"] = str(True)
os.environ["PADDLE_ENABLE_ELASTIC"] = str(
enable_elastic(args, device_mode))
cwd = pathlib.Path().resolve()
rank_mapping_path = os.path.join(cwd,
"auto_parallel_rank_mapping.json")
os.environ["PADDLE_RANK_MAPPING_PATH"] = str(rank_mapping_path)
original_args = sys.argv[1:]
os.environ["PADDLE_ORIGINAL_CMD_ARGS"] = " ".join(original_args)
os.environ["PADDLE_CLUSTER_TOPO_PATH"] = str(args.cluster_topo_path)
os.environ["PADDLE_ENABLE_AUTO_MAPPING"] = str(
args.enable_auto_mapping)
cluster, pod = launch_utils.get_mapped_cluster_from_args_without_rank_mapping(
args, device_mode)
else:
os.environ["PADDLE_NEED_RANK_MAPPING"] = str(False)
os.environ["PADDLE_ENABLE_ELASTIC"] = str(
enable_elastic(args, device_mode))
os.environ["PADDLE_CLUSTER_TOPO_PATH"] = str(args.cluster_topo_path)
os.environ["PADDLE_RANK_MAPPING_PATH"] = str(rank_mapping_path)
os.environ["PADDLE_ENABLE_AUTO_MAPPING"] = str(
args.enable_auto_mapping)
cluster, pod = launch_utils.get_mapped_cluster_from_args_with_rank_mapping(
args, device_mode)
elif cloud_utils.use_paddlecloud() and trainers_num != 1: elif cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster( cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port) args.ips, device_mode, devices_per_proc, start_port)
...@@ -328,6 +356,7 @@ def get_cluster_info(args): ...@@ -328,6 +356,7 @@ def get_cluster_info(args):
logger.debug("get cluster from args:{}".format(cluster)) logger.debug("get cluster from args:{}".format(cluster))
return cluster, pod return cluster, pod
def get_global_envs(args, tmp_dir): def get_global_envs(args, tmp_dir):
global_envs = copy.copy(os.environ.copy()) global_envs = copy.copy(os.environ.copy())
# add gloo env # add gloo env
......
...@@ -511,6 +511,17 @@ def start_local_trainers(cluster, ...@@ -511,6 +511,17 @@ def start_local_trainers(cluster,
"PADDLE_WORLD_DEVICE_IDS": ",".join(res), "PADDLE_WORLD_DEVICE_IDS": ",".join(res),
} }
# The following three environnement variables are used for auto mapping
if current_env.get("PADDLE_CLUSTER_TOPO_PATH", None) is not None:
proc_env["PADDLE_CLUSTER_TOPO_PATH"] = current_env[
"PADDLE_CLUSTER_TOPO_PATH"]
if current_env.get("PADDLE_RANK_MAPPING_PATH", None) is not None:
proc_env["PADDLE_RANK_MAPPING_PATH"] = current_env[
"PADDLE_RANK_MAPPING_PATH"]
if current_env.get("PADDLE_ENABLE_AUTO_MAPPING", None) is not None:
proc_env["PADDLE_ENABLE_AUTO_MAPPING"] = current_env[
"PADDLE_ENABLE_AUTO_MAPPING"]
if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU: if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU:
proc_env["FLAGS_selected_gpus"] = "%s" % ",".join( proc_env["FLAGS_selected_gpus"] = "%s" % ",".join(
[str(g) for g in t.accelerators]) [str(g) for g in t.accelerators])
...@@ -531,7 +542,8 @@ def start_local_trainers(cluster, ...@@ -531,7 +542,8 @@ def start_local_trainers(cluster,
current_env.update(proc_env) current_env.update(proc_env)
coverage_args = [] coverage_args = []
if run_with_coverage(): if run_with_coverage() or os.environ.get("WITH_COVERAGE",
"OFF") == "ON":
coverage_args = ["-m", "coverage", "run", "--branch", "-p"] coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
cmd = [sys.executable, "-u"] + coverage_args + [training_script cmd = [sys.executable, "-u"] + coverage_args + [training_script
] + training_script_args ] + training_script_args
...@@ -557,7 +569,11 @@ def start_local_trainers(cluster, ...@@ -557,7 +569,11 @@ def start_local_trainers(cluster,
with open("%s/endpoints.log" % log_dir, "w") as f: with open("%s/endpoints.log" % log_dir, "w") as f:
f.write("PADDLE_TRAINER_ENDPOINTS: \n") f.write("PADDLE_TRAINER_ENDPOINTS: \n")
f.write("\n".join(cluster.trainers_endpoints())) f.write("\n".join(cluster.trainers_endpoints()))
fn = open("%s/workerlog.%d" % (log_dir, idx), "a") if current_env.get("PADDLE_ENABLE_AUTO_MAPPING") is not None \
and current_env.get("PADDLE_NEED_RANK_MAPPING").lower() == "true":
fn = open("%s/prelaunchlog.%d" % (log_dir, idx), "a")
else:
fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
proc = subprocess.Popen( proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, preexec_fn=pre_fn) cmd, env=current_env, stdout=fn, stderr=fn, preexec_fn=pre_fn)
else: else:
...@@ -876,8 +892,8 @@ def get_custom_endpoints(origin_endpoints, offset=0): ...@@ -876,8 +892,8 @@ def get_custom_endpoints(origin_endpoints, offset=0):
# pretty_print_envs(environs))) # pretty_print_envs(environs)))
def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, def get_mapped_cluster_without_rank_mapping(
node_mapping_ranks): node_ips, node_ip, trainer_endpoints, device_mode, node_ranks):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list" assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
assert device_mode == DeviceMode.GPU, \ assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now." "Only support get mapped cluster for gpu now."
...@@ -890,17 +906,121 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, ...@@ -890,17 +906,121 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
cur_node_endpoints = trainer_endpoints[node_rank] cur_node_endpoints = trainer_endpoints[node_rank]
# choose rank from global mapped ranks and set it to the trainer. # choose rank from global mapped ranks and set it to the trainer.
ranks_per_node = node_mapping_ranks[node_rank] ranks_per_node = node_ranks[node_rank]
assert len(ranks_per_node) == 1
for i in range(len(ranks_per_node)): for i in range(len(ranks_per_node)):
trainer = Trainer() trainer = Trainer()
# change global rank(mapped) to local rank within each node.
# e.g. mapped ranks of node: 3,4,7 -> 0,1,2
local_rank = ranks_per_node.index(ranks_per_node[i])
trainer.accelerators.append(local_rank)
trainer.endpoint = "%s" % (cur_node_endpoints[i]) trainer.endpoint = "%s" % (cur_node_endpoints[i])
# global mapped ranks
trainer.rank = ranks_per_node[i] trainer.rank = ranks_per_node[i]
pod.trainers.append(trainer)
cluster.pods.append(pod)
pod_rank = node_ips.index(node_ip)
return cluster, cluster.pods[pod_rank]
def get_mapped_cluster_from_args_without_rank_mapping(args, device_mode):
assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now."
gpus_num = fluid.core.get_cuda_device_count()
# parse ip-ranks json file
cluster_topo = None
with open(args.cluster_topo_path, "r") as json_file:
cluster_topo = json.load(json_file)
node_ips = []
node_ranks = []
for idx, cur_cluster_topo in enumerate(cluster_topo["machines"]):
node_ips.append(cur_cluster_topo['addr'])
node_ranks.append([idx])
if len(node_ips) == 1:
node_ip = node_ips[0]
else:
if args.host:
node_ip = args.host
else:
_, node_ip = get_host_name_ip()
assert node_ip in node_ips, \
"Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips)
node_rank = node_ips.index(node_ip)
assert len(node_ranks) == len(node_ips), \
"ranks length should be equal to ips length."
logger.debug("parsed from args: node_ips:{} node_ip:{} "
"node_rank:{} node_ranks:{}".format(
node_ips, node_ip, node_rank, node_ranks[node_rank]))
# NOTE: there are different number of global mapped ranks on each node.
free_ports = []
trainer_endpoints = []
for ip in node_ips:
node_rank = node_ips.index(ip)
if os.environ.get('PADDLE_PORT') is not None:
start_port = int(os.getenv("PADDLE_PORT", ""))
free_ports = [
x
for x in range(start_port, start_port + len(node_ranks[
node_rank]))
]
elif os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT'))
free_ports = [
x
for x in range(start_port, start_port + len(node_ranks[
node_rank]))
]
else:
free_ports = find_free_ports(len(node_ranks[node_rank]))
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_mapped_cluster_without_rank_mapping(
node_ips, node_ip, trainer_endpoints, device_mode, node_ranks)
def get_mapped_cluster_with_rank_mapping(node_ips, node_ip, trainer_endpoints,
device_mode, node_ranks,
node_rank_mappings):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now."
def get_relative_gpu_id(gpu_id):
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
if cuda_visible_devices is None or cuda_visible_devices == "":
return gpu_id
else:
cuda_visible_devices_list = cuda_visible_devices.split(',')
relative_id = cuda_visible_devices_list.index(str(gpu_id))
logger.info(
"Change gpu id from {} to {} based on CUDA_VISIBLE_DEVICES {}".
format(gpu_id, relative_id, cuda_visible_devices_list))
return relative_id
cluster = Cluster(hdfs=None)
for node_rank, ip in enumerate(node_ips):
pod = Pod()
pod.rank = node_rank
pod.addr = ip
pod.device_mode = device_mode
cur_node_endpoints = trainer_endpoints[node_rank]
# choose rank from global mapped ranks and set it to the trainer.
ranks_per_node = node_ranks[node_rank]
cur_node_rank_mapping = node_rank_mappings[node_rank]
for i in range(len(ranks_per_node)):
trainer = Trainer()
local_device_ids = cur_node_rank_mapping["ranks"][str(
ranks_per_node[i])]
assert len(local_device_ids) == 1, \
"Only support one process to one device mapping"
trainer.accelerators.append(
get_relative_gpu_id(local_device_ids[0]))
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.rank = ranks_per_node[i]
pod.trainers.append(trainer) pod.trainers.append(trainer)
cluster.pods.append(pod) cluster.pods.append(pod)
...@@ -908,22 +1028,31 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, ...@@ -908,22 +1028,31 @@ def get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
return cluster, cluster.pods[pod_rank] return cluster, cluster.pods[pod_rank]
def get_mapped_cluster_from_args(args, device_mode): def get_mapped_cluster_from_args_with_rank_mapping(args, device_mode):
assert device_mode == DeviceMode.GPU, \ assert device_mode == DeviceMode.GPU, \
"Only support get mapped cluster for gpu now." "Only support get mapped cluster for gpu now."
gpus_num = fluid.core.get_cuda_device_count() gpus_num = fluid.core.get_cuda_device_count()
# parse ip-ranks json file # parse ip-ranks json file
json_data = None rank_mapping_path = args.rank_mapping_path or os.getenv(
with args.rank_mapping_file as json_file: "PADDLE_RANK_MAPPING_PATH")
json_data = json.load(json_file) rank_mapping = None
with open(rank_mapping_path, "r") as json_file:
rank_mapping = json.load(json_file)
# reset PADDLE_RANK_MAPPING_PATH env
os.environ["PADDLE_RANK_MAPPING_PATH"] = ""
node_ips = [] node_ips = []
node_ranks_mapping = [] node_ranks = []
ip_ranks_list = json_data['ip_ranks'] node_rank_mappings = []
for ip_ranks in ip_ranks_list: for cur_rank_mapping in rank_mapping:
node_ips.append(ip_ranks['ip']) node_ips.append(cur_rank_mapping['addr'])
node_ranks_mapping.append(ip_ranks['ranks']) cur_node_rank_list = [
int(i) for i in list(cur_rank_mapping['ranks'].keys())
]
cur_node_rank_list.sort()
node_ranks.append(cur_node_rank_list)
node_rank_mappings.append(cur_rank_mapping)
if len(node_ips) == 1: if len(node_ips) == 1:
node_ip = node_ips[0] node_ip = node_ips[0]
...@@ -937,31 +1066,41 @@ def get_mapped_cluster_from_args(args, device_mode): ...@@ -937,31 +1066,41 @@ def get_mapped_cluster_from_args(args, device_mode):
"Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips) "Can't find your local ip {%s} in node_ips: {%s}" % (node_ip, node_ips)
node_rank = node_ips.index(node_ip) node_rank = node_ips.index(node_ip)
assert len(node_ranks_mapping[node_rank]) <= gpus_num, \ assert len(node_ranks[node_rank]) <= gpus_num, \
"number of ranks mapped to one node should not exceed the avaiable ones." "number of ranks mapped to one node should not exceed the avaiable ones."
assert len(node_ranks_mapping) == len(node_ips), \ assert len(node_ranks) == len(node_ips), \
"ranks length should be equal to ips length." "ranks length should be equal to ips length."
logger.debug("parsed from args: node_ips:{} node_ip:{} " logger.debug("parsed from args: node_ips:{} node_ip:{} "
"node_rank:{} node_ranks_mapping:{}".format( "node_rank:{} node_ranks:{}".format(
node_ips, node_ip, node_rank, node_ranks_mapping[ node_ips, node_ip, node_rank, node_ranks[node_rank]))
node_rank]))
# NOTE: there are different number of global mapped ranks on each node. # NOTE: there are different number of global mapped ranks on each node.
free_ports = [] free_ports = []
trainer_endpoints = [] trainer_endpoints = []
for ip in node_ips: for ip in node_ips:
node_rank = node_ips.index(ip) node_rank = node_ips.index(ip)
if os.environ.get('FLAGS_START_PORT') is not None: if os.environ.get('PADDLE_PORT') is not None:
start_port = int(os.getenv("PADDLE_PORT", ""))
free_ports = [
x
for x in range(start_port, start_port + len(node_ranks[
node_rank]))
]
elif os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT')) start_port = int(os.environ.get('FLAGS_START_PORT'))
end_port = start_port + len(node_ranks_mapping[node_rank]) free_ports = [
free_ports = [x for x in range(start_port, end_port)] x
for x in range(start_port, start_port + len(node_ranks[
node_rank]))
]
else: else:
free_ports = find_free_ports(len(node_ranks_mapping[node_rank])) free_ports = find_free_ports(len(node_ranks[node_rank]))
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_mapped_cluster(node_ips, node_ip, trainer_endpoints, device_mode, return get_mapped_cluster_with_rank_mapping(node_ips, node_ip,
node_ranks_mapping) trainer_endpoints, device_mode,
node_ranks, node_rank_mappings)
class ParameterServerLauncher(object): class ParameterServerLauncher(object):
...@@ -1229,14 +1368,18 @@ class ParameterServerLauncher(object): ...@@ -1229,14 +1368,18 @@ class ParameterServerLauncher(object):
_, self.current_node_ip = get_host_name_ip() _, self.current_node_ip = get_host_name_ip()
else: else:
self.current_node_ip = pod_ip self.current_node_ip = pod_ip
assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ if not self.distribute_mode == DistributeMode.PS_HETER:
% (self.current_node_ip, self.node_ips) assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
self.node_rank = self.node_ips.index(self.current_node_ip) % (self.current_node_ip, self.node_ips)
logger.debug( if self.current_node_ip in self.node_ips:
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". self.node_rank = self.node_ips.index(self.current_node_ip)
format(self.node_ips, self.current_node_ip, self.node_rank)) logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}".
format(self.node_ips, self.current_node_ip, self.node_rank))
def start_ps(self): def start_ps(self):
if not self.current_node_ip in self.node_ips:
return
cluster = Cluster(hdfs=None) cluster = Cluster(hdfs=None)
server_rank = 0 server_rank = 0
worker_rank = 0 worker_rank = 0
......
...@@ -589,6 +589,8 @@ set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") ...@@ -589,6 +589,8 @@ set_tests_properties(test_nn_grad PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
if(WITH_DISTRIBUTE) if(WITH_DISTRIBUTE)
add_subdirectory(distributed_passes) add_subdirectory(distributed_passes)
add_subdirectory(auto_parallel)
# FIXME(typhoonzero): add these tests back # FIXME(typhoonzero): add these tests back
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transformer") list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transformer")
list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transpiler") list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transpiler")
......
# file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
# string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_auto_parallel_relaunch MODULES test_auto_parallel_relaunch ENVS ${dist_ENVS})
set_tests_properties(test_auto_parallel_relaunch PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
endif()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import time
import paddle.fluid as fluid
import copy
import os
import numpy as np
import subprocess
import paddle
import paddle.nn as nn
import paddle.fluid as fluid
import paddle.static as static
import paddle.nn.functional as F
import paddle.utils as utils
from paddle.fluid import layers
from paddle.io import IterableDataset, DataLoader
from paddle.distributed import fleet
import paddle.distributed.auto_parallel as auto
paddle.enable_static()
_global_parallel_strategy = None
_global_process_mesh = None
batch_size = 4
hidden_size = 1024
sequence_len = 512
def get_random_inputs_and_labels(input_shape, label_shape):
input = np.random.random(size=input_shape).astype('float32')
label = np.random.random(size=label_shape).astype('float32')
return input, label
def batch_generator_creator():
def __reader__():
for _ in range(batch_size):
batch_input, batch_label = get_random_inputs_and_labels(
[batch_size, sequence_len, hidden_size],
[batch_size, sequence_len, 1])
yield batch_input, batch_label
return __reader__
class MLPLayer(nn.Layer):
def __init__(self,
hidden_size=1024,
intermediate_size=4 * 1024,
dropout_ratio=0.1,
initializer_range=0.02):
super(MLPLayer, self).__init__()
d_model = hidden_size
dim_feedforward = intermediate_size
weight_attr = paddle.ParamAttr(initializer=nn.initializer.Normal(
mean=0.0, std=initializer_range))
bias_attr = None
self.linear0 = nn.Linear(
d_model, dim_feedforward, weight_attr, bias_attr=bias_attr)
self.linear1 = nn.Linear(
dim_feedforward, d_model, weight_attr, bias_attr=bias_attr)
self.linear2 = nn.Linear(d_model, 1, weight_attr, bias_attr=bias_attr)
self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
self.dropout = nn.Dropout(dropout_ratio, mode="upscale_in_train")
def forward(self, input):
out = self.norm(input)
out = self.linear0(out)
out = F.gelu(out, approximate=True)
out = self.linear1(out)
out = self.dropout(out)
out = self.linear2(out)
return out
def mlp_pretrain_forward(train_program, start_program):
with static.program_guard(train_program,
start_program), utils.unique_name.guard():
input = static.data(
name="input",
shape=[batch_size, sequence_len, hidden_size],
dtype='float32')
label = static.data(
name="label", shape=[batch_size, sequence_len, 1], dtype='float32')
auto.shard_tensor(
input,
dist_attr={
"process_mesh": _global_process_mesh,
"dims_mappig": [-1, -1, -1]
})
mlp = MLPLayer(
hidden_size=hidden_size,
intermediate_size=4 * hidden_size,
dropout_ratio=0.1,
initializer_range=0.02)
predict = mlp(input)
error_cost = paddle.nn.functional.square_error_cost(predict, label)
loss = paddle.mean(error_cost)
loader = paddle.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=True)
return loss, train_program, start_program, loader
def train():
global _global_process_mesh
_global_process_mesh = auto.ProcessMesh(mesh=[0, 1])
dist_strategy = fleet.DistributedStrategy()
dist_strategy.amp = False
dist_strategy.pipeline = False
dist_strategy.recompute = False
# init parallel optimizer
dist_strategy.semi_auto = True
fleet.init(is_collective=True, strategy=dist_strategy)
train_program = static.Program()
start_program = static.Program()
loss, train_program, start_program, loader = mlp_pretrain_forward(
train_program, start_program)
optimizer = paddle.fluid.optimizer.AdamOptimizer(
learning_rate=0.00001,
beta1=0.9,
beta2=0.999,
epsilon=1e-08,
grad_clip=None)
optimizer = fleet.distributed_optimizer(optimizer)
_, _, distributed_startup_program, distributed_main_program = optimizer.minimize(
loss, start_program)
places = static.cuda_places()
loader.set_batch_generator(batch_generator_creator(), places=places)
exe = paddle.static.Executor(places[0])
exe.run(distributed_startup_program)
for data in loader():
exe.run(distributed_main_program, feed=data, fetch_list=[loss])
if __name__ == "__main__":
train()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from paddle.distributed.fleet import launch
from paddle.distributed.fleet.launch_utils import run_with_coverage
if __name__ == "__main__":
if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
run_with_coverage(True)
launch.launch()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import sys
import json
import shutil
import subprocess
from paddle.distributed.fleet.launch_utils import run_with_coverage
cluster_json = """
{
"machines": [
{
"hostname": "machine1",
"addr": "127.0.0.1",
"port": "768",
"devices": [
{
"global_id": 0,
"local_id": 0,
"type": "GPU",
"model": "Tesla V100-SXM2-32GB",
"sp_gflops": 15700,
"dp_gflops": 7800,
"memory": 32
},
{
"global_id": 1,
"local_id": 1,
"type": "GPU",
"model": "Tesla V100-SXM2-32GB",
"sp_gflops": 15700,
"dp_gflops": 7800,
"memory": 32
},
{
"global_id": 2,
"local_id": 0,
"type": "CPU",
"model": "Intel(R) Xeon(R) Gold 6271C CPU @ 2.60G",
"arch": "x86_64",
"vendor": "GenuineIntel",
"sp_gflops": 150,
"dp_gflops": 75,
"memory": "503"
}
],
"links": [
{
"source_global_id": 0,
"target_global_id": 1,
"type": "NVL",
"bandwidth": 42
},
{
"source_global_id": 1,
"target_global_id": 0,
"type": "PHB",
"bandwidth": 12
}
]
}
]
}
"""
class TestAutoParallelReLaunch(unittest.TestCase):
def test_relaunch(self):
file_dir = os.path.dirname(os.path.abspath(__file__))
cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
cluster_json_object = json.loads(cluster_json)
with open(cluster_json_path, "w") as cluster_json_file:
json.dump(cluster_json_object, cluster_json_file)
launch_model_path = os.path.join(file_dir,
"auto_parallel_relaunch_model.py")
if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
else:
coverage_args = []
cmd = [sys.executable, "-u"] + coverage_args + [
"-m", "launch", "--cluster_topo_path", cluster_json_path,
"--enable_auto_mapping", "True", launch_model_path
]
process = subprocess.Popen(cmd)
process.wait()
self.assertEqual(process.returncode, 0)
# Remove unnecessary files
if os.path.exists(cluster_json_path):
os.remove(cluster_json_path)
rank_mapping_json_path = os.path.join(file_dir,
"auto_parallel_rank_mapping.json")
if os.path.exists(rank_mapping_json_path):
os.remove(rank_mapping_json_path)
log_path = os.path.join(file_dir, "log")
if os.path.exists(log_path):
shutil.rmtree(log_path)
if __name__ == "__main__":
unittest.main()
...@@ -52,6 +52,9 @@ from paddle.distributed.auto_parallel.mapper import mapping ...@@ -52,6 +52,9 @@ from paddle.distributed.auto_parallel.mapper import mapping
from paddle.distributed.auto_parallel.mapper import get_dtype_bytes from paddle.distributed.auto_parallel.mapper import get_dtype_bytes
from paddle.distributed.auto_parallel.mapper import get_comm_volume from paddle.distributed.auto_parallel.mapper import get_comm_volume
if os.getenv("CUDA_VISIBLE_DEVICES") is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = ""
paddle.enable_static() paddle.enable_static()
_global_parallel_strategy = None _global_parallel_strategy = None
_global_process_mesh = None _global_process_mesh = None
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册