diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto
index 50b7d62547bb3acda37b2ac56fc3396d6be17b1e..9f3af174f607792eb416a6648cc1ff76818c2ecd 100644
--- a/paddle/fluid/framework/distributed_strategy.proto
+++ b/paddle/fluid/framework/distributed_strategy.proto
@@ -135,6 +135,7 @@ message DistributedStrategy {
   optional bool adaptive_localsgd = 24 [ default = false ];
   optional bool fp16_allreduce = 25 [ default = false ];
   optional bool sharding = 26 [ default = false ];
+  optional float last_comm_group_size_MB = 27 [ default = 1 ];
 
   optional RecomputeConfig recompute_configs = 101;
   optional AMPConfig amp_configs = 102;
diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index c94b77dd8c6428909a4a2b62cc19659a8635dcff..98b6bc0cc89f1d79c9bae88068bdd3d484fcd659 100755
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -18,6 +18,7 @@ from paddle.fluid.framework import Variable, set_flags, core
 from paddle.fluid.wrapped_decorator import wrap_decorator
 import google.protobuf.text_format
 import google.protobuf
+from paddle.fluid.framework import dygraph_only
 
 __all__ = ["DistributedStrategy"]
 
@@ -555,6 +556,32 @@ class DistributedStrategy(object):
         else:
             print("WARNING: fuse_grad_size_in_MB should have value of int type")
 
+    @property
+    def last_comm_group_size_MB(self):
+        """
+        Specifying the size of gradient to fuse in Mega-Bytes when
+        the last group of each batch communicates. Making the last group
+        small is useful to improve performance.
+
+        Default value: 1
+
+        Examples:
+          .. code-block:: python
+
+            import paddle.distributed.fleet as fleet
+            strategy = fleet.DistributedStrategy()
+            strategy.last_comm_group_size_MB = 2
+        """
+        return self.strategy.last_comm_group_size_MB
+
+    @last_comm_group_size_MB.setter
+    @is_strict_auto
+    def last_comm_group_size_MB(self, value):
+        if value > 0:
+            self.strategy.last_comm_group_size_MB = value
+        else:
+            raise ValueError("last_comm_group_size_MB should be greater than 0")
+
     @property
     def _fuse_grad_size_in_TFLOPS(self):
         return self.strategy.fuse_grad_size_in_TFLOPS
diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py
index c5be6a7a8bb142ef4fe25a906785560f6b22e93c..5a09e0be98ce8fdef950e6ab1306658ee0dee1de 100644
--- a/python/paddle/distributed/fleet/base/fleet_base.py
+++ b/python/paddle/distributed/fleet/base/fleet_base.py
@@ -92,12 +92,11 @@ class Fleet(object):
             import paddle
             paddle.enable_static()
             import paddle.distributed.fleet as fleet
-
-            fleet.init()
-
             strategy = fleet.DistributedStrategy()
+            fleet.init(strategy)
+
             optimizer = paddle.optimizer.SGD(learning_rate=0.001)
-            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+            optimizer = fleet.distributed_optimizer(optimizer)
 
             if fleet.is_first_worker():
                 print("this is first worker")
@@ -127,7 +126,7 @@ class Fleet(object):
         self._util = None
         self._context = {}
 
-    def init(self, role_maker=None, is_collective=False):
+    def init(self, role_maker=None, is_collective=False, strategy=None):
         """
         Initialize role_maker in Fleet.
 
@@ -142,6 +141,10 @@ class Fleet(object):
             is_collective (Boolean, optional): A ``Boolean`` variable determines whether the program
                 runs on the CPU or GPU. False means set distributed training using CPU, and True means
                 GPU.The default value is False.The default value is False.
+            strategy (DistributedStrategy): Extra properties for distributed training.
+                For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None.
+
+
         Returns:
             None
 
@@ -167,6 +170,14 @@ class Fleet(object):
                 role = fleet.PaddleCloudRoleMaker()
                 fleet.init(role)
 
+        Examples4:
+
+            .. code-block:: python
+
+                import paddle.distributed.fleet as fleet
+                strategy = fleet.DistributedStrategy()
+                fleet.init(strategy)
+
         """
 
         if role_maker is None:
@@ -209,6 +220,10 @@ class Fleet(object):
         else:
             paddle.distributed.init_parallel_env()
 
+        if strategy is None:
+            strategy = DistributedStrategy()
+        self._user_defined_strategy = copy.deepcopy(strategy)
+
     def is_first_worker(self):
         """
         Check whether the node is the first instance of worker.
@@ -575,7 +590,11 @@ class Fleet(object):
 
         Args:
             optimizer(Optimizer): The executor to run for init server.
-            strategy(DistributedStrategy): Extra properties for distributed optimizer.
+            strategy(DistributedStrategy): Extra properties for distributed optimizer.
+                It is recommended to use DistributedStrategy in fleet.init(). The strategy
+                here is for compatibility. If the strategy in fleet.distributed_optimizer()
+                is not None, then it will overwrite the DistributedStrategy in fleet.init(),
+                which will take effect in distributed training.
 
         Returns:
             Fleet: instance of fleet.
@@ -594,27 +613,25 @@ class Fleet(object):
 
         """
         self.user_defined_optimizer = optimizer
-        if strategy == None:
-            strategy = DistributedStrategy()
+        if strategy is not None:
+            warnings.warn(
+                "It is recommended to pass in DistributedStrategy "
+                "in fleet.init. The strategy here is for compatibility. "
+                "If the `strategy` in fleet.distributed_optimizer() is "
+                "not None, then it will overwrite the DistributedStrategy in fleet.init(), "
+                "which will take effect in distributed training.")
+            self._user_defined_strategy = copy.deepcopy(strategy)
 
-        self._user_defined_strategy = copy.deepcopy(strategy)
         self._context = {}
 
         return self
 
     @dygraph_only
-    def distributed_model(self, model, group_size_limits=25,
-                          small_group_size=1):
+    def distributed_model(self, model):
         """
         Return distributed data parallel model (Only work in dygraph mode)
 
         Args:
             model (Layer): the user-defind model which inherits Layer.
-            group_size_limits(int, optional): It is up limited memory size(MB) of one group
-                          parameters' gradient which is the input of communication
-                          calling(e.g NCCLAllReduce). Default: 25.
-            small_group_size(int, optional): It is up limited memory size(MB) of last group in communication
-                          calling. Making the last group small is useful to
-                          improve performance. Default: 1.
 
         Returns:
             distributed data parallel model which inherits Layer.
@@ -667,8 +684,9 @@ class Fleet(object):
         assert model is not None
         self.model = paddle.DataParallel(
             model,
-            group_size_limits=group_size_limits,
-            small_group_size=small_group_size)
+            comm_buffer_size=self._user_defined_strategy.fuse_grad_size_in_MB,
+            last_comm_buffer_size=self._user_defined_strategy.
+            last_comm_group_size_MB)
         return self.model
 
     @dygraph_only
diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py
index 46fdf05d0ddfa2c6c418215d299adcbc4f9dd540..852684cb95d1ad678e5c914b48ab71919a2af55d 100644
--- a/python/paddle/fluid/dygraph/parallel.py
+++ b/python/paddle/fluid/dygraph/parallel.py
@@ -309,11 +309,11 @@ class DataParallel(layers.Layer):
         layers(Layer): The module that should be executed by data parallel.
         strategy(ParallelStrategy, optional): (deprecated) The strategy of data parallelism, contains environment configuration related to parallel execution. Default: None.
-        group_size_limits(int, optional): It is up limited memory size(MB) of one group
+        comm_buffer_size(int, optional): It limits the memory size(MB) of one buffer
                           parameters' gradient which is the input of communication
                           calling(e.g NCCLAllReduce). Default: 25.
-        small_group_size(int, optional): It is up limited memory size(MB) of last group in communication
-                          calling. Making the last group small is useful to
+        last_comm_buffer_size(float, optional): It limits memory size(MB) of last buffer in communication
+                          calling. Making the last communication buffer size small is useful to
                           improve performance. Default: 1.
 
     Returns:
@@ -369,8 +369,8 @@ class DataParallel(layers.Layer):
     def __init__(self,
                  layers,
                  strategy=None,
-                 group_size_limits=25,
-                 small_group_size=1):
+                 comm_buffer_size=25,
+                 last_comm_buffer_size=1):
         super(DataParallel,
               self).__init__(layers.full_name() + "_data_parallel")
 
@@ -386,12 +386,13 @@ class DataParallel(layers.Layer):
             self._strategy = _build_default_parallel_strategy()
 
         if self._strategy.nranks > 1:
-            self.group_size_limits = int(group_size_limits * 1024 * 1024)
+            self.comm_buffer_size = int(comm_buffer_size * 1024 * 1024)
             # NOTE(shenliang03): We can set environment variables to control
             # the size of the group, Default: 1MB. The role of this small group is:
             # when the last group allreduce, the overlap cannot work. Making the
             # the last group small is useful to improve performance.
-            self.small_group_size = int(small_group_size * 1024 * 1024)
+            self.last_comm_buffer_size = int(last_comm_buffer_size * 1024 *
+                                             1024)
             self.init_reducer()
         else:
             warnings.warn(
@@ -431,7 +432,7 @@ class DataParallel(layers.Layer):
 
         self.group_indices = core.assign_group_by_size(
             trainable_parameters, is_sparse_gradient,
-            [self.small_group_size, self.group_size_limits])
+            [self.last_comm_buffer_size, self.comm_buffer_size])
 
         assert parallel_helper.__parallel_ctx__clz__ is not None, \
             "ParallelContext must be initialized before. You should use init_parallel_env() before" \
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
index deaf342da12af9ab7d7b0c659961b3ee6c1ee478..7375049b3c8646db509846dc7c12260e6b72d4b9 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py
@@ -169,6 +169,13 @@ class TestStrategyConfig(unittest.TestCase):
         strategy.fuse_grad_size_in_MB = "40"
         self.assertEqual(strategy.fuse_grad_size_in_MB, 50)
 
+    def test_last_comm_group_size_MB(self):
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.last_comm_group_size_MB = 50
+        self.assertEqual(strategy.last_comm_group_size_MB, 50)
+        with self.assertRaises(ValueError):
+            strategy.last_comm_group_size_MB = -1
+
     def test_fuse_grad_size_in_TFLOPS(self):
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy._fuse_grad_size_in_TFLOPS = 0.1
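
Taken together, the patch moves the per-call group_size_limits/small_group_size arguments into the strategy: fuse_grad_size_in_MB sizes the regular fused-gradient buffers, the new last_comm_group_size_MB sizes the final, smaller communication group, and paddle.DataParallel exposes the same values as comm_buffer_size and last_comm_buffer_size. The sketch below is illustrative only and is not part of the patch; the Linear layer, learning rate, and buffer sizes are arbitrary, and it assumes a collective multi-GPU job launched with python -m paddle.distributed.launch.

.. code-block:: python

    # Illustrative dygraph usage after this change (not part of the patch).
    import paddle
    import paddle.distributed.fleet as fleet

    strategy = fleet.DistributedStrategy()
    strategy.fuse_grad_size_in_MB = 32      # size (MB) of the regular fused gradient buffers
    strategy.last_comm_group_size_MB = 2    # smaller buffer for the last communication group

    # The strategy is now handed to fleet.init(); distributed_optimizer()
    # no longer needs it.
    fleet.init(is_collective=True, strategy=strategy)

    layer = paddle.nn.Linear(10, 10)
    optimizer = paddle.optimizer.SGD(
        learning_rate=0.001, parameters=layer.parameters())
    optimizer = fleet.distributed_optimizer(optimizer)

    # distributed_model() no longer accepts group_size_limits/small_group_size;
    # it reads fuse_grad_size_in_MB and last_comm_group_size_MB from the strategy
    # and forwards them to paddle.DataParallel as comm_buffer_size and
    # last_comm_buffer_size.
    dp_layer = fleet.distributed_model(layer)

When constructing paddle.DataParallel directly, the old keyword arguments map one-to-one: group_size_limits becomes comm_buffer_size and small_group_size becomes last_comm_buffer_size.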