Unverified · Commit 44e0393c authored by Nyakku Shigure and committed by GitHub

bump black to 2023 style (#54523)

Parent e73ddd6c
......@@ -16,7 +16,6 @@
# Options affecting formatting.
# -----------------------------
with section("format"):
# How wide to allow formatted cmake files
line_width = 80
......
......@@ -53,7 +53,7 @@ repos:
)$
# For Python files
- repo: https://github.com/psf/black.git
rev: 22.8.0
rev: 23.3.0
hooks:
- id: black
files: (.*\.(py|pyi|bzl)|BUILD|.*\.BUILD|WORKSPACE)$
......
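The only tooling change above is the black pin in `.pre-commit-config.yaml`, bumped from 22.8.0 to 23.3.0; the hunks below are the mechanical result of rerunning that hook. A minimal sketch of the recurring rewrite patterns visible in this diff (hypothetical snippets, not code from the Paddle tree), assuming the hunks reflect black's 2023 stable style:

```python
# Illustrative sketch only: hypothetical names, not code from this repository.

# 1. A blank line at the top of a block body is removed.
#        def build(self):                 def build(self):
#                                  ->          ops = []
#            ops = []

# 2. Parentheses around tuple targets in `for` loops are dropped.
pairs = {"a": 1, "b": 2}
# old: for (key, value) in pairs.items():
for key, value in pairs.items():
    print(key, value)

# 3. Redundant parentheses in `with` and `except` are dropped.
# old: with (open(__file__, "rb")) as f:
with open(__file__, "rb") as f:
    f.read(1)

try:
    int("not a number")
# old: except (ValueError):
except ValueError:
    pass
```

Rerunning the hook after the rev bump (e.g. `pre-commit run black --all-files`, using the `black` hook id declared above) would regenerate this kind of diff.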
......@@ -547,7 +547,6 @@ def GenerateCoreOpInfoDeclaration():
def GenerateCoreOpInfoDefinition():
op_args_info_list = []
for op_name, arg_list in core_ops_args_info.items():
arg_str = ",".join(["\"" + v + "\"" for v in arg_list])
......@@ -803,7 +802,6 @@ class DygraphFunctionGeneratorBase(FunctionGeneratorBase):
self.backward_returns_list = backward_returns_list_new
def CollectForwardInfoFromBackwardContents(self):
backward_forward_str = self.backward_forward_str
(
......@@ -1910,7 +1908,6 @@ class DygraphNodeGenerator(DygraphFunctionGeneratorBase):
self.grad_api_contents["backward_op"] in prim_white_list
or is_invoke_forward_api
):
next_grad_node_creation_str = f"""
if (!paddle::prim::PrimCommonUtils::IsEagerPrimEnabled()) {{
if(trace_backward) {{
......@@ -2274,7 +2271,6 @@ class DygraphNodeGenerator(DygraphFunctionGeneratorBase):
egr::EagerUtils::HandleViewBetweenInputAndOutput({inplace_grad_input_str}, api_output_{out_index});
}}"""
if IsPlainTensorType(ttype):
if (
backward_inplace_map
and name in backward_inplace_map.values()
......
......@@ -604,7 +604,6 @@ def GenerateCoreOpsInfoMap():
def GeneratePythonCWrappers(python_c_function_str, python_c_function_reg_str):
(
core_ops_infos_definition,
core_ops_infos_registry,
......
......@@ -500,7 +500,7 @@ def parse_get_expected_kerneltype(
fw_name = op_comp_map['op'].split('(')[0].strip()
# deal the last underline of function name in op_comp_map['get_expected_kernel_type']
new_get_expected_kernel_type_func_map = {}
for (key, value) in op_comp_map['get_expected_kernel_type'].items():
for key, value in op_comp_map['get_expected_kernel_type'].items():
new_get_expected_kernel_type_func_map[
delete_last_underline(key)
] = value
......
......@@ -615,7 +615,7 @@ def cross_validate(ops):
assert len(fw_call["inputs"]) <= len(
fw_op["inputs"]
), f"{name}: forward call has more inputs than the op "
for (input, input_) in zip(fw_call["inputs"], fw_op["inputs"]):
for input, input_ in zip(fw_call["inputs"], fw_op["inputs"]):
assert (
input["typename"] == input_["typename"]
), f"type mismatch in {name} and {fw_name}"
......@@ -623,7 +623,7 @@ def cross_validate(ops):
assert len(fw_call["attrs"]) <= len(
fw_op["attrs"]
), f"{name}: forward call has more attrs than the op "
for (attr, attr_) in zip(fw_call["attrs"], fw_op["attrs"]):
for attr, attr_ in zip(fw_call["attrs"], fw_op["attrs"]):
if attr["typename"] == "Scalar":
# special case for Scalar, fw_call can omit the type
assert re.match(
......@@ -637,7 +637,7 @@ def cross_validate(ops):
assert len(fw_call["outputs"]) == len(
fw_op["outputs"]
), f"{name}: forward call has more outputs than the op "
for (output, output_) in zip(
for output, output_ in zip(
fw_call["outputs"], fw_op["outputs"]
):
assert (
......
......@@ -316,7 +316,6 @@ def generate_backward_api(
header_file_path,
source_file_path,
):
bw_apis = []
for each_api_yaml in backward_yaml_path:
with open(each_api_yaml, 'r') as f:
......
......@@ -92,7 +92,6 @@ def generate_intermediate_api(
dygraph_header_file_path,
dygraph_source_file_path,
):
dygraph_header_file = open(dygraph_header_file_path, 'w')
dygraph_source_file = open(dygraph_source_file_path, 'w')
......
......@@ -351,7 +351,6 @@ namespace sparse {
def generate_api(api_yaml_path, header_file_path, source_file_path):
with open(api_yaml_path, 'r') as f:
apis = yaml.load(f, Loader=yaml.FullLoader)
header_file = open(header_file_path, 'w')
......
......@@ -158,7 +158,6 @@ namespace sparse {
def generate_api(api_yaml_path, header_file_path, source_file_path):
with open(api_yaml_path, 'r') as f:
apis = yaml.load(f, Loader=yaml.FullLoader)
header_file = open(header_file_path, 'w')
......
......@@ -362,7 +362,6 @@ namespace strings {
def generate_api(api_yaml_path, header_file_path, source_file_path):
with open(api_yaml_path, 'r') as f:
apis = yaml.load(f, Loader=yaml.FullLoader)
header_file = open(header_file_path, 'w')
......
......@@ -68,7 +68,6 @@ def CreateGatherGemmScatterOperator(
for tile_description in tile_descriptions:
for alignment in alignment_constraints:
for complex_transform in complex_transforms:
alignment_c = min(8, alignment)
A = TensorDescription(
......@@ -98,7 +97,6 @@ def CreateGatherGemmScatterOperator(
def GenerateSM80_TensorOp_16816(manifest, cuda_version, debug=False):
if not CudaToolkitVersionSatisfies(cuda_version, 11, 0):
return
......@@ -211,7 +209,6 @@ def GenerateSM80_TensorOp_16816(manifest, cuda_version, debug=False):
# Avoid emitting two kernels if the accumulator type does not differ from the input type (e.g. F16 accumulation)
if math_inst.element_a != math_inst.element_accumulator:
data_type_mixed = [
math_inst.element_a,
math_inst.element_b,
......@@ -225,7 +222,6 @@ def GenerateSM80_TensorOp_16816(manifest, cuda_version, debug=False):
def GenerateSM80_TensorOp_1688(manifest, cuda_version, debug=False):
if not CudaToolkitVersionSatisfies(cuda_version, 11, 0):
return
......@@ -341,7 +337,6 @@ def GenerateSM80_TensorOp_1688(manifest, cuda_version, debug=False):
def GenerateSM80_TensorOp_1688_fast_math(manifest, cuda_version, debug=False):
if not CudaToolkitVersionSatisfies(cuda_version, 11, 0):
return
......@@ -443,7 +438,6 @@ def GenerateSM80_TensorOp_1688_fast_math(manifest, cuda_version, debug=False):
def GenerateSM80_TensorOp_1688_fast_fp32_math(
manifest, cuda_version, debug=False
):
if not CudaToolkitVersionSatisfies(cuda_version, 11, 0):
return
......@@ -525,7 +519,6 @@ def GenerateSM80_TensorOp_1688_fast_fp32_math(
def GenerateSM75_TensorOp_1688(manifest, cuda_version, debug=False):
if not CudaToolkitVersionSatisfies(cuda_version, 10, 2):
return
......@@ -649,7 +642,6 @@ class KernelCfg:
if __name__ == "__main__":
args = KernelCfg(
architectures='80',
build_dir=sys.argv[2],
......
......@@ -156,7 +156,6 @@ launchKernel<"""
class GatherGemmScatterManifest(Manifest):
def emit(self, target=GeneratorTarget.Library):
operation_emitters = {
GeneratorTarget.Library: GatherGemmScatterEmitOperationKindLibrary
}
......
......@@ -89,7 +89,6 @@ struct ${operation_name} {
return ""
def emit(self, operation):
threadblock_shape = operation.tile_description.threadblock_shape
warp_count = operation.tile_description.warp_count
......@@ -107,7 +106,6 @@ struct ${operation_name} {
and operation.B.layout in transpose_layouts.keys()
and operation.C.layout in transpose_layouts.keys()
):
instance_layout_A = transpose_layouts[operation.A.layout]
instance_layout_B = transpose_layouts[operation.B.layout]
instance_layout_C = transpose_layouts[operation.C.layout]
......@@ -124,7 +122,6 @@ struct ${operation_name} {
# Support built-in epilogue functors or user-defined functions
if isinstance(operation.epilogue_functor, enum.Enum):
epilogue_vector_length = (
min(
operation.C.alignment * DataTypeSize[operation.C.element],
......@@ -256,7 +253,6 @@ namespace sparse {
return self
def __exit__(self, exception_type, exception_value, traceback):
# Write instance definitions in top-level namespace
for instance_definition in self.instance_definitions:
self.configuration_file.write(instance_definition)
......@@ -278,7 +274,6 @@ class GatherGemmScatterOperation(GemmOperation):
epilogue_functor=EpilogueFunctor.LinearCombination,
swizzling_functor=SwizzlingFunctor.Identity8,
):
super().__init__(
gemm_kind,
arch,
......
......@@ -458,7 +458,6 @@ class ExcelWriter:
def add_worksheet(
self, mp_tensor_info_list, sheetname, loss_scale, skip_normal_tensors
):
assert self.workbook is not None
worksheet = self.workbook.add_worksheet(sheetname)
......
......@@ -137,7 +137,6 @@ class TensorCheckerConfig:
debug_step=None,
stack_height_limit=1,
):
self.enable = enable
self.debug_mode = debug_mode
self.output_dir = output_dir
......
......@@ -98,7 +98,6 @@ class AmpScaler:
decr_every_n_nan_or_inf=1,
use_dynamic_loss_scaling=True,
):
tracer = _dygraph_tracer()
if not tracer:
raise ValueError(
......
......@@ -52,7 +52,6 @@ class CostModel:
device='gpu',
fetch_cost_list=['time'],
):
place = paddle.set_device('gpu')
x = np.random.random(size=(10, 1)).astype('float32')
exe = paddle.static.Executor(place)
......
......@@ -151,7 +151,6 @@ def reader_creator(
):
def reader():
for sentence, predicate, labels in corpus_reader():
sen_len = len(sentence)
verb_index = labels.index('B-V')
......
......@@ -42,7 +42,6 @@ CACHE_DIR = 'voc2012'
def reader_creator(filename, sub_name):
tarobject = tarfile.open(filename)
name2mem = {}
for ele in tarobject.getmembers():
......
......@@ -69,7 +69,6 @@ def parallel_manual_seed(seed):
def determinate_rng(rank, dims_mapping, process_mesh):
# TODO(JZ-LIANG) Support Mesh with any high rank
# use a string to unique integer hashing algorithm for seed computation.
# instead of using offsets to coodinate seed across devices.
......@@ -119,7 +118,6 @@ def determinate_rng(rank, dims_mapping, process_mesh):
def init_auto_parallel_rng():
if not is_enable_auto_rand_ctrl():
return
......
......@@ -319,7 +319,7 @@ class AutoAlignTool:
assert os.path.isfile(filepath)
if "vars" in filename:
assert filename.endswith("pkl")
with (open(filepath, "rb")) as f:
with open(filepath, "rb") as f:
vars_list.append(pickle.load(f))
elif "program" in filename:
assert filename.endswith("pdmodel")
......@@ -328,7 +328,7 @@ class AutoAlignTool:
program_list.append(deserialize_program(program_string))
elif "dist_attr" in filename:
assert filename.endswith("pkl")
with (open(filepath, "rb")) as f:
with open(filepath, "rb") as f:
dist_attr_list.append(pickle.load(f))
dist_attr_map = {}
......
......@@ -147,7 +147,6 @@ class Device:
class Link:
default_hop = 1
default_nic_bandwidth = 24
......
......@@ -1257,7 +1257,6 @@ class Completer:
# grad ops that have not a corresponding mapping in grad_op_id_to_op_id
else:
if grad_op.type == 'sum':
assert all(map(_is_grad_var_name, grad_op.input_arg_names))
output_name = grad_op.output_arg_names[0]
......@@ -1382,7 +1381,6 @@ class Completer:
]
for idx in range(first_backward_op_idx, len(ops)):
# complete the initial grad loss op
if idx == first_backward_op_idx:
assert ops[idx].type == "fill_constant"
......@@ -1656,7 +1654,6 @@ class Completer:
learning_rate_completed = False
for idx in range(len(ops)):
# complete the annotation of the optimizer op.
# TODO to add attribute for moment var
op = ops[idx]
......@@ -1823,7 +1820,6 @@ class Completer:
)
for input_name in op.desc.input_names():
if input_name in [
'Param',
'Grad',
......
......@@ -316,7 +316,6 @@ class CostModel:
if pred.type == CostNodeType.COMPUTATION and (
pred_id in graph[node_id][SUCC]
):
graph[pred_id][SUCC].remove(node_id)
graph[node_id][PRED].remove(pred_id)
......
......@@ -1154,7 +1154,6 @@ class DistributedOperatorContext:
return self._exceed_backward_init_op
def prepare_context(self, src_op):
self._cur_src_op = src_op
if is_loss_grad_op(src_op):
......@@ -1189,14 +1188,12 @@ class BlockState:
self.backward_to_forward_index_map = {}
def parse_forward_blocks(self, program):
while program.current_block_idx != 0:
program._rollback()
assert program.current_block_idx == 0
for idx, block in enumerate(program.blocks):
assert idx == block.idx, "index doesn't match"
assert (
block.forward_block_idx == -1
......@@ -1209,14 +1206,12 @@ class BlockState:
assert self.nblock >= 1
def parse_backward_blocks(self, program):
assert 0 in self.forward_indices, "forward block idx are{}".format(
self.forward_indices
)
self.backward_to_forward_index_map[0] = 0
for idx, block in enumerate(program.blocks):
if idx < len(self.forward_indices):
continue
......
......@@ -152,7 +152,6 @@ class DistributedSaver:
return state_dict, dist_attr
def save_inference_model(self, path, feed_vars, fetch_vars, exe, **kwargs):
dirname, filename = _process_path(path)
# save distributed inference program
......
......@@ -124,7 +124,6 @@ class Engine:
cluster=None,
strategy=None,
):
if (
model
and not isinstance(model, paddle.nn.Layer)
......@@ -1411,7 +1410,6 @@ class Engine:
epochs=1,
steps_per_epoch=None,
):
dist_context = self._dist_contexts[self._mode]
dist_main_prog = dist_context.dist_main_programs[self._cur_rank]
dist_startup_prog = dist_context.dist_startup_programs[self._cur_rank]
......@@ -1472,7 +1470,6 @@ class Engine:
steps_per_epoch=None,
collate_fn=None,
):
dist_context = self._dist_contexts[self._mode]
dist_main_prog = dist_context.dist_main_programs[self._cur_rank]
dist_startup_prog = dist_context.dist_startup_programs[self._cur_rank]
......
......@@ -338,7 +338,6 @@ def set_comm_op_dist_attr_for_program(
def naive_copy_op_dist_attr_for_program(new_op, ref_op, ctx):
ref_dist_attr = ctx.get_op_dist_attr_for_program(ref_op)
new_op_dist_attr = OperatorDistAttr()
new_op_dist_attr.process_mesh = ref_dist_attr.process_mesh
......
......@@ -77,7 +77,6 @@ class DistributedCheckFiniteAndUnscaleImpl(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
# by now the backward function only insert the gradient allreduce for dist op itself
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.main_block
......
......@@ -570,7 +570,6 @@ class DistributedDefaultImpl0(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
# by now the backward function only insert the gradient allreduce for dist op itself
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
......
......@@ -52,7 +52,6 @@ class DistributedDropoutImpl0(DistributedElementwiseImpl0):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......@@ -61,7 +60,6 @@ class DistributedDropoutImpl0(DistributedElementwiseImpl0):
op_dist_attr = ctx.get_op_dist_attr_for_program(src_op)
if is_enable_auto_rand_ctrl() and not op_dist_attr.is_recompute:
assert (
op_dist_attr is not None
), f"forward op [{str(src_op)}] don't have dist attribute !"
......
......@@ -70,7 +70,6 @@ register_distributed_operator_impl_container(
def adopt_lookup_table_v1(ctx, main_block, src_op, Ids_var):
assert (
len(Ids_var.shape) == 3
), "input Ids to lookup_table should have 3 dimensions but got [{}] with shape [{}]".format(
......@@ -577,7 +576,6 @@ class DistributedEmbeddingImpl(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
# by now the backward function only insert the gradient allreduce for dist op itself
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
......
......@@ -75,7 +75,6 @@ class DistributedFillConstantBatchSizeLikeImpl0(DistributedOperatorImpl):
return res_cost
def is_input_compatible(self, dist_op):
return True
def is_output_compatible(self, dist_op):
......
......@@ -52,7 +52,6 @@ class DistributedFlashAttnImpl0(DistributedElementwiseImpl0):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......@@ -65,7 +64,6 @@ class DistributedFlashAttnImpl0(DistributedElementwiseImpl0):
and not op_dist_attr.is_recompute
and rank_id in op_dist_attr.process_mesh.process_ids
):
assert (
op_dist_attr is not None
), f"forward op [{str(src_op)}] don't have dist attribute !"
......
......@@ -154,7 +154,6 @@ class DistributedFusedAttentionImpl(DistributedOperatorImpl):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......
......@@ -71,7 +71,6 @@ class DistributedDropoutImpl0(DistributedElementwiseImpl0):
op_dist_attr = ctx.get_op_dist_attr_for_program(src_op)
if is_enable_auto_rand_ctrl() and not op_dist_attr.is_recompute:
assert (
op_dist_attr is not None
), f"forward op [{str(src_op)}] don't have dist attribute !"
......
......@@ -146,7 +146,6 @@ class DistributedFusedFeedForwardImpl(DistributedOperatorImpl):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......@@ -188,7 +187,6 @@ class DistributedFusedFeedForwardImpl(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......
......@@ -305,7 +305,6 @@ def _is_auto_compatible_for_matmul(dist_op):
def _right_operand_parameter_matmul_backward(ctx, *args, **kwargs):
# by now the backward function only insert the gradient allreduce for dist op itself
dist_op_context = ctx.dist_op_context
......@@ -386,7 +385,6 @@ def _right_operand_parameter_matmul_backward(ctx, *args, **kwargs):
break
if is_parameter_related(Y_var.name, main_block) and Y_var_partitioned:
if Y_var_dim_mapping[0] >= 0:
# row parallel: c_identity + matmul
assert Y_var_dim_mapping[1] < 0
......@@ -541,7 +539,6 @@ def _right_operand_parameter_matmul_backward(ctx, *args, **kwargs):
def _init_param_sync(Weight_var, dist_op_context, startup_block, ctx, rank_id):
if Weight_var.name in dist_op_context.already_init_sync_vars:
return
assert startup_block.has_var(Weight_var.name)
......
......@@ -157,7 +157,6 @@ class DistributedPNormImpl0(DistributedOperatorImpl):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
src_op = dist_op_context.cur_src_op
......@@ -271,7 +270,6 @@ class DistributedPNormImpl0(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
backward_op = dist_op_context.cur_src_op
......
......@@ -78,7 +78,6 @@ class DistributedReduceSumPrimtiveImpl0(DistributedOperatorImpl):
@staticmethod
def forward(ctx, *args, **kwargs):
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.work_block
startup_block = dist_op_context.startup_block
......
......@@ -66,7 +66,6 @@ class DistributedUpdateLossScalingImpl(DistributedOperatorImpl):
@staticmethod
def backward(ctx, *args, **kwargs):
# the backward function only filter the gradient with current rank id
dist_op_context = ctx.dist_op_context
main_block = dist_op_context.main_block
......
......@@ -143,7 +143,6 @@ class AutoParallelizer:
no_grad_set,
callbacks,
):
with program_guard(main_program, startup_program):
params_grads = append_backward(
loss,
......@@ -158,7 +157,6 @@ class AutoParallelizer:
return params_grads
def _apply_optimize(self, main_program, startup_program, params_grads):
optimizer = copy.deepcopy(self._optimizer)
with program_guard(main_program, startup_program):
optimize_ops = optimizer.apply_gradients(params_grads)
......@@ -173,7 +171,6 @@ class AutoParallelizer:
def _apply_post_optimization_passes(
self, main_program, startup_program, rank, params_grads
):
if self._dist_strategy.sharding:
config = copy.deepcopy(self._dist_strategy.sharding_configs)
config["dist_context"] = self._dist_context
......
......@@ -112,7 +112,6 @@ class Partitioner:
def partition_startup_program(
self, serial_main_program, serial_startup_program
):
if not isinstance(serial_startup_program, (Program)):
raise TypeError(
"dist_context be paddle.framework.Program, got %s here"
......@@ -232,7 +231,6 @@ class Partitioner:
return partitioned_main_prog, partitioned_params_and_grads
def partition_block(self, ref_block, target_block):
dist_op_context = self._dist_context.dist_op_context
serial_ops = ref_block.ops
......@@ -256,7 +254,6 @@ class Partitioner:
# partition
appended_grad_times = 0
for idx, op in enumerate(serial_ops):
op_dist_attr = self._dist_context.get_op_dist_attr_for_program(op)
if is_backward_op(op) and (
is_forward_op(serial_ops[idx - 1])
......@@ -358,7 +355,6 @@ class Partitioner:
)
def _is_valid_annotated_program(self, program):
# TODO (ZJ-LIANG) should check all block
ops = program.global_block().ops
vars_ = program.list_vars()
......@@ -381,7 +377,6 @@ class Partitioner:
return all_ops_annotated and all_vars_annotated
def _get_dist_var_by_serial_var(self, serial_var, partitioned_main_prog):
block_idx = serial_var.block.idx
target_block = partitioned_main_prog.blocks[block_idx]
dist_var_name = self._serial2dist_varname_mapping[serial_var.name]
......@@ -390,7 +385,6 @@ class Partitioner:
def _get_dist_shape(var, dist_attr):
var_shape = var.shape
mapping = dist_attr.dims_mapping
mesh = dist_attr.process_mesh.shape
......
......@@ -58,7 +58,6 @@ def remove_process_group(ring_id):
def new_process_group(
ranks, group_id=None, force_new_group=False, group_type=None
):
global _g_process_group_map
if not force_new_group:
# A key constructed from ranks is used for avoiding duplication
......
......@@ -106,7 +106,6 @@ def new_algorithm(name, config):
@register_algor("sharding")
class ShardingStageAlgorithm(AlgorithmBase):
# TODO import trial class & copy strategy
def __init__(self, config):
super().__init__(config)
......@@ -131,9 +130,7 @@ class ShardingStageAlgorithm(AlgorithmBase):
self._total_num_trial = len(self._stage_range)
def next_trial(self):
if self._trial_idx < self._total_num_trial:
stage = self._stage_range[self._trial_idx]
new_strategy = copy.deepcopy(self._config.dist_strategy)
......@@ -148,7 +145,6 @@ class ShardingStageAlgorithm(AlgorithmBase):
return Trial(None, None, None, status=TrialStatus.STOPPED)
def update(self, results):
et = results.get("ErrorType", None)
if et and et == "ResourceExhaustedError":
self._trial_idx = self._total_num_trial
......@@ -211,7 +207,6 @@ class ReccomputeCheckpointAlgorithm(AlgorithmBase):
return Trial(None, None, None, status=TrialStatus.STOPPED)
def update(self, results):
et = results.get("ErrorType", None)
if self._recompute_mode == "all":
if et and et == "ResourceExhaustedError":
......
......@@ -33,7 +33,6 @@ class TuningConfig:
"""
def __init__(self, strategy):
if not isinstance(strategy, Strategy):
raise TypeError("'strategy' must be object of class `Strategy`.")
......
......@@ -111,7 +111,6 @@ def parse_results(results):
# TODO only dependent on dist context
# all env need to be start a new pass are member of dist context
def _copy_context(ref_dist_context):
# clear all process groups and recover the world process group
clear_all_process_groups()
ranks = []
......@@ -210,7 +209,6 @@ class OptimizationTuner:
batch_size,
rank,
):
self._config = TuningConfig(dist_context.strategy)
# should not modify dist context from calling function
self._baseline_dist_context = _copy_context(dist_context)
......@@ -250,7 +248,6 @@ class OptimizationTuner:
# TODO Generate compelet program with all parts like forward, backward, update
# as well as parallelism transformation.
def _build_programs_without_optimization(self):
serial_main_program = self._baseline_dist_context.serial_main_program
serial_startup_program = (
self._baseline_dist_context.serial_startup_program
......@@ -287,7 +284,6 @@ class OptimizationTuner:
)
def _select_tuning_algorithm(self):
selected_passes_set = self._config.tuning_passes_name
algorithm_name = "_".join(sorted(selected_passes_set))
self._algorithm = new_algorithm(algorithm_name, self._config)
......@@ -415,7 +411,6 @@ class OptimizationTuner:
return trial
def _get_profile_context(self, trial, result_path):
profile_ctx = {}
profile_ctx['distributed_env'] = copy.deepcopy(
......@@ -446,7 +441,6 @@ class OptimizationTuner:
return input_names
def _launch_profile(self, ctx_path, trial_dir):
if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
else:
......@@ -528,7 +522,6 @@ class OptimizationTuner:
return Error_results
def _evaluate_trial(self, trial):
self._logger.info(f"Trial {trial.name} evaluation start.")
self._apply_optimization(trial)
......
......@@ -96,7 +96,6 @@ def init_process_groups(group_map, rank):
def get_cpp_error_type(error):
msg = str(error).splitlines()
cpp_error_types = [
'InvalidArgumentError',
......@@ -123,7 +122,6 @@ def get_cpp_error_type(error):
def create_dataloader(
main_program, startup_program, profile_ctx, epochs=1, steps_per_epoch=None
):
dataset = profile_ctx["dataset"]
main_block = main_program.global_block()
feed_list = []
......@@ -274,7 +272,6 @@ def profiler(args):
data_loader._inner_dataloader.reset()
except Exception as e:
error_type = get_cpp_error_type(e)
result_dict = {
"Throughtput": -1,
......
......@@ -1822,7 +1822,6 @@ class RuleBasedTuner:
)
for input_name in op.desc.input_names():
if input_name in [
'Param',
'Grad',
......
......@@ -130,7 +130,6 @@ class OptimizationTunerTrial(Trial):
return self._name
def summary(self):
spacing = 2
max_k = 38
max_v = 38
......
......@@ -422,7 +422,6 @@ def _linear_idx2coordinate(mesh_shape, linear_idx):
def _get_corresponding_rank(dist_context, target_mesh, rank):
# TODO(JZ-LIANG) a hack method to support varying mesh in Pipeline parallelism case.
# we assume that all mesh are evenly divide from a parent mesh and should have same size.
# to revise this in future.
......@@ -1190,7 +1189,6 @@ def set_grad_var_shape(program, dist_context):
grad_var_to_var = dist_context.dist_op_context.grad_var_to_var
for idx, op in enumerate(block.ops):
if int(op.attr('op_role')) != int(OpRole.Backward):
continue
......@@ -1210,7 +1208,6 @@ def set_grad_var_shape(program, dist_context):
assert op_dist_attr is not None
for var_name in op.output_arg_names:
if "@GRAD" not in var_name:
continue
if var_name in grad_var_to_var[appended_grad_times]:
......@@ -1809,7 +1806,6 @@ def to_list(value):
def debug_program(program, path, name):
filename = os.path.join(
path, name + '_program' + ".%d" % (paddle.distributed.get_rank())
)
......@@ -1827,7 +1823,6 @@ def ring_id_to_process_group(ring_id):
def find_higher_order_backward_op(program):
higher_order_op_suffix = ['_grad_grad', 'triple_grad']
for block in program.blocks:
for op in block.ops:
......@@ -2237,7 +2232,6 @@ def insert_dependencies_for_two_ops(
)
def _select_best_depend_var(vars):
# parameter should not be dep var since it maybe partition in sharding pass
vars = [var for var in vars if not var.is_parameter]
assert len(vars) > 0
......
......@@ -58,7 +58,6 @@ def gather(
sync_op=True,
use_calc_stream=False,
):
"""
Gather tensors from all participators.
......@@ -120,7 +119,6 @@ def gather(
)
gather_list = []
else:
assert (
gather_list is not None
), "gather_list must not be none for dst rank"
......
......@@ -44,7 +44,6 @@ class Command:
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Elastic Command')
parser.add_argument(
"--elastic_server", type=str, help="etcd server host:port"
......
......@@ -83,7 +83,6 @@ class Gloo:
need_init_all=False,
kwargs=None,
):
self._rendezvous = rendezvous
self._role = role
self._role_id = role_id
......
......@@ -47,7 +47,6 @@ def enable_elastic(args, distribute_mode):
def launch_elastic(args, distribute_mode):
server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
srv, port = server.split(':')
import etcd3
......@@ -60,7 +59,6 @@ def launch_elastic(args, distribute_mode):
signal.signal(signal.SIGINT, elastic.signal_handler)
while True:
# wait for all nodes ready to run
elastic.wait()
......
......@@ -123,7 +123,6 @@ class LauncherInterface:
class ElasticManager:
def __init__(self, args, etcd_client):
self.args = args
server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
......@@ -603,7 +602,6 @@ class ElasticManager:
self.launcher.launch()
def watch(self):
if self.need_sync:
self.need_sync = False
......
......@@ -494,7 +494,6 @@ def run_with_coverage(*args):
def start_local_trainers(
cluster, pod, training_script, training_script_args, log_dir=None, envs=None
):
if envs is None:
current_env = copy.copy(os.environ.copy())
else:
......
......@@ -59,7 +59,6 @@ class ASPOptimizer(MetaOptimizerBase):
def minimize_impl(
self, loss, startup_program=None, parameter_list=None, no_grad_set=None
):
optimize_ops, params_grads = ASPHelper._minimize(
self.inner_opt,
loss,
......
......@@ -51,7 +51,6 @@ class DygraphShardingOptimizer:
inner_optimizer_class,
**inner_optimizer_kargs
):
if not isinstance(params, list):
raise TypeError(
"`parameters` argument given to the DygraphShardingOptimizer should be "
......@@ -89,7 +88,6 @@ class DygraphShardingOptimizer:
p.clear_gradient()
def _build_sharding_mapping(self):
self._rank2params = self._partition_parameters()
self._param2rank = self._map_param_to_rank()
......@@ -172,7 +170,6 @@ class DygraphShardingOptimizer:
def minimize(
self, loss, startup_program=None, parameters=None, no_grad_set=None
):
# NOTE in dygraph mode, the only different between step and minimize is that minimize
# allow user to customize the parameters for updating on each step
......
......@@ -51,7 +51,6 @@ class HeterParallelOptimizer:
def minimize(
self, loss, startup_program=None, parameters=None, no_grad_set=None
):
# minimize does not support parameters in the form of param_group,
# so no need use _obtain_optimizer_parameters_list
parameter_list = (
......
......@@ -428,7 +428,6 @@ class HybridParallelOptimizer:
def minimize(
self, loss, startup_program=None, parameters=None, no_grad_set=None
):
# minimize does not support parameters in the form of param_group,
# so no need use _obtain_optimizer_parameters_list
parameter_list = (
......
......@@ -196,7 +196,6 @@ class ParameterServerOptimizer(MetaOptimizerBase):
)
if not compiled_config.is_geo_mode():
from paddle.incubate.distributed.fleet.parameter_server.ir.public import (
_get_optimize_ops,
)
......
......@@ -47,7 +47,6 @@ class ProgramDeps:
def _build_deps(
self,
):
for var_name in self._start_vars:
self._var_to_use_op[var_name] = []
self._var_to_generate_op[var_name] = []
......
......@@ -834,7 +834,6 @@ def get_grad_device(grad_name, shard):
def get_first_check_finite_and_unscale_op_idx(block, raise_error=True):
for idx, op in enumerate(block.ops):
if op.type == "check_finite_and_unscale":
return idx
......
......@@ -848,7 +848,6 @@ class ShardingOptimizer(MetaOptimizerBase):
elif self._sharding_segment_strategy == "segment_anchors":
if int(op.attr('op_role')) == int(OpRole.Backward):
for input_name in op.desc.input_arg_names():
# NOTE (JZ-LIANG) naive rule to support amp, if amp change, should modify here accordingly
if self.user_defined_strategy.amp:
if ".cast_fp16@GRAD" not in input_name:
......@@ -1766,7 +1765,6 @@ class ShardingOptimizer(MetaOptimizerBase):
def create_persistable_gradients_and_insert_merge_ops(
self, main_block, startup_block, insert_idx, grad_names, shard
):
for grad_name in grad_names:
assert (
get_grad_device(grad_name, shard) == shard.worker_idx
......
......@@ -109,7 +109,6 @@ class SegmentLayers:
), "layer number should be greater than number of segments"
def do_segment(self):
if isinstance(self.method, list):
seg_method = self.method[:]
source_num_parts = len(seg_method) - 1
......
......@@ -357,7 +357,6 @@ def _p2p_helper(
tensor_recv_prev.append(tmp)
tensor_recv_prev = tuple(tensor_recv_prev)
else:
tensor_recv_prev = paddle.empty(
shape=recv_shape_msg, dtype=number_2_dtype(recv_dtype_msg)
)
......
......@@ -245,7 +245,6 @@ class FusedCommBuffer:
def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
group_idx = 0
memory_counter = 0
var_groups = OrderedDict()
......
......@@ -79,7 +79,6 @@ class GroupShardedOptimizerStage2(Optimizer):
dp_group=None,
**kw
):
super().__init__(learning_rate=optim._learning_rate, parameters=params)
assert (
core.is_compiled_with_cuda()
......@@ -418,7 +417,6 @@ class GroupShardedOptimizerStage2(Optimizer):
for dst_rank, params in enumerate(per_rank_params):
if len(params) > 0:
# Merge all the trainable params in a single InternalStorage
trainable_params = list(
filter(lambda x: x.trainable, params)
......
......@@ -780,7 +780,6 @@ def ForwardPreHooks(
offload,
task_flow,
):
# Record layer's id
layer_id = id(layer)
use_calc, sync_wait = False, False
......@@ -837,7 +836,6 @@ class ForwardPostHooks(PyLayer):
offload,
task_flow,
):
layer_id = id(layer)
# release current layer full params
_release_param(
......@@ -970,7 +968,6 @@ def _wait_layer(
use_calc_stream,
offload=False,
):
for param in trainable_params:
if param.status == "all":
param.use_count += 1
......@@ -1007,7 +1004,6 @@ def _allgather_buffer(
offload=False,
convert2cpu=False,
):
for param in trainable_params:
if param.status == "all":
param.use_count += 1
......
......@@ -157,7 +157,6 @@ class ParamStorage(InternalStorage):
@paddle.autograd.no_grad()
def _add_param_as_view(self, param, align, convert_gpu=True):
assert (
param.dtype == self.buffer.dtype
), "Different types for the InternalStorage and the param, cannot proceed: {} - {}".format(
......@@ -192,7 +191,6 @@ class ParamStorage(InternalStorage):
@paddle.autograd.no_grad()
def _convert_buffer(self, param, p_shape, align):
var_end = self._fill + np.prod(p_shape).tolist()
offset = var_end + align
assert offset <= self.buffer._numel()
......
......@@ -238,7 +238,6 @@ def GroupShardedScaler(scaler):
if getattr(optimizer._optim, '_param_groups', None) and isinstance(
optimizer._optim._param_groups[0], dict
):
for group in optimizer._optim._param_groups:
for param in group['params']:
if param.grad is not None:
......
......@@ -134,7 +134,6 @@ def distributed_model(model):
if fleet_env._hcg.get_parallel_mode() == ParallelMode.SHARDING_PARALLEL:
model = ShardingParallel(model, fleet_env._hcg, strategy=strategy)
elif fleet_env._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL:
# NOTE (JZ-LIANG) init parameters broadcast within sharding group
# normally it should be done inside DataParallel
if fleet_env.sharding_degree > 1:
......
......@@ -29,7 +29,6 @@ __all__ = []
def _split_activation(tensor, mp_group):
mp_degree = mp_group.nranks
mp_rank = mp_group.rank
if mp_degree < 2:
......@@ -87,7 +86,6 @@ class _HPRecomputeFunction(PyLayer):
*args,
**kwargs,
):
# store for recomputing
ctx.run_function = run_function
......
......@@ -354,7 +354,7 @@ class CommonAccessor:
attr_varnames = self.opt_attr_map[oop.type]
self.accessor_class = oop.type
for (formal_name, shape) in param_varnames:
for formal_name, shape in param_varnames:
params.append(formal_name)
if self.accessor_class == "adam_d2sum":
# for dims
......@@ -424,7 +424,7 @@ class CommonAccessor:
)
initializers.append(initializer)
for (attr_varname, type_) in attr_varnames:
for attr_varname, type_ in attr_varnames:
value = oop.attr(attr_varname)
attrs.append("&".join([attr_varname, type_, str(value)]))
......@@ -1307,7 +1307,6 @@ class TheOnePSRuntime(RuntimeBase):
def _save_distributed_persistables(
self, executor, dirname, main_program, mode=0
):
denses = self.compiled_strategy.get_the_one_recv_context(
is_dense=True,
split_dense_table=self.role_maker._is_heter_parameter_server_mode,
......
......@@ -1223,7 +1223,6 @@ class AFSClient(FS):
return self._ls_dir(fs_path)
def _ls_dir(self, fs_path):
files = self._fs.list(fs_path)
dirs = [fs_path]
return dirs, files
......
......@@ -195,7 +195,6 @@ class HybridParallelInferenceHelper:
init_comm=True,
role_maker=None,
):
assert isinstance(startup_program, Program)
assert isinstance(main_program, Program)
......
......@@ -235,7 +235,6 @@ def sharding_reduce_gradients(parameter_list, hcg):
# TODO merge grad / nrank with dp
logger.debug("sharding start gradients sync")
with framework.no_grad():
sharding_nrank = hcg.get_sharding_parallel_group().nranks
for param in parameter_list:
g_var = None
......
......@@ -77,7 +77,6 @@ class MixPrecisionLayer(nn.Layer):
include_sublayers=True,
structured_name_prefix="",
):
return self._layers.state_dict(
destination=destination,
include_sublayers=include_sublayers,
......@@ -86,7 +85,6 @@ class MixPrecisionLayer(nn.Layer):
@framework.deprecate_stat_dict
def set_state_dict(self, state_dict, use_structured_name=True):
self._layers.set_state_dict(
state_dict, use_structured_name=use_structured_name
)
......@@ -113,7 +111,6 @@ class MixPrecisionOptimizer:
@imperative_base.no_grad
@framework.dygraph_only
def step(self):
if not isinstance(self._parameter_list[0], dict):
params_grads = []
for param in self._parameter_list:
......@@ -179,7 +176,6 @@ class MixPrecisionOptimizer:
@framework.dygraph_only
def clear_grad(self, set_to_zero=True):
param_list = []
if self._parameter_list is None or not isinstance(
self._parameter_list[0], dict
......
......@@ -65,7 +65,7 @@ class LayerReNamingManager:
def get_new_layer_name(self, old_name: str):
layer_name = ""
for (k, v) in self._renaming_helpers.items():
for k, v in self._renaming_helpers.items():
if old_name.startswith(k):
layer_name = v.get_new_layer_name(old_name)
break
......@@ -128,12 +128,12 @@ class PipeLineModelAdaptor:
]
# 4、merge layers belonging to the same node
for (layer_segment, dir_) in zip(layer_segments, dst_dirs):
for layer_segment, dir_ in zip(layer_segments, dst_dirs):
print(f"merge {len(layer_segment)} layers to {dir_}")
self.merge_layers(layer_segment, dir_)
# 5、copy meta_state.pdopt
for (src_dir, dst_dir) in zip(src_dirs, dst_dirs):
for src_dir, dst_dir in zip(src_dirs, dst_dirs):
shutil.copyfile(
f"{src_dir}/meta_state.pdopt",
f"{dst_dir}/meta_state.pdopt",
......@@ -155,7 +155,7 @@ class PipeLineModelAdaptor:
def peek_partial_model(self, sub_dir: str):
state_dict = paddle.load(f"{sub_dir}/model.pdparams")
for (k, v) in state_dict.items():
for k, v in state_dict.items():
print(f"\t{k} -> {v.name}")
def extract_layers(self, dir: str, with_shared: bool):
......@@ -164,7 +164,7 @@ class PipeLineModelAdaptor:
shared_layer_parsed = False
# tname -> (layer, param_name)
tname_to_layer_and_pname = {}
for (k, v) in params.items():
for k, v in params.items():
layer = self._extract_layer_name(k)
assert layer
# special treatment for embedding layer, skip duplicated shared layer
......@@ -192,7 +192,7 @@ class PipeLineModelAdaptor:
opt_to_t = self._opt_name_to_tname(tensor_names, opt_names)
# gather tensors belonging to one layer togather
layers = OrderedDict()
for (k, v) in params.items():
for k, v in params.items():
layer, p = tname_to_layer_and_pname[v.name]
if layer not in layers:
layers[layer] = {}
......@@ -201,14 +201,14 @@ class PipeLineModelAdaptor:
layers[layer]["master_weights"] = OrderedDict()
layers[layer]["params"][p] = v
for (k, v) in opt.items():
for k, v in opt.items():
if k in ["master_weights", "LR_Scheduler"]:
continue
layer, _ = tname_to_layer_and_pname[opt_to_t[v.name]]
layers[layer]["opt"][k] = v
if "master_weights" in opt:
for (k, v) in opt["master_weights"].items():
for k, v in opt["master_weights"].items():
layer, _ = tname_to_layer_and_pname[k]
layers[layer]["master_weights"][k] = v
......@@ -218,7 +218,7 @@ class PipeLineModelAdaptor:
ans = []
for (layer_name, layer) in layers.items():
for layer_name, layer in layers.items():
# special treatment for embedding layer
if (not with_shared) and "shared_layers" in layer_name:
continue
......@@ -311,7 +311,7 @@ class PipeLineModelAdaptor:
# name layers
segments = [[] for i in range(config.pp)]
for i in range(config.pp):
for (start, end) in index_segments[i]:
for start, end in index_segments[i]:
for j in range(start, end):
if config.vpp > 1:
segments[i].append(
......@@ -338,7 +338,7 @@ class PipeLineModelAdaptor:
for i in range(1, config.pp):
segments[i] = [([layers[0][0]], layers[0][1])] + segments[i]
for (pp_rank, segs) in enumerate(segments):
for pp_rank, segs in enumerate(segments):
print(f"segmentment result for pp_rank {pp_rank}:")
print(50 * "=")
for seg in segs:
......@@ -352,12 +352,12 @@ class PipeLineModelAdaptor:
renaming_manager = LayerReNamingManager()
def merge(src, dst, map_k=None):
for (k, v) in src.items():
for k, v in src.items():
k = map_k(k) if map_k is not None else k
dst[k] = v
lr_scheduler = None
for (layer_names, file_path) in layers_segment:
for layer_names, file_path in layers_segment:
print("load %s" % file_path)
layer = paddle.load(file_path)
......@@ -425,14 +425,14 @@ class PipeLineModelAdaptor:
# old name to new name
t_name_mapping = {}
# map tensor names
for (k, v) in params.items():
for k, v in params.items():
t_name_mapping[v.name] = renaming_manager.get_new_param_name(v.name)
v.name = t_name_mapping[v.name]
# map opt names
opt_to_tname = self._opt_name_to_tname(
t_name_mapping.keys(), opt.keys()
)
for (k, v) in opt.items():
for k, v in opt.items():
old_t_name = opt_to_tname[k]
t_name = t_name_mapping[old_t_name]
opt_name = t_name + k[len(old_t_name) :]
......@@ -440,7 +440,7 @@ class PipeLineModelAdaptor:
opt_renamed[opt_name] = v
# map master names
for (k, v) in master_weights.items():
for k, v in master_weights.items():
t_name = t_name_mapping[k]
v.name = t_name + v.name[len(k) :]
master_weights_renamed[t_name] = v
......@@ -448,7 +448,6 @@ class PipeLineModelAdaptor:
def parse_args():
parser = argparse.ArgumentParser(
prog='model converter', description='converter a model'
)
......@@ -591,7 +590,6 @@ def adaptor_from_args(args):
def main():
args = parse_args()
adaptor = adaptor_from_args(args)
if args.method == "peek_model":
......
......@@ -120,7 +120,6 @@ def copy_parameters(block_, params):
def insert_sync_op(
block, idx, tp_degree, sync_mode, sync_ring_id, src_rank, varname, op_role
):
if sync_mode == "broadcast":
block._insert_op_without_sync(
idx,
......@@ -171,11 +170,9 @@ def insert_synchronization(
sync_mode,
src_rank,
):
unsync_param_names = [p.name for p in params_to_sync]
for idx, op in reversed(list(enumerate(block.ops))):
if op.type in _supported_optimizer_type:
assert "Param" in op.input_names
assert len(op.input("Param")) == 1
......@@ -183,7 +180,6 @@ def insert_synchronization(
op_role = op.attr(OP_ROLE_KEY)
if param_name in unsync_param_names:
unsync_param_names.remove(param_name)
# Param sync after opt
......
......@@ -262,13 +262,11 @@ class CollectiveElasticController(CollectiveController):
self.master.register_heartbeat(self.job.id, self.pod.name)
def run(self):
timeout = int(self.ctx.args.elastic_timeout)
timeout = timeout if self.job.elastic else timeout * 10
self.register()
while self.pod.restart <= self.ctx.args.max_restart:
self.build_job()
self.ctx.logger.info("Waiting peer ready...")
......
......@@ -54,7 +54,6 @@ class ControllerBase:
self.join_server = None
def deploy_pod(self):
assert (
len(self.pod.containers) + len(self.pod.init_containers) > 0
), "No container in the pod"
......@@ -219,7 +218,6 @@ class Controller(ControllerBase):
log_file=None,
is_init=False,
):
if not container:
container = self.new_container(
entrypoint=entrypoint, envs=envs, out=log_file, err=log_file
......
......@@ -136,7 +136,6 @@ class HTTPMaster(Master):
self._stop_server()
def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
if size < 2:
return [value], 0
......
......@@ -115,7 +115,6 @@ class PSController(Controller):
self.add_container(envs=e, log_file=log_file)
def _build_pod_with_master(self):
self.pod.rank = int(self.ctx.args.rank)
server_num = self.ctx.args.server_num or 1
......
......@@ -287,14 +287,12 @@ def launch():
ctx = Context()
if ctx.is_legacy_mode():
# legacy mode
from paddle.distributed.fleet import launch
launch.launch()
else:
from . import controllers
# initialize the selected controller
......
......@@ -870,7 +870,6 @@ def _is_cpuonly(backend):
backend in ['auto', 'nccl', 'bkcl', 'heter']
and (core.is_compiled_with_cuda() or core.is_compiled_with_xpu())
) or backend == 'xccl':
# passes 'auto' and can use cuda or xpu, use the default logics. so return False
return False
else:
......
......@@ -231,7 +231,6 @@ class AMPState:
return is_train
def _mark_black_white_ops(self, op, ops, block):
# ernie inference trick
if op.type == "assign" and "array_" in op.input_arg_names[0]:
self._op_fp16_dict[op.desc.original_id()] = False
......@@ -814,7 +813,6 @@ class AMPPass(PassBase):
main_block._sync_with_cpp()
def _check_and_update_gradient(self):
main_block = paddle.static.default_main_program().global_block()
main_block._sync_with_cpp()
......@@ -916,7 +914,6 @@ class AMPPass(PassBase):
)
def _cast_loss(self):
main_block = paddle.static.default_main_program().global_block()
main_block._sync_with_cpp()
......@@ -928,7 +925,6 @@ class AMPPass(PassBase):
)
if loss.dtype != core.VarDesc.VarType.FP32:
tmp_name = unique_name.generate(loss.name + ".cast_fp32")
cast_loss = main_block.create_var(
name=tmp_name, dtype=core.VarDesc.VarType.FP32
......@@ -1010,7 +1006,6 @@ class AMPPass(PassBase):
main_block._sync_with_cpp()
def _scale_loss(self):
main_block = paddle.static.default_main_program().global_block()
loss = self.get_attr("loss")
assert loss is not None
......@@ -1023,7 +1018,6 @@ class AMPPass(PassBase):
self.get_attr("use_dynamic_loss_scaling")
or self.get_attr("init_loss_scaling") != 1.0
):
loss_op_idx = find_op_index(main_block.desc, loss_op.desc)
# forward
......@@ -1123,7 +1117,6 @@ class AMPPass(PassBase):
main_block._sync_with_cpp()
def _update_loss_scaling(self, grads, found_inf):
main_block = paddle.static.default_main_program().global_block()
main_block._sync_with_cpp()
......
......@@ -91,7 +91,6 @@ class DataParallelOptimizationPass(PassBase):
return PassType.COMM_OPT
def _apply_single_impl(self, main_program, startup_program, context):
self.dist_context = self.get_attr("dist_context")
self.global_rank = int(self.get_attr("global_rank"))
self.use_sharding = self.get_attr("use_sharding")
......@@ -110,7 +109,6 @@ class DataParallelOptimizationPass(PassBase):
self.summary(grad_group)
def _prune_grad_scaling(self):
if not self._could_be_prune():
return
......@@ -128,7 +126,6 @@ class DataParallelOptimizationPass(PassBase):
self._calc_wait_comms()
def _fuse_allreduce(self):
if not self._could_be_fuse():
return []
......@@ -149,7 +146,6 @@ class DataParallelOptimizationPass(PassBase):
scaled_grads = []
for op in ops:
if is_data_parallel_reduce_op(op):
grad_name = op.output_arg_names[0]
if grad_name in self._grad_name_to_group_map:
......@@ -198,7 +194,6 @@ class DataParallelOptimizationPass(PassBase):
return len(self._group_to_grad_name_map) > 0
def _could_be_prune(self):
return self.dist_context.gradient_scale and (
self._support_rescale_grad or self._all_dp_groups_same_degree()
)
......@@ -215,7 +210,6 @@ class DataParallelOptimizationPass(PassBase):
)
def _scale_backward_initial_grad(self):
block = default_main_program().global_block()
dp_degree = len(list(self._group_to_grad_name_map.keys())[0].ranks)
......@@ -241,7 +235,6 @@ class DataParallelOptimizationPass(PassBase):
block._sync_with_cpp()
def _update_opt_rescale_grad(self):
block = default_main_program().global_block()
scaled_grads = set()
......@@ -313,7 +306,6 @@ class DataParallelOptimizationPass(PassBase):
block._sync_with_cpp()
def _calc_wait_comms(self):
return
block = default_main_program().global_block()
......@@ -365,7 +357,6 @@ class DataParallelOptimizationPass(PassBase):
# here we try to wait for all kernel in that comm stream to be finish which is not that optimized.
for i in sorted(indices, reverse=True):
for ring_id in op_idx_to_sync_ring_id_map[i]:
block._insert_op_without_sync(
i,
type='c_wait_comm',
......@@ -451,13 +442,11 @@ class DataParallelOptimizationPass(PassBase):
return grad_groups
def _update_program(self, grad_groups):
block = default_main_program().global_block()
remove_op_types = ['scale', 'c_allreduce_sum', 'c_wait_compute']
for i, group in enumerate(grad_groups[::-1]):
# skip unfused big tensor
if len(group.gradients) <= 1:
group.coalesce_var = group.gradients[0]
......
......@@ -194,12 +194,10 @@ class FP16State:
return self.is_train
def _mark_op(self, op):
if op.type in __amp_skip_ops__:
return
if is_forward_op(op):
# ernie inference trick
if op.type == "assign" and "array_" in op.input_arg_names[0]:
self._op_fp16_dict[op.desc.original_id()] = False
......@@ -227,7 +225,6 @@ class FP16State:
self.forward_non_leaf_tensors[var_name] = op.desc.id()
elif is_backward_op(op) == int(OpRole.Backward):
if op.desc.original_id() in self.grad_op_to_op_map:
fwd_op_id = self.grad_op_to_op_map[op.desc.original_id()]
assert fwd_op_id in self._op_fp16_dict, f"{str(op)}"
......@@ -259,7 +256,6 @@ class FP16State:
var.desc.set_dtype(__target_dtype__)
def resolute_tensor_dtype(self, block):
for op in block.ops:
if is_forward_op(op):
# NOTE (JZ-LIANG) un-expected cast op when user call "+, -, *, /" in python
......@@ -382,7 +378,6 @@ class FP16State:
def _insert_forward_cast_ops(
self, op, idx, block, src_dtype, dst_dtype, dist_context
):
num_cast_ops = 0
for in_name in op.input_names:
......@@ -470,7 +465,6 @@ class FP16State:
def _insert_backward_cast_ops(
self, op, idx, block, src_dtype, dst_dtype, dist_context
):
num_cast_ops = 0
op_id = op.desc.id()
original_id = op.desc.original_id()
......@@ -495,11 +489,9 @@ class FP16State:
src_dtype,
slot_name,
) in self.forward_input_cast_ops[forward_op_id]:
# rename input
# some forward output is not need by backward computation, e.g. logit in softmax_with_cross_entropy
if slot_name in op.input_names:
assert src_name in op.input(
slot_name
), "var: {} not in op's {}. {}".format(
......@@ -567,7 +559,6 @@ class FP16State:
def _check_and_update_gradient(grads, loss_scaling, name, dist_context):
main_block = paddle.static.default_main_program().global_block()
main_block._sync_with_cpp()
......
......@@ -327,7 +327,6 @@ class ClipGradByGloblNormPass(PassBase):
self._remove_no_need_ops_vars(block)
def _remove_no_need_ops_vars(self, block):
removed_op_out_type = [
'squared_l2_norm',
'square',
......@@ -463,7 +462,6 @@ class ClipGradByGloblNormPass(PassBase):
self.clip_helper._init_dist_attr(allreduce_op)
if insert_leaf_fill_constant_node:
# NOTE add naive deps for global norm sync in graph exe
j = idx - 1
prior_op = None
......
......@@ -92,7 +92,6 @@ class PipelinePass(PassBase):
)
def _insert_sync_ops_for_stream(self):
for block in self._program.blocks:
offset = 0
send_vars = []
......@@ -243,7 +242,6 @@ class PipelinePass(PassBase):
# set upstream/downstream for task_nodes of cur_rank
for i, (task_role, task_node) in enumerate(task_nodes.items()):
cur_id = int(self._cur_rank * num_of_functionality + i)
ups = []
downs = []
......
......@@ -65,7 +65,6 @@ class QuantizationPass(PassBase):
return True
def _apply_single_impl(self, main_program, startup_program, context):
dist_context = self.get_attr("dist_context")
params_grads = self.get_attr("params_grads")
mode = self.get_attr("mode")
......
......@@ -225,7 +225,6 @@ class ShardingPass(PassBase):
)
def _build_sharding_infos(self, main_block, params_grads):
# order params
params_grads = re_order_program(
main_block, params_grads, self._dist_context
......@@ -233,7 +232,6 @@ class ShardingPass(PassBase):
# partition
for dp_group in self.dp_groups:
assert (
dp_group.nranks >= self.sharding_world_size
), "sharding world size [{}] should not larger than dp world size [{}]".format(
......@@ -297,7 +295,6 @@ class ShardingPass(PassBase):
self._insert_optimizer_broadcasts(main_block, startup_block)
def _shard_amp_related_op_and_vars(self, main_block):
if self.stage < 2:
return
......@@ -347,7 +344,6 @@ class ShardingPass(PassBase):
main_block._sync_with_cpp()
def _shard_gradient_clip(self, main_block):
if self.stage < 2:
return
......@@ -416,7 +412,6 @@ class ShardingPass(PassBase):
main_block._sync_with_cpp()
def _shard_weight_decay(self, main_block):
if self.stage < 2:
return
......@@ -430,7 +425,6 @@ class ShardingPass(PassBase):
main_block._sync_with_cpp()
def _shard_optimizer_ops_and_states(self, main_block, startup_block):
should_removed_optimizer_states = []
for idx, op in reversed(list(enumerate(main_block.ops))):
if not is_optimize_op(op):
......@@ -471,7 +465,6 @@ class ShardingPass(PassBase):
startup_block._sync_with_cpp()
def _insert_optimizer_broadcasts(self, main_block, startup_block):
if self.stage > 2 or self.param_bucket_size_numel > 1:
return
......@@ -519,7 +512,6 @@ class ShardingPass(PassBase):
return p_g
def _shard_gradient_synchronization(self, main_block):
if self.stage < 2:
return
......@@ -562,7 +554,6 @@ class ShardingPass(PassBase):
main_block._sync_with_cpp()
def _shard_parameter(self, main_block, startup_block):
if self.stage < 3:
return
......@@ -684,7 +675,6 @@ class ShardingPass(PassBase):
startup_block._sync_with_cpp()
def _optimization_pass(self, main_program, startup_program):
if self.stage <= 1:
return
......@@ -712,7 +702,6 @@ class ShardingPass(PassBase):
self._fuse_overlap_parameter_comm_stage_three(sharding_info)
def _gradient_sync_optimization(self, sharding_info):
if self.grad_bucket_size_numel <= 1 and (not self.enable_overlap):
return
......@@ -730,7 +719,6 @@ class ShardingPass(PassBase):
)
def _fuse_overlap_parameter_comm_stage_two(self, sharding_info):
main_block = default_main_program().global_block()
startup_block = default_startup_program().global_block()
......@@ -777,7 +765,6 @@ class ShardingPass(PassBase):
self.op_to_stream_idx = {}
for i, param_group in enumerate(group_to_param_map.keys()):
assert len(param_group) >= 1
if len(param_group) > 1:
coalesce_var_name = unique_name.generate(
......@@ -1087,7 +1074,6 @@ class ShardingPass(PassBase):
# update block
for idx, op in reversed(list(enumerate(block.ops))):
if idx in modify_reduce_op_map:
group = modify_reduce_op_map[idx]
grad_name = op.output_arg_names[0]
......@@ -1202,7 +1188,6 @@ class ShardingPass(PassBase):
grad_comm_op_to_stream_idx = {}
for idx, op in enumerate(ops):
if is_data_parallel_reduce_op(op):
if op.type == "c_allreduce_sum":
continue
stream_idx = reduce_op_count % self.grad_comm_stream_num
......@@ -1429,7 +1414,6 @@ def _insert_init_and_broadcast_op(
dist_context,
)
if local_rank != root_rank:
new_op = block._insert_op_without_sync(
insert_idx,
type="empty",
......@@ -1523,7 +1507,6 @@ def _is_param_grad_fp32_cast_op(block, op):
def _is_param_fp16_cast_op(block, op, params):
if is_optimize_op(op):
return False
if not _is_desired_cast_op(block, op):
......@@ -1563,7 +1546,6 @@ def _get_base_name_from_grad_name(grad_name):
def _is_param_grad_allreduce_op(op, block):
if not is_data_parallel_reduce_op(op):
return False
......@@ -1577,7 +1559,6 @@ def _is_param_grad_allreduce_op(op, block):
def _is_param_grad_sum_op(op, block):
if not is_backward_op(op):
return False
if op.type != "sum":
......@@ -1601,7 +1582,6 @@ def is_sharding_param_broadcast_op(op):
def _inference_data_parallel_group_for_operator(rank_id, op, dist_context):
dp_group = None
for input_name in op.input_arg_names:
# TODO(zhaoyingli): maintain a dict in dist_context to record all variables which are renamed,
......@@ -1696,7 +1676,6 @@ def partition_parameters(params, group_size, algor="greedy_even"):
def re_order_program(block, param_grads, dist_context):
# record order
pname_to_pg_pairs = {}
for p, g in param_grads:
......
......@@ -67,7 +67,6 @@ class AutoParalSupplementDepPass(PassBase):
return True
def _apply_single_impl(self, main_program, startup_program, context):
# TODO general this pass for all case.
if not _sharding_pass_applied(context):
return
......
......@@ -175,7 +175,6 @@ class BuildCINNPass(CPPPassWrapper):
return PassType.CALC_OPT
def _apply_single_impl(self, main_program, startup_program, context):
assert (
'FLAGS_allow_cinn_ops' in core.globals()
), "PaddlePaddle is not compiled with CINN support"
......@@ -201,7 +200,6 @@ class BuildCINNPass(CPPPassWrapper):
)
else:
tmp_main_program = Executor._add_fetch_ops(
main_program, fetch_list, 'fetch'
)
......
......@@ -466,7 +466,7 @@ class DistributedOpsPass(PassBase):
PSGPU = core.PSGPU()
try:
gpu_slot = [int(var.name) for var in gpups_inputs]
except (ValueError):
except ValueError:
raise ValueError(
"The slot name in gpups Should be able to convert to integer."
)
......@@ -922,7 +922,6 @@ class SplitHeterWorkerOpsPass(PassBase):
first_op_index_fp = len(heter_block.ops)
if stage_id < len(program_block_ops_list):
heter_block_bp = heter_program._create_block(pre_block_idx)
optimizer_block.append(heter_block_bp)
......
......@@ -470,7 +470,7 @@ class CommonAccessor(Accessor):
attr_varnames = self.opt_attr_map[oop.type]
self.accessor_class = oop.type
for (formal_name, shape) in param_varnames:
for formal_name, shape in param_varnames:
params.append(formal_name)
if self.accessor_class == "adam_d2sum":
# for dims
......@@ -573,7 +573,7 @@ class CommonAccessor(Accessor):
oop = op
break
for (attr_varname, type_) in attr_varnames:
for attr_varname, type_ in attr_varnames:
value = oop.attr(attr_varname)
attrs.append("&".join([attr_varname, str(value)]))
......
......@@ -273,7 +273,6 @@ class GpuPsProgramBuilder(PsProgramBuilder):
super().__init__(pass_ctx)
def _build_trainer_programs(self):
add_lr_decay_table_pass = new_pass(
"add_lr_decay_table_pass", self.attrs
)
......
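The collapsed hunks below follow the same patterns. As a sanity check, one could confirm that such rewrites are formatting-only by comparing the ASTs of an old and a new snippet; a small sketch with made-up snippets, not taken from the files in this diff:

```python
import ast

# Hypothetical before/after pair in the style of the for-loop hunks above.
old = """
for (key, value) in {"a": 1}.items():
    print(key, value)
"""
new = """
for key, value in {"a": 1}.items():
    print(key, value)
"""

# ast.dump ignores parentheses and whitespace, so identical dumps mean
# the rewrite cannot change runtime behavior.
assert ast.dump(ast.parse(old)) == ast.dump(ast.parse(new))
print("ASTs match: formatting-only change")
```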
(The diffs for the remaining changed files are collapsed.)