未验证 提交 0dadacc4 编写于 作者: J JZ-LIANG 提交者: GitHub

[sharding] doc, api, bug fixed (#28983)

* add lars to fleet meta optimizer

* add lamb to proto

* add lamb to fleet meta optimizer

* fixed syntax bug

* fixed syntax bug

* fixed syntax error in lamb, add config setter of lamb in distributed_strategy

* trigger unitest to rerun

* add new unitest func for lamb

* revise unitest for lars and lamb

* revise dgc meta unitest

* revise lars document in distribute_strategy

* revise lars lamb document in distributed_strategy.py

* revise lars lamb document in distributed_strategy.py

* add weight decay exclude logic to lars

* restore optimzier.py

* restore optimizer.py as develop except lars

* add epsilon and exclude fn to distributed_sttrategy

* add lars epsilon

* revise unitest for fleet lars and lamb

* revise lars lamb unitest for CI coverage

* revise lars argument api

* revise lars argument api

* revise lars argument api

* revise api doc of lars

* fix op role

* add sharding save and add_sync_comm_for_test function

* add comm_analyse to utlis

* revise sharding_utils

* add sharding saving unittest

* revise sharding utils for unittest

* revise sharding en doc

* update sharding utils api

* add doc for sharding

* fixed bug in sharding var size count

* update varsize count in sharding

* fix sharding num_nccl_comm

* Revert "fix sharding num_nccl_comm"

This reverts commit d51587c15e9323acf226ddd36154275f0d1daf76.
上级 7ae3cb55
......@@ -615,12 +615,15 @@ class DistributedStrategy(object):
def sharding(self):
"""
Indicating whether we are using sharding Optimizer for memory
optimization
optimization. We implement the sharding optimizer following the ZeRO-DP
idea from [ZeRO: Memory Optimizations Toward Training Trillion Parameter Models](https://arxiv.org/abs/1910.02054).
Model parameters and Optimizer State are sharded into different ranks allowing to fit larger model.
Default value: False
Examples:
.. code-block:: python
import paddle.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.sharding = True
......@@ -638,10 +641,12 @@ class DistributedStrategy(object):
@property
def sharding_configs(self):
"""
Set sharding configurations.
Set sharding configurations.
**Note**:
fuse_broadcast_MB(float): size of a fused group of broadcasted parameters.
fuse_broadcast_MB(float): size of a fused group of broadcasted parameters.
This configuration will affect the communication speed in sharding training,
and should be an empirical value decided by your model size and network topology.
Examples:
.. code-block:: python
......
......@@ -265,7 +265,7 @@ def get_var_size(param):
input:
- param: var
return:
var size in Bytes
var size in MB
"""
assert -1 not in param.shape
return reduce(lambda x, y: x * y,
......@@ -299,10 +299,12 @@ def comm_analyse(main_program):
for op in block.ops:
if op.type == "c_broadcast":
var_name = op.desc.input_arg_names()[0]
broadcast_vars[var_name] = get_var_size(block.var(var_name))
# convert MB to KB
broadcast_vars[var_name] = get_var_size(block.var(
var_name)) * 1024.0
elif op.type == "c_allreduce_sum":
var_name = op.desc.input_arg_names()[0]
reduce_vars[var_name] = get_var_size(block.var(var_name))
reduce_vars[var_name] = get_var_size(block.var(var_name)) * 1024.0
varsize_count = {}
gap = 1
......@@ -329,7 +331,7 @@ def comm_analyse(main_program):
count))
def add_sync_comm_for_test(program, dist_strategy):
def add_sync_comm(program, dist_strategy):
"""
When clone a test prog by clone from the sharding main prog,
part of the sync_comm op maybe be pruned by mistake, this function
......@@ -361,7 +363,7 @@ def add_sync_comm_for_test(program, dist_strategy):
return
def sharding_save_persistables(exe, dirname, main_program, filename=None):
def save_persistables(exe, dirname, main_program, filename=None):
"""
When use sharding, part of persistable vars are unique and are partitioned in different ranks,
and part of persistable vars are duplicated and exist in all the ranks with different values.
......
......@@ -21,7 +21,7 @@ from dist_mnist import cnn_model
# from paddle.fluid.incubate.fleet.collective import fleet
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
from paddle.distributed.fleet.meta_optimizers.sharding.utils import sharding_save_persistables
import paddle.distributed.fleet.meta_optimizers.sharding as sharding
import os
import six
......@@ -32,6 +32,7 @@ import pickle
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
def runtime_main():
import paddle.distributed.fleet as fleet
......@@ -47,9 +48,7 @@ def runtime_main():
input_y = paddle.fluid.layers.data(
name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x,
size=64,
act='tanh')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=256, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2],
size=2,
......@@ -62,8 +61,10 @@ def runtime_main():
strategy.sharding = True
strategy.sharding_configs = {"fuse_broadcast_MB": 0.2}
optimizer = paddle.fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer = paddle.fluid.optimizer.Momentum(
learning_rate=0.01, momentum=0.9)
optimizer = fleet.distributed_optimizer(
optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
# execution
......@@ -71,15 +72,17 @@ def runtime_main():
place = fluid.CUDAPlace(device_id)
exe = fluid.Executor(place)
exe.run(startup_prog)
dirname="./ut_sharding_save_model"
sharding_save_persistables(exe, dirname, main_program=train_prog, filename=None)
dirname = "./ut_sharding_save_model"
sharding.utils.save_persistables(
exe, dirname, main_program=train_prog, filename=None)
out_losses=[]
out_losses = []
if six.PY2:
print(pickle.dumps(out_losses))
else:
sys.stdout.buffer.write(pickle.dumps(out_losses))
if __name__ == "__main__":
#NOTE(liangjianzhong): dist unittest should be imlpement using runtime_main in test_dist_base.py
# but the runtime_main in test_dist_base.py use the fleet, DistributedStrategy from
......@@ -87,4 +90,3 @@ if __name__ == "__main__":
# this should be update in future.
# runtime_main(TestDistMnist2x2)
runtime_main()
\ No newline at end of file
......@@ -21,7 +21,7 @@ import paddle.fluid.core as core
import paddle.fluid as fluid
from fleet_meta_optimizer_base import TestFleetMetaOptimizer
from paddle.distributed.fleet.meta_optimizers.sharding.utils import add_sync_comm_for_test, sharding_save_persistables, comm_analyse
import paddle.distributed.fleet.meta_optimizers.sharding as sharding
paddle.enable_static()
......@@ -279,19 +279,19 @@ class TestFleetShardingMetaOptimizer(TestFleetMetaOptimizer):
avg_cost, strategy = self.net(train_prog, startup_prog)
self.set_strategy(strategy, 'sharding')
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
comm_analyse(train_prog)
sharding.utils.comm_analyse(train_prog)
test_prog = train_prog.clone(for_test=True)
add_sync_comm_for_test(test_prog, strategy)
sharding.utils.add_sync_comm(test_prog, strategy)
ops = [op.type for op in test_prog.global_block().ops]
self.assertEqual(ops, ['fill_constant', 'fill_constant', 'fill_constant', 'c_sync_calc_stream', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream', 'mul',
'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'softmax',
'cross_entropy2', 'mean'])
self.assertEqual(ops, [
'fill_constant', 'fill_constant', 'fill_constant',
'c_sync_calc_stream', 'c_broadcast', 'c_broadcast', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_sync_comm_stream',
'mul', 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh',
'mul', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean'
])
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册