diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index 70e16d67fb9f17ea9b51197abe73c7bca1ec8deb..e70fe1e280fa4c0904791924e1ba55858aff4616 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -1078,6 +1078,19 @@ def _linear(x, weight, bias=None, name=None):
         return res
 
 
+def _set_var_distributed(var):
+    if var is None:
+        return
+
+    var.is_distributed = True
+
+    # NOTE: use current_block and find_var_recursive to support while_loop
+    startup_block = paddle.static.default_startup_program().current_block()
+    main_block = paddle.static.default_main_program().current_block()
+    startup_block._find_var_recursive(var.name).is_distributed = True
+    main_block._find_var_recursive(var.name).is_distributed = True
+
+
 def _parallel_linear(x,
                      num_rows,
                      num_cols,
@@ -1095,7 +1108,7 @@ def _parallel_linear(x,
 
     axis the dimension of the parameter of linear layer.
     axis = 0: the row dimension
-    axid = 1: the col dimension
+    axis = 1: the col dimension
 
     """
     if group is not None and not group.is_member():
@@ -1108,40 +1121,35 @@ def _parallel_linear(x,
     else:
         x = _c_identity(x, group=group)
 
-    if core.is_compiled_with_npu():
-        linear = _Linear(
-            num_rows,
-            num_cols,
-            weight_attr=param_attr,
-            bias_attr=bias_attr,
-            name=name)
-    else:
-        linear = paddle.nn.Linear(
-            num_rows,
-            num_cols,
-            weight_attr=param_attr,
-            bias_attr=bias_attr,
-            name=name)
-
-    linear_out = linear(x)
-    startup_block = paddle.static.default_startup_program().current_block()
-    main_block = paddle.static.default_main_program().current_block()
-    startup_block._find_var_recursive(linear.weight.name).is_distributed = True
-    main_block._find_var_recursive(linear.weight.name).is_distributed = True
+    linear = paddle.nn.Linear(
+        num_rows,
+        num_cols,
+        weight_attr=param_attr,
+        bias_attr=bias_attr,
+        name=name)
+    # NOTE: the NPU linear function uses matmul_v2, while linear uses matmul
+    linear_function = _linear if core.is_compiled_with_npu()\
+        else paddle.nn.functional.linear
+    linear_out = linear_function(
+        x,
+        linear.weight,
+        # NOTE(wangxi): with row split, the bias is added after the allreduce
+        None if axis == 0 else linear.bias,
+        linear.name)
+
+    _set_var_distributed(linear.weight)
     # set is_distributed for splited bias
     # if a linear layer is splited by row, each rank would hold a complete bias and they should be the same in each rank.
     # if a linear layer is splited by col, the bias would also be split into each rank as its weight
     if axis == 1 and linear._bias_attr != False:
-        startup_block._find_var_recursive(
-            linear.bias.name).is_distributed = True
-        main_block._find_var_recursive(linear.bias.name).is_distributed = True
+        _set_var_distributed(linear.bias)
 
     if not gather_out:
         return linear_out
 
-    op_type = 'c_allreduce_sum' if axis == 0 else 'c_concat'
     out_shape = list(linear_out.shape)
     out_shape[0] *= 1 if axis == 0 else nranks
+    main_block = paddle.static.default_main_program().current_block()
     out = main_block.create_var(
         shape=out_shape,
         dtype=linear_out.dtype,
@@ -1160,6 +1168,8 @@ def _parallel_linear(x,
                 'use_calc_stream': True,
                 'use_model_parallel': True
             })
+        if linear.bias is not None:
+            out = out + linear.bias
     else:
         main_block.append_op(
             type='c_concat',
diff --git a/python/paddle/fluid/tests/unittests/static_model_parallel_by_col.py b/python/paddle/fluid/tests/unittests/static_model_parallel_by_col.py
index 416f6bc4f0d417db6ae82380787f2a715b398ca6..6596eca4d397239d5268b1aeca8d990d471cb82e 100644
--- a/python/paddle/fluid/tests/unittests/static_model_parallel_by_col.py
+++ b/python/paddle/fluid/tests/unittests/static_model_parallel_by_col.py
@@ -43,29 +43,38 @@ OUT_SIZE = 2 * MODEL_PARALLEL_SIZE
 #fluid.default_main_program().random_seed = 1
 
 
+def get_param_attr(weight, bias):
+    weight_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(weight))
+    bias_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(bias))
+    return weight_attr, bias_attr
+
+
 def create_model(data, rank):
     np.random.seed(2021)
     np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
+    np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
     if rank is not None:
         start_col = 0 if rank == 0 else OUT_SIZE // 2
         np_weight_part = np_weight[:, start_col:start_col + OUT_SIZE // 2]
+        np_bias_part = np_bias[start_col:start_col + OUT_SIZE // 2]
+
+        weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias_part)
         result = paddle.distributed.split(
             data,
             size=(IN_SIZE, OUT_SIZE),
             operation='linear',
             axis=1,
             num_partitions=MODEL_PARALLEL_SIZE,
-            weight_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(
-                    np_weight_part)),
-            bias_attr=False, )
+            weight_attr=weight_attr,
+            bias_attr=bias_attr)
     else:
-        result = fluid.layers.fc(
-            data,
-            size=OUT_SIZE,
-            param_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
-            bias_attr=False, )
+        weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
+        result = fluid.layers.fc(data,
+                                 size=OUT_SIZE,
+                                 param_attr=weight_attr,
+                                 bias_attr=bias_attr)
     predict = paddle.sum(result)
     return predict
 
diff --git a/python/paddle/fluid/tests/unittests/static_model_parallel_by_row.py b/python/paddle/fluid/tests/unittests/static_model_parallel_by_row.py
index 4a98792f8a0473ec855dccb0d06c7c5751e72f41..fd886e16ced5f4d9493507cbaa29b468a49435ff 100644
--- a/python/paddle/fluid/tests/unittests/static_model_parallel_by_row.py
+++ b/python/paddle/fluid/tests/unittests/static_model_parallel_by_row.py
@@ -43,29 +43,39 @@ OUT_SIZE = 2 * MODEL_PARALLEL_SIZE
 #fluid.default_main_program().random_seed = 1
 
 
+def get_param_attr(weight, bias):
+    weight_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(weight))
+    bias_attr = paddle.ParamAttr(
+        initializer=fluid.initializer.NumpyArrayInitializer(bias))
+    return weight_attr, bias_attr
+
+
 def create_model(data, rank):
     np.random.seed(2021)
     np_weight = np.random.uniform(-1, 1, size=(IN_SIZE, OUT_SIZE)).astype(DTYPE)
+    np_bias = np.random.uniform(-1, 1, size=(OUT_SIZE, )).astype(DTYPE)
     if rank is not None:
         start_row = 0 if rank == 0 else IN_SIZE // 2
         np_weight_part = np_weight[start_row:start_row + IN_SIZE // 2, :]
+
+        weight_attr, bias_attr = get_param_attr(np_weight_part, np_bias)
         result = paddle.distributed.split(
             data,
             size=(IN_SIZE, OUT_SIZE),
             operation='linear',
             axis=0,
             num_partitions=MODEL_PARALLEL_SIZE,
-            weight_attr=paddle.ParamAttr(
-                initializer=fluid.initializer.NumpyArrayInitializer(
-                    np_weight_part)),
-            bias_attr=False, )
+            weight_attr=weight_attr,
+            bias_attr=bias_attr)
     else:
+        weight_attr, bias_attr = get_param_attr(np_weight, np_bias)
         result = fluid.layers.fc(
             data,
             size=OUT_SIZE,
             param_attr=paddle.ParamAttr(
                 initializer=fluid.initializer.NumpyArrayInitializer(np_weight)),
-            bias_attr=False, )
+            bias_attr=bias_attr)
     predict = paddle.sum(result)
     return predict
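The bias handling in the patch can be checked independently of Paddle: for a row-split (axis=0) linear, each rank only produces a partial matmul result, so the complete bias must be added once, after the c_allreduce_sum; for a column-split (axis=1) linear, each rank's output covers a disjoint slice of the output columns, so the bias is split along with the weight and applied before the c_concat. A minimal standalone NumPy sketch of that reasoning (not part of the patch, names illustrative only):

    import numpy as np

    np.random.seed(2021)
    x = np.random.uniform(-1, 1, size=(4, 8))
    w = np.random.uniform(-1, 1, size=(8, 6))
    b = np.random.uniform(-1, 1, size=(6, ))
    expected = x @ w + b

    # Row split (axis=0): each rank holds half of the input features and the
    # matching rows of w. The partial products are summed (c_allreduce_sum) and
    # the full bias is added exactly once; adding it per rank would count it
    # nranks times.
    partials = [x[:, :4] @ w[:4, :], x[:, 4:] @ w[4:, :]]
    assert np.allclose(sum(partials) + b, expected)

    # Column split (axis=1): each rank holds a column slice of w and the
    # matching slice of b, so the bias is applied per rank and the outputs are
    # concatenated along the last axis (c_concat).
    shards = [x @ w[:, :3] + b[:3], x @ w[:, 3:] + b[3:]]
    assert np.allclose(np.concatenate(shards, axis=-1), expected)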