diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 31bfd482766cb97d3c373c49774640ba8d7ba487..5531160d7c5032dfa434247ecdc737a56303f5f2 100755 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -81,6 +81,8 @@ class DistributedJobInfo(object): class DistributedStrategy(object): + __lock_attr = False + def __init__(self): """ DistributedStrategy is the main configuration entry for distributed training of Paddle. @@ -95,6 +97,13 @@ class DistributedStrategy(object): """ self.strategy = distributed_strategy_pb2.DistributedStrategy() + self.__lock_attr = True + + def __setattr__(self, key, value): + if self.__lock_attr and not hasattr(self, key): + raise TypeError("%s is not a attribute of %s" % + (key, self.__class__.__name__)) + object.__setattr__(self, key, value) def save_to_prototxt(self, output): """ diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index 0cf909c98c057e330195135bf1b3b5b90facd2ca..856290a05046d310d71ee5aecb2ecb62faf4ddfe 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -110,6 +110,14 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") + def node_num(self): + """ + Get the training node number + Returns: + int: node num + """ + raise NotImplementedError("Please implement this method in child class") + def get_trainer_endpoints(self): """ return trainer endpoints @@ -286,6 +294,14 @@ class PaddleCloudRoleMaker(RoleMakerBase): self.generate_role() return self._trainers_num + def node_num(self): + """ + return the training node number + """ + if not self._role_is_generated: + self.generate_role() + return self._node_num + def get_trainer_endpoints(self): """ get endpoint of all trainers @@ -353,6 +369,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._trainers_num = trainers_num self._role = role self._current_id = current_id + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _collective_env(self): self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) @@ -363,6 +381,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): assert self._worker_endpoints is not None, "can't find PADDLE_TRAINER_ENDPOINTS" self._worker_endpoints = self._worker_endpoints.split(",") self._trainers_num = len(self._worker_endpoints) + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _init_gloo_env(self): def init_gloo_instance(role="trainer"): @@ -513,12 +533,16 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._cur_endpoint = self._worker_endpoints[self._current_id] elif self._role == Role.SERVER: self._cur_endpoint = self._server_endpoints[self._current_id] + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _user_defined_collective_env(self): self._worker_endpoints = self._kwargs.get("worker_endpoints") self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) self._training_role = Role.Worker + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def generate_role(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py index 78478b9691b2174612669a8dca3fc749f8d8a7b3..b9ff31a068e7f397b04c51fab53ae134a78b3bb5 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py @@ -119,18 +119,26 @@ class GraphExecutionOptimizer(MetaOptimizerBase): local_build_strategy.nccl_comm_num = \ dist_strategy.nccl_comm_num + if self.user_defined_strategy.recompute == True: + logging.warn( + "set enable_sequential_execution=True since you have enable the recompute strategy" + ) + local_build_strategy.enable_sequential_execution = True + exe_strategy = self.user_defined_strategy.execution_strategy - node_num = self.role_maker.worker_num() + worker_num = self.role_maker.worker_num() + node_num = self.role_maker.node_num() if self.role_maker._is_collective: - assert node_num >= 1, "nccl2 node_num must >= 1, now:{}" % node_num + assert worker_num >= 1, "nccl2 worker_num must >= 1, now:{}" % worker_num - if node_num <= 1: + if worker_num <= 1: # local mode if local_build_strategy.nccl_comm_num > 1: logging.warn("set nccl_comm_num=1 since you only have 1 node.") local_build_strategy.nccl_comm_num = 1 + if node_num <= 1: if local_build_strategy.use_hierarchical_allreduce: logging.warn( "set hierachical_allreduce=False since you only have 1 node." diff --git a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py index 96247474927b99b197adf015278c9fbddcfea15b..07b69f19e7ebddbc2a595504371155d4ea2eea7b 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py @@ -30,7 +30,8 @@ class RecomputeOptimizer(MetaOptimizerBase): user_defined_strategy): super(RecomputeOptimizer, self)._set_basic_info( loss, role_maker, user_defined_optimizer, user_defined_strategy) - self.wrapped_opt._set_checkpoints([]) + self.wrapped_opt._set_checkpoints( + list(user_defined_strategy.recompute_configs["checkpoints"])) def _can_apply(self): if self.user_defined_strategy.recompute == True: diff --git a/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py index 0e19069d5c04e7e6bf05be8f3f48a7ce395a0a57..22a1434ae251a920182226e20758ba3c925b0a75 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import unittest import paddle import os @@ -23,8 +25,6 @@ class TestFleetAMPOptimizer(unittest.TestCase): os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" def test_amp_optimizer(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index 3a79b694cad5b0cb3fe0a08b6a18506510eead5b..ca657a5a619b66b31fcb6dc64b48040ed9f549ae 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -14,6 +14,8 @@ import unittest import paddle +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import os @@ -26,67 +28,49 @@ class TestFleetBase(unittest.TestCase): "127.0.0.1:36001,127.0.0.2:36001" def test_init(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) def test_is_first_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_first_worker(): print("test fleet first worker done.") def test_worker_index(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_index()) def test_worker_num(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_num()) def test_is_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): print("test fleet is worker") def test_worker_endpoints(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_endpoints(to_string=True)) def test_server_num(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("fleet server num: {}".format(fleet.server_num())) def test_server_index(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("fleet server index: {}".format(fleet.server_index())) def test_server_endpoints(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): @@ -94,55 +78,41 @@ class TestFleetBase(unittest.TestCase): fleet.server_endpoints(to_string=True))) def test_is_server(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("test fleet is server") def test_util(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) self.assertEqual(fleet.util, None) def test_barrier_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.barrier_worker() def test_init_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.init_worker() def test_run_server(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.run_worker() def test_stop_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.stop_worker() def test_distributed_optimizer(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) @@ -150,10 +120,6 @@ class TestFleetBase(unittest.TestCase): optimizer = fleet.distributed_optimizer(optimizer) def test_minimize(self): - import paddle - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker - input_x = paddle.fluid.layers.data( name="x", shape=[32], dtype='float32') input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') diff --git a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py index 1d211a77008b47fadd5908e9d8382bdaaaf77eb9..b43687ce1cdabaf8918bed94173325a9d7cfe6e3 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetDGCOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py index 45dd461237ba5ba47d2b39e6b53279372d0723cb..40e0168e1ac93dfd93a99c19eced05756a49471f 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py @@ -289,6 +289,11 @@ class TestStrategyConfig(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.execution_strategy = exe_strategy + def test_unknown_strategy(self): + strategy = paddle.distributed.fleet.DistributedStrategy() + with self.assertRaises(TypeError): + strategy.unknown_key = 'UNK' + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py index 49ce09877f0a0fb91d398eb6eba57ada323f96a0..581f8becbbff1945603330221014ca4ce6d51a86 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py @@ -16,7 +16,7 @@ import unittest import paddle import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py index d2e0112ba298cf7d8267c2194b44213c150f41e4..4d92c6f70541d18a02c4bd179bb17e36117cffcb 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py @@ -14,6 +14,8 @@ import unittest import paddle +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import os from launch_function_helper import launch_func @@ -39,8 +41,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): } def node_func(): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( diff --git a/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py index 8ad051924f2740dceaa0b8d5b9f66ba0dd743f36..134aea363b55e1c2d0730fee083f5ef00185af61 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLambMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py index 87c4823693e2e3f9cc759ac22029c8c12841d35d..b15db0b12d0011b4fef663bd41aa250e2ada1539 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLarsMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py index f4bb87048494974fd4cf855573d47c5f9dabb4d9..86098d42b823b7e528846e3186dcd0d345bdb018 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLocalSGDMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py index d35f2fe5e62884304fa1dfbbc7fbede234ef84b4..ca969bc4032b1dedc578f3b5d4ff13491e276e57 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py @@ -25,7 +25,7 @@ class TestFleetMetaOptimizer(unittest.TestCase): def test_pipeline_optimizer(self): import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker + import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) with paddle.fluid.device_guard("cpu"): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py index f07c6421192a0f873d477048589b3b712f2e59e7..95e1c3a36025707284edd8fc99b216d3e117341d 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py @@ -27,7 +27,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase): def test_recompute_optimizer(self): import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker + import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( @@ -43,7 +43,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.recompute = True - strategy.recompute_configs = {"checkpoints": ["fc2"]} + strategy.recompute_configs = {"checkpoints": ["fc_1.tmp_0"]} optimizer = paddle.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index f80d45ed5e09d026f7a971ad328f656baeb37a66..cf9b3e1e9a1605a714b47d99183511b24c903722 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -34,6 +34,7 @@ class TestRoleMakerBase(unittest.TestCase): self.assertRaises(Exception, role.worker_index) self.assertRaises(Exception, role.server_index) self.assertRaises(Exception, role.role_id) + self.assertRaises(Exception, role.node_num) trainer_endpoints = role.get_trainer_endpoints() self.assertTrue(len(trainer_endpoints) == 0) @@ -80,10 +81,12 @@ class TestCloudRoleMaker(unittest.TestCase): worker_endpoints = ro.get_trainer_endpoints() self.assertEqual(worker_endpoints[0], '127.0.0.1:36001') self.assertEqual(ro.role_id(), 0) + self.assertEqual(ro.node_num(), 2) def test_tr_rolemaker_collective(self): ro = role_maker.PaddleCloudRoleMaker(is_collective=True) self.assertEqual(ro.worker_num(), 2) + self.assertEqual(ro.node_num(), 2) def test_ps_rolemaker(self): """Test ps rolemaker."""