From 86f059117579585db5e0ab59c1543177860260a3 Mon Sep 17 00:00:00 2001
From: gongweibao
Date: Fri, 16 Aug 2019 14:38:35 +0800
Subject: [PATCH] Remove node_num function. (#19167)

node_num is not needed by users, so remove it and fix the bugs related to it!
---
 python/paddle/distributed/launch.py           |  3 +
 .../fluid/incubate/fleet/base/fleet_base.py   |  8 --
 .../fluid/incubate/fleet/base/role_maker.py   | 19 ----
 .../incubate/fleet/collective/__init__.py     | 89 +++++++++++++------
 .../distribute_transpiler/__init__.py         |  8 --
 5 files changed, 64 insertions(+), 63 deletions(-)

diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index 91b126aaaf4..82322c657e5 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -183,6 +183,9 @@ def start_procs(args):
             "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
         })
 
+        if num_nodes > 1:
+            current_env.update({"FLAGS_sync_nccl_allreduce": "0"})
+
         cmd = [sys.executable, "-u", args.training_script
                ] + args.training_script_args
 
diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
index 658d971a731..ac9b0f23276 100644
--- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py
+++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
@@ -232,14 +232,6 @@ class Fleet(object):
     def save_persistables(self, executor, dirname, main_program=None):
         pass
 
-    @abc.abstractmethod
-    def node_num(self):
-        pass
-
-    @abc.abstractmethod
-    def node_id(self):
-        pass
-
 
 class DistributedOptimizer(object):
     """
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index ff99a912533..1369cea5805 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -384,27 +384,8 @@ class PaddleCloudRoleMaker(RoleMakerBase):
             self._worker_endpoints = self._worker_endpoints.split(",")
             self._trainers_num = len(self._worker_endpoints)
 
-            self._node_ips = self._get_node_ips_from_endpoints(
-                self._worker_endpoints)
-            self._node_ip = self._current_endpoint.split(":")[0].strip()
-
-            self._node_num = len(self._node_ips)
-            self._node_id = self._node_ips.index(self._node_ip)
             self._role_is_generated = True
 
-    def _get_node_ips_from_endpoints(self, endpoints):
-        ss = set()
-        ips = []
-        for ep in endpoints:
-            ip = ep.split(":")[0].strip()
-            if ip not in ss:
-                ss.add(ip)
-                ips.append(ip)
-            else:
-                continue
-
-        return ips
-
     def get_pserver_endpoints(self):
         if not self._role_is_generated:
             self.generate_role()
diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py
index 6f67ecc4a9a..b995b278e30 100644
--- a/python/paddle/fluid/incubate/fleet/collective/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py
@@ -85,12 +85,6 @@ class Collective(Fleet):
     def save_persistables(self, executor, dirname, main_program=None):
         io.save_persistables(executor, dirname, main_program, None)
 
-    def node_num(self):
-        return self._role_maker._node_num
-
-    def node_id(self):
-        return self._role_maker._node_id
-
 
 fleet = Collective()
 
@@ -102,9 +96,6 @@ class DistributedStrategy(fluid.BuildStrategy):
 
     def __init__(self):
         super(DistributedStrategy, self).__init__()
-        self.fuse_memory_size = -1
-        self.fuse_layer_size = 1
-
         self.use_local_sgd = False
         self.use_dist_fc = False
 
@@ -112,21 +103,9 @@ class DistributedStrategy(fluid.BuildStrategy):
         self.dist_fc_config = None # DistFCConfig
         self.mode = "nccl2" # or collective
         self.collective_mode = None # local_sgd or grad_allreduce
-
-        self.nccl_comm_num = 2
+        self.nccl_comm_num = 1
 
         self.exec_strategy = fluid.ExecutionStrategy()
-        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
-        if sync_allreduce == "0":
-            self._exec_strategy.num_threads = self.nccl_comm_num + 1
-            if sef.use_hierarchical_allreduce:
-                self._exec_strategy.num_threads = 2 * self.nccl_comm_num + 1
-            if self._exec_strategy.num_threads > 4:
-                print(
-                    sys.stderr,
-                    "WARNING: if you use use_hierarchical_allreduce or "
-                    "with multi nccl comm, please set FLAGS_sync_nccl_allreduce = 0"
-                )
 
 
 class CollectiveOpBasedOptimizer(DistributedOptimizer):
@@ -215,12 +194,6 @@ class CollectiveOptimizer(DistributedOptimizer):
         """
         Transpile the programs to distributed programs. And add the variables.
         """
-        if self._strategy.fuse_all_reduce_ops:
-            os.environ[
-                'FLAGS_fuse_parameter_memory_size'] = self.fuse_memory_size
-            os.environ[
-                'FLAGS_fuse_parameter_groups_size'] = self.fuse_layer_size
-
         worker_endpoints = fleet.worker_endpoints()
         trainer_id = fleet.worker_index()
         current_endpoint = fleet.worker_endpoints()[trainer_id]
@@ -249,7 +222,67 @@
             program=main_program, current_endpoint=current_endpoint)
 
+    def _get_node_ips_from_endpoints(self, endpoints):
+        ss = set()
+        ips = []
+        for ep in endpoints:
+            ip = ep.split(":")[0].strip()
+            if ip not in ss:
+                ss.add(ip)
+                ips.append(ip)
+            else:
+                continue
+
+        return ips
+
+    def _node_num(self):
+        worker_endpoints = fleet.worker_endpoints()
+        current_endpoint = fleet.worker_endpoints()[fleet.worker_index()]
+        worker_endpoints_env = ','.join(worker_endpoints)
+
+        node_ips = self._get_node_ips_from_endpoints(worker_endpoints)
+        node_ip = current_endpoint.split(":")[0].strip()
+
+        node_num = len(node_ips)
+
+        return node_num
+
     def _try_to_compile(self, startup_program, main_program):
+        node_num = self._node_num()
+        assert node_num >= 1, "nccl2 node_num must >= 1, now:{}".format(node_num)
+
+        self._strategy.fuse_all_reduce_ops = True
+        exec_strategy = self._strategy.exec_strategy
+
+        if node_num <= 1:
+            if self._strategy.nccl_comm_num > 1:
+                logging.warn("set nccl_comm_num=1 since you only have 1 node.")
+            self._strategy.nccl_comm_num = 1
+
+            if self._strategy.use_hierarchical_allreduce:
+                logging.warn(
+                    "set use_hierarchical_allreduce=False since you only have 1 node."
+                )
+                self._strategy.use_hierarchical_allreduce = False
+
+        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
+        if sync_allreduce is None or sync_allreduce == "1":
+            exec_strategy.num_threads = self._strategy.nccl_comm_num + 1
+            if self._strategy.use_hierarchical_allreduce:
+                exec_strategy.num_threads = 2 * self._strategy.nccl_comm_num + 1
+            if exec_strategy.num_threads > 4:
+                logging.warn(
+                    "if you use use_hierarchical_allreduce or "
+                    "multiple nccl comms, please export FLAGS_sync_nccl_allreduce=0"
+                )
+
+        if self.print_config:
+            print("node_num:", node_num, "num_threads:",
+                  exec_strategy.num_threads, "use_hierarchical_allreduce:",
+                  self._strategy.use_hierarchical_allreduce, "nccl_comm_num:",
+                  self._strategy.nccl_comm_num, "FLAGS_sync_nccl_allreduce:",
+                  sync_allreduce)
+
         self._transpile(startup_program, main_program)
 
         if self._strategy.mode == "collective":
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index a13512d130d..8c230c58e32 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -239,14 +239,6 @@ class DistributedTranspiler(Fleet):
         self.main_program, self.startup_program = \
             self._transpiler.get_pserver_programs(self.server_endpoints()[self.server_index()])
 
-    def node_num(self):
-        logging.warn(
-            "You should not call 'node_num' method for collective mode.")
-
-    def node_id(self):
-        logging.warn(
-            "You should not call 'node_id' method for collective mode.")
-
 
 fleet = DistributedTranspiler()
-- 
GitLab
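The node-count logic that this patch removes from PaddleCloudRoleMaker and re-adds privately on CollectiveOptimizer works by deduplicating the IP portion of each "ip:port" worker endpoint, so several GPU workers on one machine count as a single node. A minimal standalone sketch of that logic (the sample endpoints below are hypothetical, not from the patch):

    def get_node_ips_from_endpoints(endpoints):
        """Collect unique node IPs from "ip:port" endpoints, first-seen order."""
        seen = set()
        ips = []
        for ep in endpoints:
            ip = ep.split(":")[0].strip()
            if ip not in seen:
                seen.add(ip)
                ips.append(ip)
        return ips

    # Hypothetical cluster: two machines, two GPU workers per machine.
    workers = ["10.0.0.1:6170", "10.0.0.1:6171",
               "10.0.0.2:6170", "10.0.0.2:6171"]
    print(len(get_node_ips_from_endpoints(workers)))  # 2 nodes, not 4

First-seen order matters in the removed role-maker code, where node_id was derived as the index of the current worker's IP in this list.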
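The executor thread heuristic added to _try_to_compile only applies when NCCL allreduce is synchronous (FLAGS_sync_nccl_allreduce unset or "1"); the launch.py change above exports the flag as "0" for multi-node runs, which skips the tuning entirely. A sketch of the heuristic in isolation, assuming an unset flag means synchronous; the helper name and the None-means-default convention are illustrative, not part of the patch:

    import logging
    import os

    def suggested_num_threads(nccl_comm_num, use_hierarchical_allreduce):
        # Mirrors the patch: only tune threads when allreduce is synchronous.
        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
        if sync_allreduce is not None and sync_allreduce != "1":
            return None  # asynchronous allreduce: keep the executor default
        num_threads = nccl_comm_num + 1
        if use_hierarchical_allreduce:
            num_threads = 2 * nccl_comm_num + 1
        if num_threads > 4:
            logging.warn("consider exporting FLAGS_sync_nccl_allreduce=0 "
                         "with hierarchical allreduce or multiple nccl comms")
        return num_threads

    # Assuming FLAGS_sync_nccl_allreduce is unset in the shell:
    print(suggested_num_threads(2, True))   # 5 -> exceeds 4, warning fires
    print(suggested_num_threads(1, False))  # 2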