未验证 提交 1533d7e2 编写于 作者: W WangXi 提交者: GitHub

[hybrid] Fix row parallel linear bias (#35186)

上级 7debae3a
......@@ -1078,6 +1078,19 @@ def _linear(x, weight, bias=None, name=None):
return res
def _set_var_distributed(var):
if var is None:
return
var.is_distributed = True
# NOTE: use current_block and find_var_recursive to support while_loop
startup_block = paddle.static.default_startup_program().current_block()
main_block = paddle.static.default_main_program().current_block()
startup_block._find_var_recursive(var.name).is_distributed = True
main_block._find_var_recursive(var.name).is_distributed = True
def _parallel_linear(x,
num_rows,
num_cols,
......@@ -1095,7 +1108,7 @@ def _parallel_linear(x,
axis the dimension of the parameter of linear layer.
axis = 0: the row dimension
axid = 1: the col dimension
axis = 1: the col dimension
"""
if group is not None and not group.is_member():
......@@ -1108,40 +1121,35 @@ def _parallel_linear(x,
else:
x = _c_identity(x, group=group)
if core.is_compiled_with_npu():
linear = _Linear(
num_rows,
num_cols,
weight_attr=param_attr,
bias_attr=bias_attr,
name=name)
else:
linear = paddle.nn.Linear(
num_rows,
num_cols,
weight_attr=param_attr,
bias_attr=bias_attr,
name=name)
linear_out = linear(x)
startup_block = paddle.static.default_startup_program().current_block()
main_block = paddle.static.default_main_program().current_block()
startup_block._find_var_recursive(linear.weight.name).is_distributed = True
main_block._find_var_recursive(linear.weight.name).is_distributed = True
linear = paddle.nn.Linear(
num_rows,
num_cols,
weight_attr=param_attr,
bias_attr=bias_attr,
name=name)
# NOTE: npu linear function use matmul_v2 but linear use matmul
linear_function = _linear if core.is_compiled_with_npu()\
else paddle.nn.functional.linear
linear_out = linear_function(
x,
linear.weight,
# NOTE(wangxi): row split, bias need add after allreduce
None if axis == 0 else linear.bias,
linear.name)
_set_var_distributed(linear.weight)
# set is_distributed for splited bias
# if a linear layer is splited by row, each rank would hold a complete bias and they should be the same in each rank.
# if a linear layer is splited by col, the bias would also be split into each rank as its weight
if axis == 1 and linear._bias_attr != False:
startup_block._find_var_recursive(
linear.bias.name).is_distributed = True
main_block._find_var_recursive(linear.bias.name).is_distributed = True
_set_var_distributed(linear.bias)
if not gather_out: return linear_out
op_type = 'c_allreduce_sum' if axis == 0 else 'c_concat'
out_shape = list(linear_out.shape)
out_shape[0] *= 1 if axis == 0 else nranks
main_block = paddle.static.default_main_program().current_block()
out = main_block.create_var(
shape=out_shape,
dtype=linear_out.dtype,
......@@ -1160,6 +1168,8 @@ def _parallel_linear(x,
'use_calc_stream': True,
'use_model_parallel': True
})
if linear.bias is not None:
out = out + linear.bias
else:
main_block.append_op(
type='c_concat',
......
......@@ -43,29 +43,38 @@ OUT_SIZE = 2 * MODEL_PARALLEL_SIZE
#fluid.default_main_program().random_seed = 1
def get_param_attr(weight, bias):
weight_attr = paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(weight))
bias_attr = paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(bias))
return weight_attr, bias_attr
def create_model(data, rank):
np.random.seed(2021)
np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
if rank is not None:
start_col = 0 if rank == 0 else OUT_SIZE // 2
np_weight_part = np_weight[:, start_col:start_col + OUT_SIZE // 2]
np_bias_part = np_bias[start_col:start_col + OUT_SIZE // 2]
weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias_part)
result = paddle.distributed.split(
data,
size=(IN_SIZE, OUT_SIZE),
operation='linear',
axis=1,
num_partitions=MODEL_PARALLEL_SIZE,
weight_attr=paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(
np_weight_part)),
bias_attr=False, )
weight_attr=weight_attr,
bias_attr=bias_attr)
else:
result = fluid.layers.fc(
data,
size=OUT_SIZE,
param_attr=paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
bias_attr=False, )
weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
result = fluid.layers.fc(data,
size=OUT_SIZE,
param_attr=weight_attr,
bias_attr=bias_attr)
predict = paddle.sum(result)
return predict
......
......@@ -43,29 +43,39 @@ OUT_SIZE = 2 * MODEL_PARALLEL_SIZE
#fluid.default_main_program().random_seed = 1
def get_param_attr(weight, bias):
weight_attr = paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(weight))
bias_attr = paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(bias))
return weight_attr, bias_attr
def create_model(data, rank):
np.random.seed(2021)
np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
if rank is not None:
start_row = 0 if rank == 0 else IN_SIZE // 2
np_weight_part = np_weight[start_row:start_row + IN_SIZE // 2, :]
weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias)
result = paddle.distributed.split(
data,
size=(IN_SIZE, OUT_SIZE),
operation='linear',
axis=0,
num_partitions=MODEL_PARALLEL_SIZE,
weight_attr=paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(
np_weight_part)),
bias_attr=False, )
weight_attr=weight_attr,
bias_attr=bias_attr)
else:
weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
result = fluid.layers.fc(
data,
size=OUT_SIZE,
param_attr=paddle.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
bias_attr=False, )
bias_attr=bias_attr)
predict = paddle.sum(result)
return predict
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册