diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py
index 46ccb4663e8b7a41f8cee6608521ebda2feca7a3..cb1c28b39b69952f87488a0fe2a9c3ee3bc96387 100755
--- a/python/paddle/distributed/fleet/base/distributed_strategy.py
+++ b/python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -615,12 +615,15 @@ class DistributedStrategy(object):
     def sharding(self):
         """
         Indicating whether we are using sharding Optimizer for memory
-        optimization
+        optimization. We implement the sharding optimizer following the ZeRO-DP
+        idea from [ZeRO: Memory Optimizations Toward Training Trillion Parameter Models](https://arxiv.org/abs/1910.02054).
+        Model parameters and optimizer state are sharded across different ranks, allowing larger models to fit in memory.
 
         Default value: False
 
         Examples:
           .. code-block:: python
+
             import paddle.fleet as fleet
             strategy = fleet.DistributedStrategy()
             strategy.sharding = True
@@ -638,10 +641,12 @@ class DistributedStrategy(object):
     @property
     def sharding_configs(self):
         """
-        Set sharding configurations. 
+        Set sharding configurations.
 
         **Note**:
-            fuse_broadcast_MB(float): size of a fused group of broadcasted parameters.
+            fuse_broadcast_MB(float): size in MB of a fused group of broadcast parameters.
+            This configuration affects the communication speed of sharding training,
+            and should be an empirical value tuned to your model size and network topology.
 
         Examples:
           .. code-block:: python
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
index 2aa4bdd68c9907c80728bf285e23c172fc2979ac..b5c34f87cdf22534054fb5c8499734c59b061b18 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py
@@ -265,7 +265,7 @@ def get_var_size(param):
     input:
         - param: var
     return:
-        var size in Bytes
+        var size in MB
     """
     assert -1 not in param.shape
     return reduce(lambda x, y: x * y,
@@ -299,10 +299,12 @@ def comm_analyse(main_program):
     for op in block.ops:
         if op.type == "c_broadcast":
             var_name = op.desc.input_arg_names()[0]
-            broadcast_vars[var_name] = get_var_size(block.var(var_name))
+            # convert MB to KB
+            broadcast_vars[var_name] = get_var_size(block.var(
+                var_name)) * 1024.0
         elif op.type == "c_allreduce_sum":
             var_name = op.desc.input_arg_names()[0]
-            reduce_vars[var_name] = get_var_size(block.var(var_name))
+            reduce_vars[var_name] = get_var_size(block.var(var_name)) * 1024.0
 
     varsize_count = {}
     gap = 1
@@ -329,7 +331,7 @@ def comm_analyse(main_program):
                                                       count))
 
 
-def add_sync_comm_for_test(program, dist_strategy):
+def add_sync_comm(program, dist_strategy):
     """
     When clone a test prog by clone from the sharding main prog, 
     part of the sync_comm op maybe be pruned by mistake, this function
@@ -361,7 +363,7 @@ def add_sync_comm_for_test(program, dist_strategy):
     return
 
 
-def sharding_save_persistables(exe, dirname, main_program, filename=None):
+def save_persistables(exe, dirname, main_program, filename=None):
    """
    When use sharding, part of persistable vars are unique and are partitioned in different ranks,
    and part of persistable vars are duplicated and exist in all the ranks with different values.
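For reference, a minimal sketch of how the `sharding` strategy and the `fuse_broadcast_MB` option documented above are enabled together. It mirrors the setup used by `dist_sharding_save.py` later in this patch; the tiny model, the 0.2 MB fuse size, and the `PaddleCloudRoleMaker` collective role are taken from that test rather than being recommended values, and the script is assumed to be launched with one process per GPU.

```python
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

paddle.enable_static()
fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))

# Tiny static-graph model, mirroring the one used in dist_sharding_save.py.
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = fluid.layers.mean(x=cost)

strategy = fleet.DistributedStrategy()
strategy.sharding = True
# Size (in MB) of each fused broadcast group; an empirical value to tune
# against model size and network topology (0.2 is just the unit-test value).
strategy.sharding_configs = {"fuse_broadcast_MB": 0.2}

optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
```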
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
old mode 100644
new mode 100755
diff --git a/python/paddle/fluid/tests/unittests/dist_sharding_save.py b/python/paddle/fluid/tests/unittests/dist_sharding_save.py
index 05578c9e4a57f8694f5b596f8f57e4cf463030a4..22c930bf8948aa41eacd9c68b870be7b69719ef7 100755
--- a/python/paddle/fluid/tests/unittests/dist_sharding_save.py
+++ b/python/paddle/fluid/tests/unittests/dist_sharding_save.py
@@ -21,7 +21,7 @@ from dist_mnist import cnn_model
 # from paddle.fluid.incubate.fleet.collective import fleet
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
-from paddle.distributed.fleet.meta_optimizers.sharding.utils import sharding_save_persistables
+import paddle.distributed.fleet.meta_optimizers.sharding as sharding
 
 import os
 import six
@@ -32,6 +32,7 @@ import pickle
 fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1
 
+
 def runtime_main():
     import paddle.distributed.fleet as fleet
 
@@ -47,9 +48,7 @@ def runtime_main():
             input_y = paddle.fluid.layers.data(
                 name="y", shape=[1], dtype='int64')
 
-            fc_1 = paddle.fluid.layers.fc(input=input_x,
-                                          size=64,
-                                          act='tanh')
+            fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
             fc_2 = paddle.fluid.layers.fc(input=fc_1, size=256, act='tanh')
             prediction = paddle.fluid.layers.fc(input=[fc_2],
                                                 size=2,
@@ -62,8 +61,10 @@ def runtime_main():
             strategy.sharding = True
             strategy.sharding_configs = {"fuse_broadcast_MB": 0.2}
 
-            optimizer = paddle.fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
-            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+            optimizer = paddle.fluid.optimizer.Momentum(
+                learning_rate=0.01, momentum=0.9)
+            optimizer = fleet.distributed_optimizer(
+                optimizer, strategy=strategy)
             optimizer.minimize(avg_cost)
 
     # execution
@@ -71,15 +72,17 @@ def runtime_main():
     place = fluid.CUDAPlace(device_id)
     exe = fluid.Executor(place)
     exe.run(startup_prog)
-    dirname="./ut_sharding_save_model"
-    sharding_save_persistables(exe, dirname, main_program=train_prog, filename=None)
+    dirname = "./ut_sharding_save_model"
+    sharding.utils.save_persistables(
+        exe, dirname, main_program=train_prog, filename=None)
 
-    out_losses=[]
+    out_losses = []
     if six.PY2:
         print(pickle.dumps(out_losses))
     else:
         sys.stdout.buffer.write(pickle.dumps(out_losses))
 
+
 if __name__ == "__main__":
     #NOTE(liangjianzhong): dist unittest should be imlpement using runtime_main in test_dist_base.py
     # but the runtime_main in test_dist_base.py use the fleet, DistributedStrategy from 
@@ -87,4 +90,3 @@ if __name__ == "__main__":
     # this should be update in future. 
     # runtime_main(TestDistMnist2x2)
     runtime_main()
-
\ No newline at end of file
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
index 063ff726b10e4b09d70e75ba23a7229dad989f5b..01a7e25abb6d6161f745f5170b8d66cff7f1b6f3 100755
--- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py
@@ -21,7 +21,7 @@ import paddle.fluid.core as core
 import paddle.fluid as fluid
 from fleet_meta_optimizer_base import TestFleetMetaOptimizer
 
-from paddle.distributed.fleet.meta_optimizers.sharding.utils import add_sync_comm_for_test, sharding_save_persistables, comm_analyse
+import paddle.distributed.fleet.meta_optimizers.sharding as sharding
 
 paddle.enable_static()
 
@@ -279,19 +279,19 @@ class TestFleetShardingMetaOptimizer(TestFleetMetaOptimizer):
         avg_cost, strategy = self.net(train_prog, startup_prog)
         self.set_strategy(strategy, 'sharding')
         self.optimizer(avg_cost, strategy, train_prog, startup_prog)
-        comm_analyse(train_prog)
+        sharding.utils.comm_analyse(train_prog)
         test_prog = train_prog.clone(for_test=True)
-        add_sync_comm_for_test(test_prog, strategy)
+        sharding.utils.add_sync_comm(test_prog, strategy)
         ops = [op.type for op in test_prog.global_block().ops]
 
-        self.assertEqual(ops, ['fill_constant', 'fill_constant', 'fill_constant', 'c_sync_calc_stream', 'c_broadcast',
-                'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream', 'mul',
-                'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'softmax',
-                'cross_entropy2', 'mean'])
-
-
+        self.assertEqual(ops, [
+            'fill_constant', 'fill_constant', 'fill_constant',
+            'c_sync_calc_stream', 'c_broadcast', 'c_broadcast', 'c_broadcast',
+            'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream',
+            'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh',
+            'mul', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean'
+        ])
 
-
 if __name__ == "__main__":
     unittest.main()
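To summarize the renamed helpers (`add_sync_comm_for_test` → `sharding.utils.add_sync_comm`, `sharding_save_persistables` → `sharding.utils.save_persistables`), here is a sketch of the intended call pattern, pieced together from the call sites updated above; `train_prog`, `strategy`, and `exe` are assumed to come from a sharding training setup such as the one in `dist_sharding_save.py`, and the save directory is only an illustrative name.

```python
import paddle.distributed.fleet.meta_optimizers.sharding as sharding

# Report the broadcast/allreduce communication volume of the sharded program,
# grouped by variable size (printed in KB).
sharding.utils.comm_analyse(train_prog)

# A test program cloned from the sharding training program may lose some
# sync_comm ops during pruning; add_sync_comm restores them.
test_prog = train_prog.clone(for_test=True)
sharding.utils.add_sync_comm(test_prog, strategy)

# Each rank saves the persistable vars it holds: sharded vars are partitioned
# across ranks, while duplicated vars exist on every rank.
sharding.utils.save_persistables(
    exe, "./ut_sharding_save_model", main_program=train_prog, filename=None)
```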