未验证 提交 911c8593 编写于 作者: W WangXi 提交者: GitHub

optimize pipeline performance with recompute and amp, test=allcase (#34519)

上级 1d7b75dd
...@@ -945,6 +945,13 @@ def _append_backward_ops_with_checkpoints_( ...@@ -945,6 +945,13 @@ def _append_backward_ops_with_checkpoints_(
for op_desc in reversed(added_descs): for op_desc in reversed(added_descs):
grad_op_desc, op_grad_to_var = core.get_grad_op_desc( grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
op_desc, cpt.to_text(no_grad_dict[block.idx]), []) op_desc, cpt.to_text(no_grad_dict[block.idx]), [])
# Set device for grad_op according to forward Op
if op_desc.has_attr(device_attr_name):
op_device = op_desc.attr(device_attr_name)
for g_op_desc in grad_op_desc:
g_op_desc._set_attr(device_attr_name, op_device)
for key in var_name_dict: for key in var_name_dict:
_rename_arg_(grad_op_desc, key, var_name_dict[key]) _rename_arg_(grad_op_desc, key, var_name_dict[key])
grad_op_descs.extend(grad_op_desc) grad_op_descs.extend(grad_op_desc)
......
...@@ -150,6 +150,8 @@ gray_list = { ...@@ -150,6 +150,8 @@ gray_list = {
'c_identity', 'c_identity',
'c_concat', 'c_concat',
'c_allreduce_sum', 'c_allreduce_sum',
'concat',
'split',
} }
# The set of ops that don't support fp16 calculation # The set of ops that don't support fp16 calculation
......
...@@ -110,6 +110,27 @@ def _insert_cast_op(block, op, idx, src_dtype, dest_dtype): ...@@ -110,6 +110,27 @@ def _insert_cast_op(block, op, idx, src_dtype, dest_dtype):
cast_name = in_var.name + '.cast_' + _dtype_to_str(dest_dtype) cast_name = in_var.name + '.cast_' + _dtype_to_str(dest_dtype)
out_var = block.vars.get(cast_name) out_var = block.vars.get(cast_name)
if out_var is None or out_var.dtype != dest_dtype: if out_var is None or out_var.dtype != dest_dtype:
op_device = op.attr('op_device')
# NOTE(wangxi): optimization for pipeline that saves one send.
# If in_var is stop_gradient and the device of its previous op is
# `all`, set the cast op's device to `all` as well, which avoids
# sending the cast variable.
# TODO: remove this once the dynamic and static pipeline
# interfaces are unified.
if src_dtype == core.VarDesc.VarType.FP32 and in_var.stop_gradient:
prev_op = None
if in_var.op is op:
prev_op = find_true_prev_op(block.ops, op,
in_var_name)
elif in_var.op is not None:
prev_op = in_var.op
prev_op_device = None
if prev_op is not None:
prev_op_device = prev_op.attr('op_device')
if prev_op_device is not None and 'all' in prev_op_device:
op_device = prev_op_device
out_var = block.create_var( out_var = block.create_var(
name=cast_name, name=cast_name,
dtype=dest_dtype, dtype=dest_dtype,
...@@ -124,7 +145,7 @@ def _insert_cast_op(block, op, idx, src_dtype, dest_dtype): ...@@ -124,7 +145,7 @@ def _insert_cast_op(block, op, idx, src_dtype, dest_dtype):
attrs={ attrs={
"in_dtype": in_var.dtype, "in_dtype": in_var.dtype,
"out_dtype": out_var.dtype, "out_dtype": out_var.dtype,
"op_device": op.attr("op_device") "op_device": op_device
}) })
num_cast_ops += 1 num_cast_ops += 1
_rename_arg(op, in_var.name, out_var.name) _rename_arg(op, in_var.name, out_var.name)
......
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
import unittest import unittest
import paddle import paddle
import paddle.fluid as fluid
import paddle.static as static
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import os import os
paddle.enable_static() paddle.enable_static()
...@@ -25,26 +29,34 @@ class TestFleetMetaOptimizer(unittest.TestCase): ...@@ -25,26 +29,34 @@ class TestFleetMetaOptimizer(unittest.TestCase):
os.environ[ os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002" "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002"
def test_pipeline_optimizer(self): def net(self):
import paddle.distributed.fleet as fleet with static.device_guard("gpu:0"):
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with paddle.fluid.device_guard("gpu:0"):
input_x = paddle.fluid.layers.data( input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32') name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data( input_y = paddle.fluid.layers.data(
name="y", shape=[1], dtype='int64') name="y", shape=[1], dtype='int64')
input_z = paddle.fluid.layers.data(
name="z", shape=[1], dtype="float32")
with static.device_guard("gpu:all"):
input_z = input_z * 1.0
input_z.stop_gradient = True
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_1 = fc_1 * input_z
with paddle.fluid.device_guard("gpu:1"): with static.device_guard("gpu:1"):
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
fc_2 = fc_2 * input_z
prediction = paddle.fluid.layers.fc(input=[fc_2], prediction = paddle.fluid.layers.fc(input=[fc_2],
size=2, size=2,
act='softmax') act='softmax')
cost = paddle.fluid.layers.cross_entropy( cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y) input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost) avg_cost = paddle.fluid.layers.mean(x=cost)
return avg_cost
def test_pipeline_optimizer(self):
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = paddle.distributed.fleet.DistributedStrategy() strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.pipeline = True strategy.pipeline = True
...@@ -53,10 +65,44 @@ class TestFleetMetaOptimizer(unittest.TestCase): ...@@ -53,10 +65,44 @@ class TestFleetMetaOptimizer(unittest.TestCase):
'accumulate_steps': 2 'accumulate_steps': 2
} }
train_prog, startup_prog = static.Program(), static.Program()
with static.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
avg_cost = self.net()
optimizer = paddle.fluid.optimizer.Adam(0.01) optimizer = paddle.fluid.optimizer.Adam(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer = fleet.distributed_optimizer(
optimizer, strategy=strategy)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
def test_pipeline_amp_optimizer(self):
    """Check the send/recv op counts when pipeline and AMP are combined.

    Builds the network returned by ``self.net()`` inside fresh main and
    startup programs, minimizes its loss with an Adam optimizer wrapped by
    ``fleet.distributed_optimizer`` (strategy has both ``amp`` and
    ``pipeline`` enabled), and then asserts that the global block of the
    resulting ``section_program`` holds exactly one ``send_v2`` op and
    exactly one ``recv_v2`` op.
    """
    fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))

    dist_strategy = paddle.distributed.fleet.DistributedStrategy()
    dist_strategy.amp = True
    dist_strategy.pipeline = True
    dist_strategy.pipeline_configs = {
        'micro_batch_size': 1,
        'accumulate_steps': 2,
    }

    main_program = static.Program()
    startup_program = static.Program()
    # Build and optimize the network inside isolated programs and a fresh
    # unique-name scope.
    with static.program_guard(main_program, startup_program), \
            fluid.unique_name.guard():
        loss = self.net()
        adam = paddle.fluid.optimizer.Adam(0.01)
        dist_optimizer = fleet.distributed_optimizer(
            adam, strategy=dist_strategy)
        dist_optimizer.minimize(loss)

    # Inspect the op types of the pipeline section program that was
    # attached to the main program.
    section_block = main_program._pipeline_opt['section_program'].global_block()
    op_types = [op.type for op in section_block.ops]
    for comm_op in ('send_v2', 'recv_v2'):
        self.assertEqual(op_types.count(comm_op), 1)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册