From 0dadacc4eb48c6412ea8798a5343c7eebd290f6c Mon Sep 17 00:00:00 2001
From: JZ-LIANG <38102074+JZ-LIANG@users.noreply.github.com>
Date: Thu, 26 Nov 2020 19:26:49 +0800
Subject: [PATCH] [sharding] doc, api, bug fixed (#28983)

* add lars to fleet meta optimizer

* add lamb to proto

* add lamb to fleet meta optimizer

* fixed syntax bug

* fixed syntax bug

* fixed syntax error in lamb, add config setter of lamb in distributed_strategy

* trigger unittest to rerun

* add new unittest func for lamb

* revise unittest for lars and lamb

* revise dgc meta unittest

* revise lars document in distribute_strategy

* revise lars lamb document in distributed_strategy.py

* revise lars lamb document in distributed_strategy.py

* add weight decay exclude logic to lars

* restore optimizer.py

* restore optimizer.py as develop except lars

* add epsilon and exclude fn to distributed_strategy

* add lars epsilon

* revise unittest for fleet lars and lamb

* revise lars lamb unittest for CI coverage

* revise lars argument api

* revise lars argument api

* revise lars argument api

* revise api doc of lars

* fix op role

* add sharding save and add_sync_comm_for_test function

* add comm_analyse to utils

* revise sharding_utils

* add sharding saving unittest

* revise sharding utils for unittest

* revise sharding en doc

* update sharding utils api

* add doc for sharding

* fixed bug in sharding var size count

* update varsize count in sharding

* fix sharding num_nccl_comm

* Revert "fix sharding num_nccl_comm"

This reverts commit d51587c15e9323acf226ddd36154275f0d1daf76.
---
 .../fleet/base/distributed_strategy.py        | 11 +++++++---
 .../fleet/meta_optimizers/sharding/utils.py   | 12 +++++-----
 .../meta_optimizers/sharding_optimizer.py     |  0
 .../tests/unittests/dist_sharding_save.py     | 22 ++++++++++---------
 .../test_fleet_sharding_meta_optimizer.py     | 20 ++++++++---------
 5 files changed, 37 insertions(+), 28 deletions(-)
 mode change 100644 => 100755 python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py

diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index 46ccb4663e8..cb1c28b39b6 100755
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -615,12 +615,15 @@ class DistributedStrategy(object):
     def sharding(self):
         """
         Indicating whether we are using sharding Optimizer for memory
-        optimization
+        optimization. We implement the sharding optimizer following the ZeRO-DP
+        idea from [ZeRO: Memory Optimizations Toward Training Trillion Parameter Models](https://arxiv.org/abs/1910.02054).
+        Model parameters and optimizer state are sharded across different ranks, which allows larger models to fit in memory.

         Default value: False

         Examples:
           .. code-block:: python
+
             import paddle.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.sharding = True
@@ -638,10 +641,12 @@ class DistributedStrategy(object):
     @property
     def sharding_configs(self):
         """
-        Set sharding configurations.
+        Set sharding configurations.

         **Note**:
-            fuse_broadcast_MB(float): size of a fused group of broadcasted parameters.
+            fuse_broadcast_MB(float): size (in MB) of a fused group of broadcast parameters.
+            This configuration affects the communication speed in sharding training,
+            and should be tuned empirically for your model size and network topology.

         Examples:
           .. code-block:: python
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
index 2aa4bdd68c9..b5c34f87cdf 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
@@ -265,7 +265,7 @@ def get_var_size(param):
     input:
         - param: var
     return:
-        var size in Bytes
+        var size in MB
     """
     assert -1 not in param.shape
     return reduce(lambda x, y: x * y,
@@ -299,10 +299,12 @@ def comm_analyse(main_program):
     for op in block.ops:
         if op.type == "c_broadcast":
             var_name = op.desc.input_arg_names()[0]
-            broadcast_vars[var_name] = get_var_size(block.var(var_name))
+            # convert MB to KB
+            broadcast_vars[var_name] = get_var_size(block.var(
+                var_name)) * 1024.0
         elif op.type == "c_allreduce_sum":
             var_name = op.desc.input_arg_names()[0]
-            reduce_vars[var_name] = get_var_size(block.var(var_name))
+            reduce_vars[var_name] = get_var_size(block.var(var_name)) * 1024.0

     varsize_count = {}
     gap = 1
@@ -329,7 +331,7 @@ def comm_analyse(main_program):
                                                count))


-def add_sync_comm_for_test(program, dist_strategy):
+def add_sync_comm(program, dist_strategy):
     """
     When a test program is cloned from the sharding main program,
     some of the sync_comm ops may be pruned by mistake; this function
@@ -361,7 +363,7 @@
     return


-def sharding_save_persistables(exe, dirname, main_program, filename=None):
+def save_persistables(exe, dirname, main_program, filename=None):
     """
     When using sharding, some persistable vars are unique and are partitioned across different ranks,
     while other persistable vars are duplicated and exist in all the ranks with different values.
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
old mode 100644
new mode 100755
diff --git a/python/paddle/fluid/tests/unittests/dist_sharding_save.py b/python/paddle/fluid/tests/unittests/dist_sharding_save.py
index 05578c9e4a5..22c930bf894 100755
--- a/python/paddle/fluid/tests/unittests/dist_sharding_save.py
+++ b/python/paddle/fluid/tests/unittests/dist_sharding_save.py
@@ -21,7 +21,7 @@ from dist_mnist import cnn_model
 # from paddle.fluid.incubate.fleet.collective import fleet
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
-from paddle.distributed.fleet.meta_optimizers.sharding.utils import sharding_save_persistables
+import paddle.distributed.fleet.meta_optimizers.sharding as sharding

 import os
 import six
@@ -32,6 +32,7 @@ import pickle
 fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1

+
 def runtime_main():
     import paddle.distributed.fleet as fleet

@@ -47,9 +48,7 @@ def runtime_main():
             input_y = paddle.fluid.layers.data(
                 name="y", shape=[1], dtype='int64')

-            fc_1 = paddle.fluid.layers.fc(input=input_x,
-                                          size=64,
-                                          act='tanh')
+            fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
             fc_2 = paddle.fluid.layers.fc(input=fc_1, size=256, act='tanh')
             prediction = paddle.fluid.layers.fc(input=[fc_2],
                                                 size=2,
@@ -62,8 +61,10 @@ def runtime_main():
             strategy.sharding = True
             strategy.sharding_configs = {"fuse_broadcast_MB": 0.2}

-            optimizer = paddle.fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
-            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+            optimizer = paddle.fluid.optimizer.Momentum(
+                learning_rate=0.01, momentum=0.9)
+            optimizer = fleet.distributed_optimizer(
+                optimizer, strategy=strategy)
             optimizer.minimize(avg_cost)

     # execution
@@ -71,15 +72,17 @@ def runtime_main():
     place = fluid.CUDAPlace(device_id)
     exe = fluid.Executor(place)
     exe.run(startup_prog)
-    dirname="./ut_sharding_save_model"
-    sharding_save_persistables(exe, dirname, main_program=train_prog, filename=None)
+    dirname = "./ut_sharding_save_model"
+    sharding.utils.save_persistables(
+        exe, dirname, main_program=train_prog, filename=None)

-    out_losses=[]
+    out_losses = []
     if six.PY2:
         print(pickle.dumps(out_losses))
     else:
         sys.stdout.buffer.write(pickle.dumps(out_losses))

+
 if __name__ == "__main__":
     #NOTE(liangjianzhong): dist unittest should be implemented using runtime_main in test_dist_base.py
     # but the runtime_main in test_dist_base.py uses the fleet, DistributedStrategy from
@@ -87,4 +90,3 @@ if __name__ == "__main__":
     # this should be updated in the future.
     # runtime_main(TestDistMnist2x2)
     runtime_main()
-
\ No newline at end of file
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
index 063ff726b10..01a7e25abb6 100755
--- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
@@ -21,7 +21,7 @@ import paddle.fluid.core as core
 import paddle.fluid as fluid

 from fleet_meta_optimizer_base import TestFleetMetaOptimizer
-from paddle.distributed.fleet.meta_optimizers.sharding.utils import add_sync_comm_for_test, sharding_save_persistables, comm_analyse
+import paddle.distributed.fleet.meta_optimizers.sharding as sharding

 paddle.enable_static()

@@ -279,19 +279,19 @@ class TestFleetShardingMetaOptimizer(TestFleetMetaOptimizer):
         avg_cost, strategy = self.net(train_prog, startup_prog)
         self.set_strategy(strategy, 'sharding')
         self.optimizer(avg_cost, strategy, train_prog, startup_prog)
-        comm_analyse(train_prog)
+        sharding.utils.comm_analyse(train_prog)
         test_prog = train_prog.clone(for_test=True)
-        add_sync_comm_for_test(test_prog, strategy)
+        sharding.utils.add_sync_comm(test_prog, strategy)
         ops = [op.type for op in test_prog.global_block().ops]

-        self.assertEqual(ops, ['fill_constant', 'fill_constant', 'fill_constant', 'c_sync_calc_stream', 'c_broadcast',
-                               'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream', 'mul',
-                               'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'softmax',
-                               'cross_entropy2', 'mean'])
-
-
+        self.assertEqual(ops, [
+            'fill_constant', 'fill_constant', 'fill_constant',
+            'c_sync_calc_stream', 'c_broadcast', 'c_broadcast', 'c_broadcast',
+            'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream',
+            'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh',
+            'mul', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean'
+        ])

-
 if __name__ == "__main__":
     unittest.main()
--
GitLab
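
For reference, below is a condensed, illustrative sketch of how the pieces touched by this patch fit together: enabling sharding through DistributedStrategy, then calling the renamed utilities sharding.utils.comm_analyse, sharding.utils.add_sync_comm, and sharding.utils.save_persistables. It mirrors dist_sharding_save.py above; the toy network, the PaddleCloudRoleMaker initialization, and the FLAGS_selected_gpus lookup are assumptions borrowed from Paddle's usual fleet unittests (the script is expected to be launched with one process per GPU, e.g. via paddle.distributed.launch), not part of this patch itself.

# Illustrative sketch only, condensed from dist_sharding_save.py in this patch.
# Assumes a multi-GPU collective launch (one process per GPU); the role maker
# and device lookup follow the common fleet unittest pattern and are not part
# of the patch.
import os

import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.meta_optimizers.sharding as sharding

paddle.enable_static()
fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))

train_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
    with fluid.unique_name.guard():
        # Toy network, standing in for a real model.
        input_x = paddle.fluid.layers.data(name="x", shape=[32], dtype='float32')
        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
        prediction = paddle.fluid.layers.fc(input=[fc_1], size=2, act='softmax')
        cost = paddle.fluid.layers.cross_entropy(input=prediction, label=input_y)
        avg_cost = paddle.fluid.layers.mean(x=cost)

        # Enable sharding; fuse_broadcast_MB is an empirical value (see the
        # sharding_configs docstring in this patch).
        strategy = fleet.DistributedStrategy()
        strategy.sharding = True
        strategy.sharding_configs = {"fuse_broadcast_MB": 0.2}

        optimizer = paddle.fluid.optimizer.Momentum(
            learning_rate=0.01, momentum=0.9)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(avg_cost)

# Per-rank execution; FLAGS_selected_gpus is set by the distributed launcher.
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
exe = fluid.Executor(fluid.CUDAPlace(device_id))
exe.run(startup_prog)

# Report the broadcast / allreduce communication volume of the program.
sharding.utils.comm_analyse(train_prog)

# Re-insert sync_comm ops that may be pruned when cloning a test program.
test_prog = train_prog.clone(for_test=True)
sharding.utils.add_sync_comm(test_prog, strategy)

# Save persistables; sharded (unique) and duplicated vars are handled per rank.
sharding.utils.save_persistables(
    exe, "./ut_sharding_save_model", main_program=train_prog, filename=None)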