From cd48bdad31c73d632823eb9cc9f8f2460bfbea60 Mon Sep 17 00:00:00 2001 From: mapingshuo Date: Tue, 18 Aug 2020 09:46:26 +0800 Subject: [PATCH] add feature to fleet2.0 role_maker, distribute_strategy, test=develop (#26267) * add feature to fleet2.0 role_maker, distribute_strategy, test=develop --- .../fleet/base/distributed_strategy.py | 9 +++++ .../distributed/fleet/base/role_maker.py | 24 ++++++++++++ .../graph_execution_optimizer.py | 14 +++++-- .../meta_optimizers/recompute_optimizer.py | 3 +- .../test_fleet_amp_meta_optimizer.py | 4 +- .../fluid/tests/unittests/test_fleet_base.py | 38 +------------------ .../test_fleet_dgc_meta_optimizer.py | 2 +- .../test_fleet_distributed_strategy.py | 5 +++ ...est_fleet_gradient_merge_meta_optimizer.py | 2 +- .../unittests/test_fleet_graph_executor.py | 4 +- .../test_fleet_lamb_meta_optimizer.py | 2 +- .../test_fleet_lars_meta_optimizer.py | 2 +- .../test_fleet_localsgd_meta_optimizer.py | 2 +- .../test_fleet_pipeline_meta_optimizer.py | 2 +- .../test_fleet_recompute_meta_optimizer.py | 4 +- .../unittests/test_fleet_rolemaker_new.py | 3 ++ 16 files changed, 68 insertions(+), 52 deletions(-) diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py index 31bfd482766..5531160d7c5 100755 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -81,6 +81,8 @@ class DistributedJobInfo(object): class DistributedStrategy(object): + __lock_attr = False + def __init__(self): """ DistributedStrategy is the main configuration entry for distributed training of Paddle. @@ -95,6 +97,13 @@ class DistributedStrategy(object): """ self.strategy = distributed_strategy_pb2.DistributedStrategy() + self.__lock_attr = True + + def __setattr__(self, key, value): + if self.__lock_attr and not hasattr(self, key): + raise TypeError("%s is not a attribute of %s" % + (key, self.__class__.__name__)) + object.__setattr__(self, key, value) def save_to_prototxt(self, output): """ diff --git a/python/paddle/distributed/fleet/base/role_maker.py b/python/paddle/distributed/fleet/base/role_maker.py index 0cf909c98c0..856290a0504 100644 --- a/python/paddle/distributed/fleet/base/role_maker.py +++ b/python/paddle/distributed/fleet/base/role_maker.py @@ -110,6 +110,14 @@ class RoleMakerBase(object): """ raise NotImplementedError("Please implement this method in child class") + def node_num(self): + """ + Get the training node number + Returns: + int: node num + """ + raise NotImplementedError("Please implement this method in child class") + def get_trainer_endpoints(self): """ return trainer endpoints @@ -286,6 +294,14 @@ class PaddleCloudRoleMaker(RoleMakerBase): self.generate_role() return self._trainers_num + def node_num(self): + """ + return the training node number + """ + if not self._role_is_generated: + self.generate_role() + return self._node_num + def get_trainer_endpoints(self): """ get endpoint of all trainers @@ -353,6 +369,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._trainers_num = trainers_num self._role = role self._current_id = current_id + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _collective_env(self): self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) @@ -363,6 +381,8 @@ class PaddleCloudRoleMaker(RoleMakerBase): assert self._worker_endpoints is not None, "can't find PADDLE_TRAINER_ENDPOINTS" self._worker_endpoints = self._worker_endpoints.split(",") self._trainers_num = len(self._worker_endpoints) + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _init_gloo_env(self): def init_gloo_instance(role="trainer"): @@ -513,12 +533,16 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._cur_endpoint = self._worker_endpoints[self._current_id] elif self._role == Role.SERVER: self._cur_endpoint = self._server_endpoints[self._current_id] + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def _user_defined_collective_env(self): self._worker_endpoints = self._kwargs.get("worker_endpoints") self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) self._training_role = Role.Worker + self._node_num = len( + set([x.split(':')[0] for x in self._worker_endpoints])) def generate_role(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py index 78478b9691b..b9ff31a068e 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/graph_execution_optimizer.py @@ -119,18 +119,26 @@ class GraphExecutionOptimizer(MetaOptimizerBase): local_build_strategy.nccl_comm_num = \ dist_strategy.nccl_comm_num + if self.user_defined_strategy.recompute == True: + logging.warn( + "set enable_sequential_execution=True since you have enable the recompute strategy" + ) + local_build_strategy.enable_sequential_execution = True + exe_strategy = self.user_defined_strategy.execution_strategy - node_num = self.role_maker.worker_num() + worker_num = self.role_maker.worker_num() + node_num = self.role_maker.node_num() if self.role_maker._is_collective: - assert node_num >= 1, "nccl2 node_num must >= 1, now:{}" % node_num + assert worker_num >= 1, "nccl2 worker_num must >= 1, now:{}" % worker_num - if node_num <= 1: + if worker_num <= 1: # local mode if local_build_strategy.nccl_comm_num > 1: logging.warn("set nccl_comm_num=1 since you only have 1 node.") local_build_strategy.nccl_comm_num = 1 + if node_num <= 1: if local_build_strategy.use_hierarchical_allreduce: logging.warn( "set hierachical_allreduce=False since you only have 1 node." diff --git a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py index 96247474927..07b69f19e7e 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/recompute_optimizer.py @@ -30,7 +30,8 @@ class RecomputeOptimizer(MetaOptimizerBase): user_defined_strategy): super(RecomputeOptimizer, self)._set_basic_info( loss, role_maker, user_defined_optimizer, user_defined_strategy) - self.wrapped_opt._set_checkpoints([]) + self.wrapped_opt._set_checkpoints( + list(user_defined_strategy.recompute_configs["checkpoints"])) def _can_apply(self): if self.user_defined_strategy.recompute == True: diff --git a/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py index 0e19069d5c0..22a1434ae25 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_amp_meta_optimizer.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import unittest import paddle import os @@ -23,8 +25,6 @@ class TestFleetAMPOptimizer(unittest.TestCase): os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" def test_amp_optimizer(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index 3a79b694cad..ca657a5a619 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -14,6 +14,8 @@ import unittest import paddle +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import os @@ -26,67 +28,49 @@ class TestFleetBase(unittest.TestCase): "127.0.0.1:36001,127.0.0.2:36001" def test_init(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) def test_is_first_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_first_worker(): print("test fleet first worker done.") def test_worker_index(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_index()) def test_worker_num(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_num()) def test_is_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): print("test fleet is worker") def test_worker_endpoints(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) print(fleet.worker_endpoints(to_string=True)) def test_server_num(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("fleet server num: {}".format(fleet.server_num())) def test_server_index(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("fleet server index: {}".format(fleet.server_index())) def test_server_endpoints(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): @@ -94,55 +78,41 @@ class TestFleetBase(unittest.TestCase): fleet.server_endpoints(to_string=True))) def test_is_server(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_server(): print("test fleet is server") def test_util(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) self.assertEqual(fleet.util, None) def test_barrier_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.barrier_worker() def test_init_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.init_worker() def test_run_server(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.run_worker() def test_stop_worker(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) if fleet.is_worker(): fleet.stop_worker() def test_distributed_optimizer(self): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) @@ -150,10 +120,6 @@ class TestFleetBase(unittest.TestCase): optimizer = fleet.distributed_optimizer(optimizer) def test_minimize(self): - import paddle - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker - input_x = paddle.fluid.layers.data( name="x", shape=[32], dtype='float32') input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') diff --git a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py index 1d211a77008..b43687ce1cd 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_dgc_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetDGCOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py index 45dd461237b..40e0168e1ac 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py @@ -289,6 +289,11 @@ class TestStrategyConfig(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.execution_strategy = exe_strategy + def test_unknown_strategy(self): + strategy = paddle.distributed.fleet.DistributedStrategy() + with self.assertRaises(TypeError): + strategy.unknown_key = 'UNK' + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py index 49ce09877f0..581f8becbbf 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_gradient_merge_meta_optimizer.py @@ -16,7 +16,7 @@ import unittest import paddle import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py b/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py index d2e0112ba29..4d92c6f7054 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_graph_executor.py @@ -14,6 +14,8 @@ import unittest import paddle +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker import os from launch_function_helper import launch_func @@ -39,8 +41,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): } def node_func(): - import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( diff --git a/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py index 8ad051924f2..134aea363b5 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_lamb_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLambMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py index 87c4823693e..b15db0b12d0 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_lars_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle from paddle import fluid import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLarsMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py index f4bb8704849..86098d42b82 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_localsgd_meta_optimizer.py @@ -17,7 +17,7 @@ import paddle import os import paddle.distributed.fleet as fleet -import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import paddle.distributed.fleet.base.role_maker as role_maker class TestFleetLocalSGDMetaOptimizer(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py index d35f2fe5e62..ca969bc4032 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py @@ -25,7 +25,7 @@ class TestFleetMetaOptimizer(unittest.TestCase): def test_pipeline_optimizer(self): import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker + import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) with paddle.fluid.device_guard("cpu"): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py index f07c6421192..95e1c3a3602 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_recompute_meta_optimizer.py @@ -27,7 +27,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase): def test_recompute_optimizer(self): import paddle.distributed.fleet as fleet - import paddle.fluid.incubate.fleet.base.role_maker as role_maker + import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) input_x = paddle.fluid.layers.data( @@ -43,7 +43,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.recompute = True - strategy.recompute_configs = {"checkpoints": ["fc2"]} + strategy.recompute_configs = {"checkpoints": ["fc_1.tmp_0"]} optimizer = paddle.optimizer.SGD(learning_rate=0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index f80d45ed5e0..cf9b3e1e9a1 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -34,6 +34,7 @@ class TestRoleMakerBase(unittest.TestCase): self.assertRaises(Exception, role.worker_index) self.assertRaises(Exception, role.server_index) self.assertRaises(Exception, role.role_id) + self.assertRaises(Exception, role.node_num) trainer_endpoints = role.get_trainer_endpoints() self.assertTrue(len(trainer_endpoints) == 0) @@ -80,10 +81,12 @@ class TestCloudRoleMaker(unittest.TestCase): worker_endpoints = ro.get_trainer_endpoints() self.assertEqual(worker_endpoints[0], '127.0.0.1:36001') self.assertEqual(ro.role_id(), 0) + self.assertEqual(ro.node_num(), 2) def test_tr_rolemaker_collective(self): ro = role_maker.PaddleCloudRoleMaker(is_collective=True) self.assertEqual(ro.worker_num(), 2) + self.assertEqual(ro.node_num(), 2) def test_ps_rolemaker(self): """Test ps rolemaker.""" -- GitLab