diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index ae6bf82e6290c0036b4abbf2bee43e454d365228..fb674d67ebe0cafde4ee5b1fd2711d1c065a955a 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -198,7 +198,7 @@ class MPIRoleMaker(RoleMakerBase):
         """
         finalize the current MPI instance.
         """
-        pass
+        self.MPI.Finalize()
 
     def _get_ips(self):
         """
@@ -356,6 +356,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
             print("PaddleCloudRoleMaker() endpoints: %s" % self.endpoints)
             self.endpoints = self.endpoints.split(",")
             self._server_endpoints = self.endpoints
+            self._worker_endpoints = self.endpoints
             if self.role.upper() == "PSERVER":
                 self._current_id = self.endpoints.index(self.current_endpoint)
                 self._role = Role.SERVER
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index 3854f258be73e368f545db0307c901e10ab89e02..8c230c58e32d68f943cceb306b049ce86135c436 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -26,6 +26,7 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerCo
 from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
 from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
 from paddle.fluid.incubate.fleet.base.fleet_base import Mode
+from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
 
 
 class DistributedTranspiler(Fleet):
@@ -52,6 +53,13 @@ class DistributedTranspiler(Fleet):
         Returns:
             None
         """
+        # if MPISymetricRoleMaker is in use, we assume the user wants to
+        # submit the job on an MPI cluster
+        if isinstance(self._role_maker, MPISymetricRoleMaker):
+            # check whether the servers have been initialized
+            from paddle.fluid.transpiler.details.checkport import wait_server_ready
+            wait_server_ready(fleet.server_endpoints(to_string=False))
+
         if not self._transpile_config.sync_mode:
             self._communicator = Communicator(self.main_program)
 
@@ -114,6 +122,9 @@ class DistributedTranspiler(Fleet):
             self._communicator.stop()
         self._executor.close()
 
+        if isinstance(self._role_maker, MPISymetricRoleMaker):
+            self._role_maker._finalize()
+
     def distributed_optimizer(self, optimizer, strategy=None):
         """
         Optimizer for distributed training.
@@ -199,13 +210,24 @@ class DistributedTranspiler(Fleet):
         self._transpile_config = config
         self._transpiler = OriginTranspiler(config)
+        print("server endpoints")
+        print(fleet.server_endpoints(to_string=True))
+        print("worker index: %d" % fleet.worker_index())
+        print("worker num: %d" % fleet.worker_num())
+
         if self.is_worker():
             self._transpiler.transpile(
                 trainer_id=fleet.worker_index(),
                 pservers=fleet.server_endpoints(to_string=True),
                 trainers=fleet.worker_num(),
                 sync_mode=config.sync_mode)
-            self.main_program = self._transpiler.get_trainer_program()
+
+            wait_port = True
+            if isinstance(self._role_maker, MPISymetricRoleMaker):
+                wait_port = False
+
+            self.main_program = self._transpiler.get_trainer_program(
+                wait_port=wait_port)
             self.startup_program = default_startup_program()
         else:
             self._transpiler.transpile(
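
The changes above are exercised through the standard fleet entry points. Below is a minimal usage sketch, assuming a trainer script launched under an MPI launcher such as mpirun with MPISymetricRoleMaker; the toy network and the data-feeding placeholder are illustrative only and are not part of this patch.

# Hypothetical usage sketch (not part of the patch): one process per MPI rank;
# MPISymetricRoleMaker assigns a trainer and a pserver role per physical node.
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig

# toy regression network, purely for illustration
x = fluid.layers.data(name="x", shape=[13], dtype="float32")
y = fluid.layers.data(name="y", shape=[1], dtype="float32")
y_pred = fluid.layers.fc(input=x, size=1)
avg_cost = fluid.layers.mean(fluid.layers.square_error_cost(input=y_pred, label=y))

fleet.init(MPISymetricRoleMaker())

config = DistributeTranspilerConfig()
config.sync_mode = True

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(avg_cost)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    # with MPISymetricRoleMaker, init_worker() now blocks in wait_server_ready()
    # until every pserver endpoint accepts connections, and the trainer program
    # is built with wait_port=False so the port check is not repeated
    fleet.init_worker()
    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fleet.startup_program)
    # ... feed data and run fleet.main_program here ...
    # stop_worker() now also calls self._role_maker._finalize(), i.e. MPI.Finalize()
    fleet.stop_worker()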