diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index d547800bf6c915f158f33cbc9f418d58c901cc21..ef6f35c057894a7b704dcd969f384b56155c1455 100644
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -85,7 +85,7 @@ message DistributedStrategy {
   optional bool pipeline = 9 [ default = false ];
   optional bool elastic = 10 [ default = false ];
   optional bool auto = 11 [ default = false ];
-  optional bool async = 12 [ default = true ];
+  optional bool a_sync = 12 [ default = true ];
   optional bool sync_nccl_allreduce = 13 [ default = true ];
   optional int32 nccl_comm_num = 14 [ default = 1 ];
   optional bool use_hierarchical_allreduce = 15 [ default = false ];
@@ -99,7 +99,7 @@ message DistributedStrategy {
   optional LocalSGDConfig localsgd_configs = 103;
   optional GradientMergeConfig gradient_merge_configs = 104;
   optional PipelineConfig pipeline_configs = 106;
-  optional AsyncConfig async_configs = 107;
+  optional AsyncConfig a_sync_configs = 107;

   optional BuildStrategy build_strategy = 201;
   optional ExecutionStrategy execution_strategy = 202;
diff --git a/python/paddle/fleet/base/distributed_strategy.py b/python/paddle/fleet/base/distributed_strategy.py
index 74629cef615e73f840d00656921bbe156647ff91..bd49abd998b88ec82030ee46354816815c44de92 100644
--- a/python/paddle/fleet/base/distributed_strategy.py
+++ b/python/paddle/fleet/base/distributed_strategy.py
@@ -201,7 +201,7 @@ class DistributedStrategy(object):
                     f.name).extend(getattr(strategy, f.name))

     @property
-    def async_update(self):
+    def a_sync(self):
         """
         Indicating whether we are using asynchronous stochastic gradient descent updates
         for training. This property is valid when we are using parameter server training,
@@ -216,29 +216,29 @@ class DistributedStrategy(object):
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
-           strategy.async_update = True  # by default this is True
+           strategy.a_sync = True  # by default this is True

            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
-        return self.strategy.async
+        return self.strategy.a_sync

-    @async_update.setter
-    def async_update(self, flag):
+    @a_sync.setter
+    def a_sync(self, flag):
         if isinstance(flag, bool):
-            self.strategy.async = flag
+            self.strategy.a_sync = flag
         else:
-            print("WARNING: async_update should have value of bool type")
+            print("WARNING: a_sync should have value of bool type")

     @property
-    def async_update_configs(self):
+    def a_sync_configs(self):
         """
-        Set async update configurations. In general, asynchronous parameter server
+        Set a_sync update configurations. In general, asynchronous parameter server
         training has several configurable settings that can be configured through
         a dict.
        **Notes**:
-            **Detailed arguments for async_update_configs**
+            **Detailed arguments for a_sync_configs**
            **k_steps**: number of local optimization updates before communication
            **max_merge_var_num**: maximum number of merged gradients before communication
            **send_queue_size**: a buffer size of worker communication
@@ -255,19 +255,20 @@ class DistributedStrategy(object):
            fleet.init(role_maker)

            strategy = fleet.DistributedStrategy()
-           strategy.async_update = True  # by default this is True
+           strategy.a_sync = True  # by default this is True
            configs = {"k_steps": 10000, "send_queue_size": 32}
-           strategy.async_update_configs = configs
+           strategy.a_sync_configs = configs

            # code block for defining loss and local optimizer
            # sgd = fleet.distributed_optimizer(optimizer, strategy)
        """
-        return get_msg_dict(self.strategy.async_configs)
+        return get_msg_dict(self.strategy.a_sync_configs)

-    @async_update_configs.setter
-    def async_update_configs(self, configs):
-        check_configs_key(self.strategy.async_configs, configs, "async_configs")
-        assign_configs_value(self.strategy.async_configs, configs)
+    @a_sync_configs.setter
+    def a_sync_configs(self, configs):
+        check_configs_key(self.strategy.a_sync_configs, configs,
+                          "a_sync_configs")
+        assign_configs_value(self.strategy.a_sync_configs, configs)

     @property
     def amp(self):
@@ -584,4 +585,7 @@ class DistributedStrategy(object):
             print("WARNING: auto should have value of bool type")

     def __repr__(self):
+        fields = self.strategy.DESCRIPTOR.fields
+        for f in fields:
+            print("{}: {}".format(f.name, f.default_value))
         return str(self.strategy)
diff --git a/python/paddle/fleet/base/fleet_base.py b/python/paddle/fleet/base/fleet_base.py
index 13b9fc3220a0911415b6abc6f4cf91298038e298..8257f6a9db3bdeedf6047aa04beba0918b7f1826 100644
--- a/python/paddle/fleet/base/fleet_base.py
+++ b/python/paddle/fleet/base/fleet_base.py
@@ -228,6 +228,7 @@ class Fleet(object):
         """
         self.user_defined_optimizer = optimizer
         self.user_defined_strategy = strategy
+        self.valid_strategy = None
         return self

     def minimize(self,
@@ -292,8 +293,10 @@ class Fleet(object):
         distributed_optimizer_list = \
             MetaOptimizerFactory()._get_valid_meta_optimizers(
                 self.user_defined_optimizer)
+
         valid_optimizer_list = []
         valid_graph_optimizer_list = []
+        can_not_apply_optimizer_list = []
         # recall meta optimizers for ranking
         for opt in distributed_optimizer_list:
             opt._set_basic_info(loss, self._role_maker,
@@ -301,15 +304,21 @@ class Fleet(object):
                                 self.user_defined_strategy)
             if opt._can_apply() and not opt._is_graph_out():
                 valid_optimizer_list.append(opt)
-            if opt._can_apply() and opt._is_graph_out():
+            elif opt._can_apply() and opt._is_graph_out():
                 valid_graph_optimizer_list.append(opt)
+            else:
+                can_not_apply_optimizer_list.append(opt)
         # combine recalled meta optimizers to be a valid meta optimizer
-        meta_optimizer, graph_optimizer, final_dist_strategy = \
+        meta_optimizer, graph_optimizer = \
             self.strategy_compiler.generate_optimizer(
                 loss, self._role_maker, self.user_defined_optimizer,
                 self.user_defined_strategy, valid_optimizer_list,
                 valid_graph_optimizer_list)
+
+        valid_strategy = self.strategy_compiler._get_valid_strategy(
+            self.user_defined_strategy, can_not_apply_optimizer_list)
+        self.valid_strategy = valid_strategy
+
         optimize_ops = []
         params_grads = []
         if meta_optimizer:
@@ -332,12 +341,10 @@ class Fleet(object):

         if self._runtime_handle is None:
             self._runtime_handle = RuntimeFactory()._create_runtime(
-                final_dist_strategy, self._role_maker, optimize_ops,
-                params_grads)
+                valid_strategy, self._role_maker, optimize_ops, params_grads)

         if self._util is None:
-            self._util = UtilFactory()._create_util(final_dist_strategy,
-                                                    self._role_maker,
-                                                    optimize_ops, params_grads)
+            self._util = UtilFactory()._create_util(
+                valid_strategy, self._role_maker, optimize_ops, params_grads)

         return optimize_ops, params_grads
diff --git a/python/paddle/fleet/base/strategy_compiler.py b/python/paddle/fleet/base/strategy_compiler.py
index 92b50781f65ba928a47f5d4c0ecda2d739739c56..f0e23713e4f3f98217280f2cbe071bf1e23c823e 100644
--- a/python/paddle/fleet/base/strategy_compiler.py
+++ b/python/paddle/fleet/base/strategy_compiler.py
@@ -30,7 +30,7 @@ def maximum_path_len_algo(optimizer_list):
         return None
     for idx, opt in enumerate(candidates[max_idx][:-1]):
         opt._update_inner_optimizer(candidates[max_idx][idx + 1])
-    return candidates[max_idx][0]
+    return candidates[max_idx]


 class StrategyCompilerBase(object):
@@ -51,19 +51,55 @@ class StrategyCompiler(StrategyCompilerBase):

     def __init__(self):
         super(StrategyCompiler, self).__init__()
+        self._meta_optimizer = None
+        self._graph_optimizer = None
+        self._valid_optimizer_list = None
+        self._user_defined_strategy = None
+        self._meta_optimizer_candidates = []
+        self._graph_optimizer_candidates = []
+
+    def _get_valid_strategy(self, dist_strategy, can_not_apply_optimizer_list):
+        import copy
+        valid_strategy = copy.copy(dist_strategy)
+        invalid_optimizers = []
+        for candidate in self._meta_optimizer_candidates:
+            is_valid = False
+            for valid in self._meta_optimizers:
+                if candidate.__class__.__name__ == valid.__class__.__name__:
+                    is_valid = True
+                    break
+            if not is_valid:
+                invalid_optimizers.append(candidate)
+        for opt in invalid_optimizers:
+            opt._disable_strategy(valid_strategy)
+        for opt in can_not_apply_optimizer_list:
+            opt._disable_strategy(valid_strategy)
+        return valid_strategy

     def generate_optimizer(self, loss, role_maker, optimizer,
-                           userd_defined_strategy, meta_optimizer_list,
+                           user_defined_strategy, meta_optimizer_list,
                            graph_optimizer_list):
+        self._user_defined_strategy = user_defined_strategy
+        self._meta_optimizer_candidates = meta_optimizer_list
+        self._graph_optimizer_candidates = graph_optimizer_list
+
         if len(meta_optimizer_list) == 0 and len(graph_optimizer_list) == 0:
             return optimizer, None
         else:
             # currently, we use heuristic algorithm to select
             # meta optimizers combinations
-            meta_optimizer = maximum_path_len_algo(meta_optimizer_list)
-            graph_optimizer = maximum_path_len_algo(graph_optimizer_list)
+            meta_optimizers = maximum_path_len_algo(meta_optimizer_list)
+            graph_optimizers = maximum_path_len_algo(graph_optimizer_list)
             # should design a distributed strategy update interface
             # when we have finally decided the combination of meta_optimizer
             # and graph_optimizer, the corresponding distributed strategy
             # should be updated.
-            return meta_optimizer, graph_optimizer, None
+
+            self._meta_optimizers = meta_optimizers
+            self._graph_optimizers = graph_optimizers
+
+            return_meta = None if meta_optimizers == None else meta_optimizers[
+                0]
+            return_graph = None if graph_optimizers == None else graph_optimizers[
+                0]
+            return return_meta, return_graph
diff --git a/python/paddle/fleet/meta_optimizers/graph_execution_optimizer.py b/python/paddle/fleet/meta_optimizers/graph_execution_optimizer.py
index 2991f80aa538dae8276f2e81e359bf95390ee0c1..7a943b6531fd2d8a1802e6552a7f255297e51765 100644
--- a/python/paddle/fleet/meta_optimizers/graph_execution_optimizer.py
+++ b/python/paddle/fleet/meta_optimizers/graph_execution_optimizer.py
@@ -16,6 +16,7 @@ from paddle.fluid.framework import core
 from paddle.fluid import compiler
 from .meta_optimizer_base import MetaOptimizerBase
 from ..base.private_helper_function import wait_server_ready
+import logging


 class GraphExecutionOptimizer(MetaOptimizerBase):
@@ -32,6 +33,10 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
         """
         Basically, this is PE, and almost all programs can be executed here
         """
+        if not self.role_maker._is_collective:
+            # update me. currently, if parameter server is used
+            # graph execution optimizer can not be applied
+            return False
         return True

     def backward(self,
@@ -178,6 +183,10 @@ class GraphExecutionOptimizer(MetaOptimizerBase):

         return self._compiled_program

+    def _disable_strategy(self, dist_strategy):
+        # TODO(guru4elephant): should close all PE related flags here
+        pass
+
     def minimize(self,
                  loss,
                  startup_program=None,
diff --git a/python/paddle/fleet/meta_optimizers/meta_optimizer_base.py b/python/paddle/fleet/meta_optimizers/meta_optimizer_base.py
index 33b7b2bb1e85269d1d91bbc50f996f3e56a84435..1a3cfda94b98c9514208433dfcf5947caea8537c 100644
--- a/python/paddle/fleet/meta_optimizers/meta_optimizer_base.py
+++ b/python/paddle/fleet/meta_optimizers/meta_optimizer_base.py
@@ -39,6 +39,9 @@ class MetaOptimizerBase(object):
         if str(optimizer.__class__.__name__) in self.meta_optimizers_white_list:
             return True

+    def _disable_strategy(self, dist_strategy):
+        raise NotImplementedError("you should implement disable strategy")
+
     def minimize_impl(self,
                       loss,
                       startup_program=None,
diff --git a/python/paddle/fleet/meta_optimizers/recompute_optimizer.py b/python/paddle/fleet/meta_optimizers/recompute_optimizer.py
index 902b8367b34f65e95a3501d5586db056ef5a55f3..73119d81094ac611c0d3545b59342b5dbd8b5d16 100644
--- a/python/paddle/fleet/meta_optimizers/recompute_optimizer.py
+++ b/python/paddle/fleet/meta_optimizers/recompute_optimizer.py
@@ -34,11 +34,16 @@ class RecomputeOptimizer(MetaOptimizerBase):

     def _can_apply(self):
         if self.user_defined_strategy.recompute == True:
-            if len(self.user_defined_strategy.recompute_checkpoints) == 0:
+            if len(self.user_defined_strategy.recompute_configs[
+                    "checkpoints"]) == 0:
                 return False
             else:
                 return True

+    def _disable_strategy(self, dist_strategy):
+        dist_strategy.recompute = False
+        dist_strategy.recompute_configs = {"checkpoints": []}
+
     def backward(self,
                  loss,
                  startup_program=None,
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
index 890d716ff0cb8fc7804dd838cc2e2fe52073dd3d..a2bb2ad107d27f883021a3f3e4153cf27c2627cb 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
@@ -178,20 +178,20 @@ class TestStrategyConfig(unittest.TestCase):
         strategy.lamb = "True"
         self.assertEqual(strategy.lamb, False)

-    def test_async_update(self):
+    def test_a_sync(self):
         strategy = paddle.fleet.DistributedStrategy()
-        strategy.async_update = True
-        self.assertEqual(strategy.async_update, True)
-        strategy.async_update = False
-        self.assertEqual(strategy.async_update, False)
-        strategy.async_update = "True"
-        self.assertEqual(strategy.async_update, False)
-
-    def test_async_configs(self):
+        strategy.a_sync = True
+        self.assertEqual(strategy.a_sync, True)
+        strategy.a_sync = False
+        self.assertEqual(strategy.a_sync, False)
+        strategy.a_sync = "True"
+        self.assertEqual(strategy.a_sync, False)
+
+    def test_a_sync_configs(self):
         strategy = paddle.fleet.DistributedStrategy()
         configs = {"k_steps": 1000}
-        strategy.async_update_configs = configs
-        self.assertEqual(strategy.async_update_configs["k_steps"], 1000)
+        strategy.a_sync_configs = configs
+        self.assertEqual(strategy.a_sync_configs["k_steps"], 1000)

     def test_elastic(self):
         strategy = paddle.fleet.DistributedStrategy()
@@ -213,7 +213,7 @@ class TestStrategyConfig(unittest.TestCase):

     def test_strategy_prototxt(self):
         strategy = paddle.fleet.DistributedStrategy()
-        strategy.async_update = True
+        strategy.a_sync = True
         strategy.localsgd = True
         strategy.dgc = True
         localsgd_configs = {"k_steps": 5}
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_meta_optimizer.py
index 9cb300f83d9c4ced68f69f06170a32f82f039d8c..2b2082c4ee459d30e83447a963042d2c716f9f0d 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_meta_optimizer.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_meta_optimizer.py
@@ -25,6 +25,27 @@ class TestFleetMetaOptimizer(unittest.TestCase):
         os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
             "127.0.0.1:36001,127.0.0.2:36001"

+    def test_graph_execution_optimizer_not_apply(self):
+        import paddle.fleet as fleet
+        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+        role = role_maker.PaddleCloudRoleMaker()
+        fleet.init(role)
+        input_x = paddle.fluid.layers.data(
+            name="x", shape=[32], dtype='float32')
+        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
+
+        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
+        fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
+        prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
+        cost = paddle.fluid.layers.cross_entropy(
+            input=prediction, label=input_y)
+        avg_cost = paddle.fluid.layers.mean(x=cost)
+
+        strategy = paddle.fleet.DistributedStrategy()
+        optimizer = paddle.optimizer.SGD(learning_rate=0.01)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        optimizer.minimize(avg_cost)
+
     def test_graph_execution_optimizer(self):
         import paddle.fleet as fleet
         import paddle.fluid.incubate.fleet.base.role_maker as role_maker
@@ -42,7 +63,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
         avg_cost = paddle.fluid.layers.mean(x=cost)

         strategy = paddle.fleet.DistributedStrategy()
-
+        strategy.nccl_comm_num = 2
         optimizer = paddle.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)
@@ -65,7 +86,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):

         strategy = paddle.fleet.DistributedStrategy()
         strategy.recompute = True
-        strategy.recompute_checkpoints = [fc_2]
+        strategy.recompute_configs = {"checkpoints": ["fc2"]}
         optimizer = paddle.optimizer.SGD(learning_rate=0.01)
         optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
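
Taken together, the user-facing surface of this patch is the rename of `async_update`/`async_update_configs` to `a_sync`/`a_sync_configs` and the move of recompute checkpoints into `recompute_configs`. Below is a minimal usage sketch mirroring the updated unit tests above; it assumes the same PaddleCloud role-maker environment the tests set up and adds no behavior of its own:

```python
# Minimal sketch of the renamed knobs, mirroring the updated unit tests.
import paddle
import paddle.fleet as fleet

strategy = paddle.fleet.DistributedStrategy()

# renamed from strategy.async_update
strategy.a_sync = True
# renamed from strategy.async_update_configs
strategy.a_sync_configs = {"k_steps": 1000}

# recompute checkpoints now live in recompute_configs rather than the
# removed strategy.recompute_checkpoints list
strategy.recompute = True
strategy.recompute_configs = {"checkpoints": ["fc2"]}

# as in the tests: wrap a local optimizer and minimize
# optimizer = fleet.distributed_optimizer(
#     paddle.optimizer.SGD(learning_rate=0.01), strategy=strategy)
# optimizer.minimize(avg_cost)
```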
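For meta optimizer authors, the new `_disable_strategy` hook is what lets `StrategyCompiler._get_valid_strategy` switch off every recalled-but-unapplied optimizer in the copied strategy; the base class now raises `NotImplementedError` when the hook is missing. A hypothetical optimizer (the class name and the choice of the `dgc` flag are illustrative only) would follow the `RecomputeOptimizer` pattern above:

```python
# Hypothetical subclass showing the _disable_strategy contract; the class
# name and the dgc flag are chosen for illustration only.
from paddle.fleet.meta_optimizers.meta_optimizer_base import MetaOptimizerBase


class ExampleMetaOptimizer(MetaOptimizerBase):
    def _can_apply(self):
        # apply only when the user switched the flag on
        return self.user_defined_strategy.dgc

    def _disable_strategy(self, dist_strategy):
        # called on the copy made by _get_valid_strategy so that
        # fleet's runtime sees this option as turned off
        dist_strategy.dgc = False
```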