Unverified commit 5434560a, authored by W Wennie396, committed by GitHub

Pipeline pass base (#55174)

* format correction

* variable names adjustment

* variable names adjustment, name-->type, value-->sub_program
Parent 94365855
@@ -28,10 +28,7 @@ from paddle.fluid import core
from paddle.fluid.framework import Program
from .pass_base import PassBase, register_pass
from .pipeline_scheduler_pass import (
_create_program,
_insert_sync_for_fthenb_1f1b,
)
from .pass_utils import _create_program, _insert_sync_for_fthenb_1f1b
__not_shape_var_type__ = [
core.VarDesc.VarType.READER,
......
@@ -15,8 +15,23 @@
from collections import OrderedDict
from typing import List
from paddle.distributed.auto_parallel.static.utils import (
is_backward_op,
is_forward_op,
is_lr_sched_op,
is_optimize_op,
)
from paddle.distributed.fleet.meta_optimizers.common import OpRole
from paddle.fluid import core
from paddle.fluid.framework import Program
from paddle.fluid.framework import Parameter, Program
__not_shape_var_type__ = [
core.VarDesc.VarType.READER,
core.VarDesc.VarType.STEP_SCOPES,
core.VarDesc.VarType.LOD_TENSOR_ARRAY,
core.VarDesc.VarType.FEED_MINIBATCH,
core.VarDesc.VarType.FETCH_LIST,
]
def list_to_ordered_dict(list_obj, ordered_dict=None):
@@ -243,3 +258,228 @@ def get_skip_gc_vars(program_list: List[Program]):
skip_gc_vars[idx] = vars_set & union_set
return skip_gc_vars
def _create_param(dst_block, src_var):
copied_kwargs = {}
copied_kwargs['trainable'] = src_var.trainable
copied_kwargs['optimize_attr'] = src_var.optimize_attr
copied_kwargs['regularizer'] = src_var.regularizer
copied_kwargs['do_model_average'] = src_var.do_model_average
copied_kwargs['need_clip'] = src_var.need_clip
Parameter(
block=dst_block,
type=src_var.type,
name=src_var.name,
shape=src_var.shape,
dtype=src_var.dtype,
lod_level=src_var.lod_level,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
**copied_kwargs
)
def _create_inter(dst_block, src_var):
dst_block.create_var(
type=src_var.type,
name=src_var.name,
shape=src_var.shape,
dtype=src_var.dtype,
lod_level=src_var.lod_level,
persistable=src_var.persistable,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
)
def _create_var(src_block, dst_block, src_varname, force_create=False):
if not force_create:
src_var = src_block.var(src_varname)
else:
src_var = src_block._var_recursive(src_varname)
if src_var.type in __not_shape_var_type__:
persist = getattr(src_var, 'persistable', False)
dst_block.create_var(
type=src_var.type,
name=src_var.name,
persistable=persist,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
)
else:
if isinstance(src_var, Parameter):
_create_param(dst_block, src_var)
else:
_create_inter(dst_block, src_var)
def _create_program(src_block, dst_block, src_op, force_create=False):
dst_op_desc = dst_block.desc.append_op()
dst_op_desc.copy_from(src_op.desc)
for input_varname in src_op.input_arg_names:
if src_block.has_var(input_varname) or (
force_create and src_block._find_var_recursive(input_varname)
):
_create_var(src_block, dst_block, input_varname, force_create)
for output_varname in src_op.output_arg_names:
if src_block.has_var(output_varname) or (
force_create and src_block._find_var_recursive(output_varname)
):
_create_var(src_block, dst_block, output_varname, force_create)
def _insert_sync_for_fthenb_1f1b(program):
"""
This implementation borrows heavily from Paddle/python/paddle/fluid/optimizer.py.
The difference between this function and 'PipelineOptimizer' is that the
'send_v2' and 'recv_v2' ops have already been inserted into the program by 'reshard'.
"""
for block in program.blocks:
offset = 0
first_optimize_index = None
for index, op in enumerate(list(block.ops)):
if is_optimize_op(op):
first_optimize_index = index
break
# insert sync ops
for index, op in enumerate(list(block.ops)):
# NOTE: pipeline might hang when dynamic_shape is True
if op.type in ['send_v2', 'recv_v2']:
op._set_attr("dynamic_shape", False)
# set send op on comm stream
if op.type == 'send_v2':
# step1: set 'use_calc_stream' False
op._set_attr("use_calc_stream", False)
op_role = op.attr('op_role')
ring_id = op.attr('ring_id')
# step2: insert 'c_sync_calc_stream' op before 'send_v2' op
var_name = op.input_arg_names[0]
var = block.var(var_name)
block._insert_op_without_sync(
index=index + offset,
type="c_sync_calc_stream",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={'op_role': op_role},
)
offset += 1
# step3: insert 'c_sync_comm_stream' op after 'send_v2' op or
# before the first optimize op
if int(op_role) == int(OpRole.Backward):
index = first_optimize_index + offset
new_op_role = OpRole.Optimize
else:
index = index + offset + 1
new_op_role = OpRole.Backward
sync_comm_op = block._insert_op_without_sync(
index=index,
type="c_sync_comm_stream",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={
'op_role': new_op_role,
'ring_id': ring_id,
},
)
# step4: if the 'send_v2' op is in the forward phase, set 'pipeline_flag' to mark
# that this 'c_sync_comm_stream' op was inserted for the pipeline.
if int(op_role) == int(OpRole.Forward):
sync_comm_op._set_attr('pipeline_flag', '')
offset += 1
block._sync_with_cpp()
offset = 0
backward_recv_index = None
for index, op in enumerate(block.ops):
if op.type == "recv_v2" and is_backward_op(op):
backward_recv_index = index
break
if backward_recv_index is None:
continue
# replace 'c_sync_comm_stream' op with 'nop' op
# use nop op for gc
for index, op in enumerate(list(block.ops)):
if index >= backward_recv_index:
break
if op.type == 'c_sync_comm_stream' and op.has_attr('pipeline_flag'):
var_name = op.output_arg_names[0]
var = block.var(var_name)
block._remove_op(index + offset, sync=False)
offset -= 1
block._insert_op_without_sync(
index=backward_recv_index,
type="nop",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={'op_role': OpRole.Backward},
)
block._sync_with_cpp()
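# A minimal illustrative sketch (plain strings, no Paddle APIs; not part of this
# commit) of where the sync ops land around a forward-role 'send_v2' after the
# pass above: 'c_sync_calc_stream' goes right before the send and
# 'c_sync_comm_stream' right after it. Backward-role sends instead get their
# 'c_sync_comm_stream' before the first optimize op.
def _sketch_sync_positions(op_types):
    result = []
    for op in op_types:
        if op == "send_v2":
            result.append("c_sync_calc_stream")
            result.append(op)
            result.append("c_sync_comm_stream")
        else:
            result.append(op)
    return result

assert _sketch_sync_positions(["matmul_v2", "send_v2", "recv_v2"]) == [
    "matmul_v2", "c_sync_calc_stream", "send_v2", "c_sync_comm_stream", "recv_v2"
]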
def _program_for_fthenb_and_1f1b(program):
"""
This implementation splits the program for the FThenB and 1F1B schedules and is called from the 'partial_programs' function.
"""
_insert_sync_for_fthenb_1f1b(program)
lr_prog = Program()
fwd_prog = Program()
bwd_prog = Program()
opt_prog = Program()
for idx, src_block in enumerate(program.blocks):
if idx == 0:
lr_block = lr_prog.block(0)
fwd_block = fwd_prog.block(0)
bwd_block = bwd_prog.block(0)
opt_block = opt_prog.block(0)
else:
lr_block = lr_prog._create_block(parent_idx=src_block.parent_idx)
fwd_block = fwd_prog._create_block(parent_idx=src_block.parent_idx)
bwd_block = bwd_prog._create_block(parent_idx=src_block.parent_idx)
opt_block = opt_prog._create_block(parent_idx=src_block.parent_idx)
lr_block._set_forward_block_idx(src_block.forward_block_idx)
fwd_block._set_forward_block_idx(src_block.forward_block_idx)
bwd_block._set_forward_block_idx(src_block.forward_block_idx)
opt_block._set_forward_block_idx(src_block.forward_block_idx)
# split the program based on the op_role
for op in src_block.ops:
if is_lr_sched_op(op):
_create_program(src_block, lr_block, op)
if is_forward_op(op):
_create_program(src_block, fwd_block, op)
elif is_backward_op(op):
_create_program(src_block, bwd_block, op)
elif is_optimize_op(op):
_create_program(src_block, opt_block, op)
else:
raise ValueError(
"The op role: "
+ str(op.attr('op_role'))
+ " isn't one of LRSched, Forward, Backward or Optimizer."
)
lr_prog._sync_with_cpp()
fwd_prog._sync_with_cpp()
bwd_prog._sync_with_cpp()
opt_prog._sync_with_cpp()
lr_prog._rollback()
fwd_prog._rollback()
bwd_prog._rollback()
opt_prog._rollback()
# It MUST return in this order
return [lr_prog, fwd_prog, bwd_prog, opt_prog]
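# A minimal usage sketch, assuming this helper is importable as
# paddle.distributed.passes.pass_utils (as the relative imports above suggest);
# 'main_program' stands for an already-resharded pipeline program and is
# hypothetical here.
from paddle.distributed.passes.pass_utils import _program_for_fthenb_and_1f1b

lr_prog, fwd_prog, bwd_prog, opt_prog = _program_for_fthenb_and_1f1b(main_program)
for job_type, prog in zip(
    ["lr", "forward", "backward", "optimizer"],
    [lr_prog, fwd_prog, bwd_prog, opt_prog],
):
    # Each sub-program keeps only the ops whose op_role matches its job type.
    print(job_type, [op.type for op in prog.global_block().ops])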
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.fluid import core
from .pass_base import PassBase
from .pass_utils import get_skip_gc_vars
class PipelinePassBase(PassBase):
def __init__(self):
super().__init__()
def _check_self(self):
return True
def _check_conflict(self, other_pass):
return True
def create_job_list(self):
"""
An interface that MUST be implemented by subclasses.
"""
pass
def partial_programs(self, program):
"""
An interface that MUST be implemented by subclasses.
The return value MUST be two lists of equal length: a list of job types (str) and
a list of the corresponding sub-programs.
For example:
return ["lr", "forward", "backward", "optimizer"], [lr_prog, fwd_prog, bwd_prog, opt_prog]
or
return ["forward"], [fwd_prog]
"""
pass
def _apply_single_impl(self, main_program, startup_program, context):
"""
The shared process is implemented in this function; a new subclass only needs
to implement the two interfaces above, 'create_job_list' and 'partial_programs'.
"""
type_list, sub_program_list = self.partial_programs(main_program)
job_list = self.create_job_list()
# The following is the shared skip-gc-vars handling provided by the base class.
gc_vars_list = get_skip_gc_vars(sub_program_list)
type_to_gc_vars = {}
for type, gc_var in zip(type_list, gc_vars_list):
type_to_gc_vars[type] = gc_var
for job in job_list:
job.set_skip_gc_vars(type_to_gc_vars[job.type()])
type_to_program = {}
for type, sub_program in zip(type_list, sub_program_list):
type_to_program[type] = sub_program.desc
plan = core.Plan(job_list, type_to_program)
context.set_attr("plan", plan)
@@ -12,18 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.distributed.auto_parallel.static.utils import (
is_backward_op,
is_forward_op,
is_lr_sched_op,
is_optimize_op,
)
from paddle.distributed.fleet.meta_optimizers.common import OpRole
from paddle.fluid import core
from paddle.fluid.framework import Parameter, Program
from .pass_base import PassBase, PassContext, new_pass, register_pass
from .pass_utils import get_skip_gc_vars
from .pass_base import PassContext, new_pass, register_pass
from .pass_utils import _program_for_fthenb_and_1f1b
from .pipeline_pass_base import PipelinePassBase
__not_shape_var_type__ = [
core.VarDesc.VarType.READER,
@@ -34,316 +27,63 @@ __not_shape_var_type__ = [
]
def _create_param(dst_block, src_var):
copied_kwargs = {}
copied_kwargs['trainable'] = src_var.trainable
copied_kwargs['optimize_attr'] = src_var.optimize_attr
copied_kwargs['regularizer'] = src_var.regularizer
copied_kwargs['do_model_average'] = src_var.do_model_average
copied_kwargs['need_clip'] = src_var.need_clip
Parameter(
block=dst_block,
type=src_var.type,
name=src_var.name,
shape=src_var.shape,
dtype=src_var.dtype,
lod_level=src_var.lod_level,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
**copied_kwargs
)
def _create_inter(dst_block, src_var):
dst_block.create_var(
type=src_var.type,
name=src_var.name,
shape=src_var.shape,
dtype=src_var.dtype,
lod_level=src_var.lod_level,
persistable=src_var.persistable,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
)
def _create_var(src_block, dst_block, src_varname, force_create=False):
if not force_create:
src_var = src_block.var(src_varname)
else:
src_var = src_block._var_recursive(src_varname)
if src_var.type in __not_shape_var_type__:
persist = getattr(src_var, 'persistable', False)
dst_block.create_var(
type=src_var.type,
name=src_var.name,
persistable=persist,
error_clip=src_var.error_clip,
stop_gradient=src_var.stop_gradient,
is_data=src_var.is_data,
belong_to_optimizer=src_var.belong_to_optimizer,
)
else:
if isinstance(src_var, Parameter):
_create_param(dst_block, src_var)
else:
_create_inter(dst_block, src_var)
def _create_program(src_block, dst_block, src_op, force_create=False):
dst_op_desc = dst_block.desc.append_op()
dst_op_desc.copy_from(src_op.desc)
for input_varname in src_op.input_arg_names:
if src_block.has_var(input_varname) or (
force_create and src_block._find_var_recursive(input_varname)
):
_create_var(src_block, dst_block, input_varname, force_create)
for output_varname in src_op.output_arg_names:
if src_block.has_var(output_varname) or (
force_create and src_block._find_var_recursive(output_varname)
):
_create_var(src_block, dst_block, output_varname, force_create)
def _insert_sync_for_fthenb_1f1b(program):
"""
This implementation borrows heavily from Paddle/python/paddle/fluid/optimizer.py.
The difference between this function and 'PipelineOptimizer' is that the
'send_v2' and 'recv_v2' ops have already been inserted into the program by 'reshard'.
"""
for block in program.blocks:
offset = 0
first_optimize_index = None
for index, op in enumerate(list(block.ops)):
if is_optimize_op(op):
first_optimize_index = index
break
# insert sync ops
for index, op in enumerate(list(block.ops)):
# NOTE: pipeline might hang when dynamic_shape is True
if op.type in ['send_v2', 'recv_v2']:
op._set_attr("dynamic_shape", False)
# set send op on comm stream
if op.type == 'send_v2':
# step1: set 'use_calc_stream' False
op._set_attr("use_calc_stream", False)
op_role = op.attr('op_role')
ring_id = op.attr('ring_id')
# step2: insert 'c_sync_calc_stream' op before 'send_v2' op
var_name = op.input_arg_names[0]
var = block.var(var_name)
block._insert_op_without_sync(
index=index + offset,
type="c_sync_calc_stream",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={'op_role': op_role},
)
offset += 1
# step3: insert 'c_sync_comm_stream' op after 'send_v2' op or
# before the first optimize op
if int(op_role) == int(OpRole.Backward):
index = first_optimize_index + offset
new_op_role = OpRole.Optimize
else:
index = index + offset + 1
new_op_role = OpRole.Backward
sync_comm_op = block._insert_op_without_sync(
index=index,
type="c_sync_comm_stream",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={
'op_role': new_op_role,
'ring_id': ring_id,
},
)
# step4: if the 'send_v2' op is in the forward phase, set 'pipeline_flag' to mark
# that this 'c_sync_comm_stream' op was inserted for the pipeline.
if int(op_role) == int(OpRole.Forward):
sync_comm_op._set_attr('pipeline_flag', '')
offset += 1
block._sync_with_cpp()
offset = 0
backward_recv_index = None
for index, op in enumerate(block.ops):
if op.type == "recv_v2" and is_backward_op(op):
backward_recv_index = index
break
if backward_recv_index is None:
continue
# replace 'c_sync_comm_stream' op with 'nop' op
# use nop op for gc
for index, op in enumerate(list(block.ops)):
if index >= backward_recv_index:
break
if op.type == 'c_sync_comm_stream' and op.has_attr('pipeline_flag'):
var_name = op.output_arg_names[0]
var = block.var(var_name)
block._remove_op(index + offset, sync=False)
offset -= 1
block._insert_op_without_sync(
index=backward_recv_index,
type="nop",
inputs={'X': [var]},
outputs={'Out': [var]},
attrs={'op_role': OpRole.Backward},
)
block._sync_with_cpp()
def _program_for_fthenb_and_1f1b(program):
lr_prog = Program()
fwd_prog = Program()
bwd_prog = Program()
opt_prog = Program()
for idx, src_block in enumerate(program.blocks):
if idx == 0:
lr_block = lr_prog.block(0)
fwd_block = fwd_prog.block(0)
bwd_block = bwd_prog.block(0)
opt_block = opt_prog.block(0)
else:
lr_block = lr_prog._create_block(parent_idx=src_block.parent_idx)
fwd_block = fwd_prog._create_block(parent_idx=src_block.parent_idx)
bwd_block = bwd_prog._create_block(parent_idx=src_block.parent_idx)
opt_block = opt_prog._create_block(parent_idx=src_block.parent_idx)
lr_block._set_forward_block_idx(src_block.forward_block_idx)
fwd_block._set_forward_block_idx(src_block.forward_block_idx)
bwd_block._set_forward_block_idx(src_block.forward_block_idx)
opt_block._set_forward_block_idx(src_block.forward_block_idx)
# split the program based on the op_role
for op in src_block.ops:
if is_lr_sched_op(op):
_create_program(src_block, lr_block, op)
if is_forward_op(op):
_create_program(src_block, fwd_block, op)
elif is_backward_op(op):
_create_program(src_block, bwd_block, op)
elif is_optimize_op(op):
_create_program(src_block, opt_block, op)
else:
raise ValueError(
"The op role: "
+ str(op.attr('op_role'))
+ " isn't one of LRSched, Forward, Backward or Optimizer."
)
lr_prog._sync_with_cpp()
fwd_prog._sync_with_cpp()
bwd_prog._sync_with_cpp()
opt_prog._sync_with_cpp()
lr_prog._rollback()
fwd_prog._rollback()
bwd_prog._rollback()
opt_prog._rollback()
lr_vars, fwd_vars, bwd_vars, opt_vars = get_skip_gc_vars(
[lr_prog, fwd_prog, bwd_prog, opt_prog]
)
return {
"lr": lr_prog.desc,
"forward": fwd_prog.desc,
"backward": bwd_prog.desc,
"optimizer": opt_prog.desc,
}, {
"lr": lr_vars,
"forward": fwd_vars,
"backward": bwd_vars,
"optimizer": opt_vars,
}
@register_pass("pipeline_scheduler_FThenB")
class PipelineFThenBPass(PassBase):
class PipelineFThenBPass(PipelinePassBase):
def __init__(self):
super().__init__()
def _check_self(self):
return True
def create_job_list(self):
num_micro_batches = self.get_attr("num_micro_batches")
def _check_conflict(self, other_pass):
return True
def _create_job_list(self, type_to_skip_vars):
job_list = []
lr_job = core.Job("lr")
lr_job.set_skip_gc_vars(type_to_skip_vars["lr"])
job_list.append(lr_job)
for i in range(self._num_micro_batches):
for i in range(num_micro_batches):
forward_job = core.Job("forward")
forward_job.set_micro_batch_id(i)
forward_job.set_skip_gc_vars(type_to_skip_vars["forward"])
job_list.append(forward_job)
for i in range(self._num_micro_batches):
for i in range(num_micro_batches):
backward_job = core.Job("backward")
backward_job.set_micro_batch_id(i)
backward_job.set_skip_gc_vars(type_to_skip_vars["backward"])
job_list.append(backward_job)
opt_job = core.Job("optimizer")
opt_job.set_skip_gc_vars(type_to_skip_vars["optimizer"])
job_list.append(opt_job)
return job_list
def _apply_single_impl(self, main_program, startup_program, context):
self._num_micro_batches = self.get_attr("num_micro_batches")
self._program = main_program
_insert_sync_for_fthenb_1f1b(self._program)
type_to_program, type_to_skip_vars = _program_for_fthenb_and_1f1b(
self._program
)
job_list = self._create_job_list(type_to_skip_vars)
plan = core.Plan(job_list, type_to_program)
context.set_attr("plan", plan)
def partial_programs(self, program):
types = ["lr", "forward", "backward", "optimizer"]
sub_program_list = _program_for_fthenb_and_1f1b(program)
return types, sub_program_list
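# The resulting FThenB job order, for a hypothetical num_micro_batches = 3:
# every forward micro batch runs before any backward micro batch, framed by
# one "lr" job and one "optimizer" job.
expected_job_types = ["lr"] + ["forward"] * 3 + ["backward"] * 3 + ["optimizer"]
assert len(expected_job_types) == 8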
@register_pass("pipeline_scheduler_1F1B")
class Pipeline1F1BPass(PassBase):
class Pipeline1F1BPass(PipelinePassBase):
def __init__(self):
super().__init__()
def _check_self(self):
return True
def _check_conflict(self, other_pass):
return True
def create_job_list(self):
num_micro_batches = self.get_attr("num_micro_batches")
pp_stage = self.get_attr("pp_stage")
pp_degree = self.get_attr("pp_degree")
def _create_job_list(self, type_to_skip_vars):
job_list = []
lr_job = core.Job("lr")
lr_job.set_skip_gc_vars(type_to_skip_vars["lr"])
job_list.append(lr_job)
assert (
self._pp_degree <= self._num_micro_batches
pp_degree <= num_micro_batches
), "Num of micro batches should larger than pp degree."
micro_batch_in_warmup = self._pp_degree - self._pp_stage
micro_batch_in_1f1b = self._num_micro_batches - micro_batch_in_warmup
micro_batch_in_warmup = pp_degree - pp_stage
micro_batch_in_1f1b = num_micro_batches - micro_batch_in_warmup
forward_micro_batch_id = 0
for i in range(micro_batch_in_warmup):
forward_job = core.Job("forward")
forward_job.set_micro_batch_id(forward_micro_batch_id)
forward_job.set_skip_gc_vars(type_to_skip_vars["forward"])
job_list.append(forward_job)
forward_micro_batch_id += 1
@@ -351,41 +91,27 @@ class Pipeline1F1BPass(PassBase):
for i in range(micro_batch_in_1f1b):
backward_job = core.Job("backward")
backward_job.set_micro_batch_id(backward_micro_batch_id)
backward_job.set_skip_gc_vars(type_to_skip_vars["backward"])
job_list.append(backward_job)
backward_micro_batch_id += 1
forward_job = core.Job("forward")
forward_job.set_micro_batch_id(forward_micro_batch_id)
forward_job.set_skip_gc_vars(type_to_skip_vars["forward"])
job_list.append(forward_job)
forward_micro_batch_id += 1
for i in range(micro_batch_in_warmup):
backward_job = core.Job("backward")
backward_job.set_micro_batch_id(backward_micro_batch_id)
backward_job.set_skip_gc_vars(type_to_skip_vars["backward"])
job_list.append(backward_job)
backward_micro_batch_id += 1
opt_job = core.Job("optimizer")
opt_job.set_skip_gc_vars(type_to_skip_vars["optimizer"])
job_list.append(opt_job)
return job_list
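# A worked example of the warmup/steady/cooldown split above, with the
# illustrative values pp_degree = 4, pp_stage = 1, num_micro_batches = 8.
pp_degree, pp_stage, num_micro_batches = 4, 1, 8
micro_batch_in_warmup = pp_degree - pp_stage                     # 3 warmup forwards
micro_batch_in_1f1b = num_micro_batches - micro_batch_in_warmup  # 5 steady-state pairs
# Job order: "lr", 3 forwards, 5 alternating (backward, forward) pairs,
# 3 draining backwards, then "optimizer".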
def _apply_single_impl(self, main_program, startup_program, context):
self._num_micro_batches = self.get_attr("num_micro_batches")
self._pp_stage = self.get_attr("pp_stage")
self._pp_degree = self.get_attr("pp_degree")
self._program = main_program
_insert_sync_for_fthenb_1f1b(self._program)
type_to_program, type_to_skip_vars = _program_for_fthenb_and_1f1b(
self._program
)
job_list = self._create_job_list(type_to_skip_vars)
plan = core.Plan(job_list, type_to_program)
context.set_attr("plan", plan)
def partial_programs(self, program):
types = ["lr", "forward", "backward", "optimizer"]
sub_program_list = _program_for_fthenb_and_1f1b(program)
return types, sub_program_list
def apply_pass(main_program, startup_program, pass_name, pass_attr={}):
......