From 357311fdb70286fc4ad2f065fddf444695ea2a63 Mon Sep 17 00:00:00 2001
From: guru4elephant <35550832+guru4elephant@users.noreply.github.com>
Date: Tue, 2 Jul 2019 12:54:21 +0800
Subject: [PATCH] make fleet support mpi job submit directly (#18441)

make fleet support mpi job submit directly.
---
 .../fluid/incubate/fleet/base/role_maker.py   |  3 ++-
 .../distribute_transpiler/__init__.py         | 24 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index ae6bf82e629..fb674d67ebe 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -198,7 +198,7 @@ class MPIRoleMaker(RoleMakerBase):
         """
         finalize the current MPI instance.
         """
-        pass
+        self.MPI.Finalize()
 
     def _get_ips(self):
         """
@@ -356,6 +356,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
             print("PaddleCloudRoleMaker() endpoints: %s" % self.endpoints)
             self.endpoints = self.endpoints.split(",")
             self._server_endpoints = self.endpoints
+            self._worker_endpoints = self.endpoints
             if self.role.upper() == "PSERVER":
                 self._current_id = self.endpoints.index(self.current_endpoint)
                 self._role = Role.SERVER
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index 3854f258be7..8c230c58e32 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -26,6 +26,7 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerCo
 from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
 from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
 from paddle.fluid.incubate.fleet.base.fleet_base import Mode
+from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
 
 
 class DistributedTranspiler(Fleet):
@@ -52,6 +53,13 @@ class DistributedTranspiler(Fleet):
         Returns:
             None
         """
+        # if MPISymetricRoleMaker is defined
+        # we suppose a user wants to submit job on mpi cluster
+        if isinstance(self._role_maker, MPISymetricRoleMaker):
+            # check whether server has been initialized
+            from paddle.fluid.transpiler.details.checkport import wait_server_ready
+            wait_server_ready(fleet.server_endpoints(to_string=False))
+
         if not self._transpile_config.sync_mode:
             self._communicator = Communicator(self.main_program)
 
@@ -114,6 +122,9 @@ class DistributedTranspiler(Fleet):
             self._communicator.stop()
         self._executor.close()
 
+        if isinstance(self._role_maker, MPISymetricRoleMaker):
+            self._role_maker._finalize()
+
     def distributed_optimizer(self, optimizer, strategy=None):
         """
         Optimizer for distributed training.
@@ -199,13 +210,24 @@ class DistributedTranspiler(Fleet):
 
         self._transpile_config = config
         self._transpiler = OriginTranspiler(config)
+        print("server endpoints")
+        print(fleet.server_endpoints(to_string=True))
+        print("worker index: %d" % fleet.worker_index())
+        print("worker num: %d" % fleet.worker_num())
+
         if self.is_worker():
             self._transpiler.transpile(
                 trainer_id=fleet.worker_index(),
                 pservers=fleet.server_endpoints(to_string=True),
                 trainers=fleet.worker_num(),
                 sync_mode=config.sync_mode)
-            self.main_program = self._transpiler.get_trainer_program()
+
+            wait_port = True
+            if isinstance(self._role_maker, MPISymetricRoleMaker):
+                wait_port = False
+
+            self.main_program = self._transpiler.get_trainer_program(
+                wait_port=wait_port)
             self.startup_program = default_startup_program()
         else:
             self._transpiler.transpile(
--
GitLab
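Usage sketch (not part of the patch): with this change, a transpiler-mode fleet job can be submitted directly under MPI by handing fleet.init() an MPISymetricRoleMaker, so server/worker roles come from the MPI ranks instead of PADDLE_* environment variables. The script below only illustrates that flow against the fleet API of this release (fleet.init, distributed_optimizer, init_server/run_server, init_worker/stop_worker); the toy network, config values, and the mpirun launch command are assumptions, not taken from this commit.

# train.py -- illustrative only; launch with something like `mpirun -np 4 python train.py`
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

# toy model, only here to give the optimizer something to minimize
x = fluid.layers.data(name="x", shape=[13], dtype="float32")
y = fluid.layers.data(name="y", shape=[1], dtype="float32")
y_pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=y_pred, label=y))

# roles are derived from the MPI communicator, no PADDLE_* env vars needed
fleet.init(MPISymetricRoleMaker())

config = DistributeTranspilerConfig()
config.sync_mode = True
optimizer = fleet.distributed_optimizer(fluid.optimizer.SGD(learning_rate=0.01), config)
optimizer.minimize(loss)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    # with MPISymetricRoleMaker, init_worker() now blocks on wait_server_ready()
    # until every pserver endpoint is reachable
    fleet.init_worker()
    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fleet.startup_program)
    # ... feed data and run fleet.main_program here ...
    # tearing the worker down also finalizes MPI via MPIRoleMaker._finalize()
    fleet.stop_worker()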