diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index c8005dce44f779ba264f4ccaa73584c5008efe2f..b9cd73d158497eac08a1b3988783b60f0cdfb8f2 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -30,6 +30,41 @@ class Role: SERVER = 2 +class MockBarrier(object): + """ + MockBarrier is a empty impletation for barrier + mock as a real barrier for never-barrier in a specific scenario + """ + + def barrier(self): + """ + dummy barrier, do nothing + """ + pass + + def barrier_all(self): + """ + dummy all barrier, do nothing + """ + pass + + def all_reduce(self, obj): + """ + dummy all reduce, do nothing + Args: + obj(any): obj to do all reduce + """ + return obj + + def all_gather(self, obj): + """ + dummy all gather, do nothing + Args: + obj(any): obj to do all gather + """ + return [obj] + + class RoleMakerBase(object): """ RoleMakerBase is a base class for assigning a role to current process @@ -587,7 +622,10 @@ class GeneralRoleMaker(RoleMakerBase): trainers_num = len(worker_endpoints) if training_role not in ["TRAINER", "PSERVER"]: raise ValueError("TRAINING_ROLE must be PSERVER or TRAINER") - + self._is_barrier_all = 1 + if "PADDLE_IS_BARRIER_ALL_ROLE" in os.environ: + self._is_barrier_all = int(os.environ[ + "PADDLE_IS_BARRIER_ALL_ROLE"]) if training_role == "TRAINER": role = Role.WORKER current_id = int(os.environ["PADDLE_TRAINER_ID"]) @@ -608,21 +646,25 @@ class GeneralRoleMaker(RoleMakerBase): self._http_server.start() self._node_type = 1 self._cur_endpoint = worker_endpoints[current_id] - gloo = fluid.core.Gloo() - gloo.set_rank(current_id) - gloo.set_size(len(worker_endpoints)) - gloo.set_prefix(self._prefix) - gloo.set_iface(self._iface) - gloo.set_timeout_seconds(self._init_timeout_seconds, - self._run_timeout_seconds) - if len(self._http_ip_port) != 0: - gloo.set_http_store(self._http_ip_port[0], - int(self._http_ip_port[1]), "trainer") + if self._is_barrier_all: + gloo = fluid.core.Gloo() + gloo.set_rank(current_id) + gloo.set_size(len(worker_endpoints)) + gloo.set_prefix(self._prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + if len(self._http_ip_port) != 0: + gloo.set_http_store(self._http_ip_port[0], + int(self._http_ip_port[1]), + "trainer") + else: + gloo.set_hdfs_store(self._hdfs_path + "/trainer", + self._hdfs_name, self._hdfs_ugi) + gloo.init() + self._node_type_comm = gloo else: - gloo.set_hdfs_store(self._hdfs_path + "/trainer", - self._hdfs_name, self._hdfs_ugi) - gloo.init() - self._node_type_comm = gloo + self._all_comm = MockBarrier() elif training_role == "PSERVER": role = Role.SERVER if os.environ.get("PADDLE_PSERVER_ID") is not None: diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py index be1e228f4364f365ecb9f8d20918f66585b8af59..fe650ef0a2f3d07f3f301f5f031740ffb1bc13c3 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py @@ -79,6 +79,21 @@ class TestCloudRoleMaker(unittest.TestCase): print("do not support pslib test, skip") return + from paddle.fluid.incubate.fleet.base.role_maker import MockBarrier + mb = MockBarrier() + mb.barrier() + mb.barrier_all() + mb.all_reduce(1) + mb.all_gather(1) + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36005" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36005" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36006" + os.environ["PADDLE_IS_BARRIER_ALL_ROLE"] = "0" + role_maker = GeneralRoleMaker(path="test_mock1") + role_maker.generate_role() + if __name__ == "__main__": unittest.main()