diff --git a/paddle/fluid/framework/new_executor/interpreter/plan.cc b/paddle/fluid/framework/new_executor/interpreter/plan.cc
index a996c13716ae1a3370185fcd00342517577053b8..0a4e465bd3523cb6d37d52eff5c201b53e7edc36 100644
--- a/paddle/fluid/framework/new_executor/interpreter/plan.cc
+++ b/paddle/fluid/framework/new_executor/interpreter/plan.cc
@@ -19,7 +19,9 @@
 namespace paddle {
 namespace framework {
 
-const std::vector<Job *> &Plan::GetJobList() const { return job_list_; }
+const std::vector<std::shared_ptr<Job>> &Plan::GetJobList() const {
+  return job_list_;
+}
 
 const std::unordered_map<std::string, ProgramDesc *> &Plan::GetTypeToProgram() const {
diff --git a/paddle/fluid/framework/new_executor/interpreter/plan.h b/paddle/fluid/framework/new_executor/interpreter/plan.h
index eeb0f9417d86647ee980c26f4a298717ade39d88..6c50b46f6856f1b81567b4c19621a77cace0ef51 100644
--- a/paddle/fluid/framework/new_executor/interpreter/plan.h
+++ b/paddle/fluid/framework/new_executor/interpreter/plan.h
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #pragma once
+#include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -26,18 +27,18 @@ class Job;
 
 class Plan final {
  public:
-  Plan(const std::vector<Job *> &job_list,
+  Plan(const std::vector<std::shared_ptr<Job>> &job_list,
        const std::unordered_map<std::string, ProgramDesc *> &type_to_program)
       : job_list_(job_list), type_to_program_(type_to_program) {}
   ~Plan() = default;
 
-  const std::vector<Job *> &GetJobList() const;
+  const std::vector<std::shared_ptr<Job>> &GetJobList() const;
   const std::unordered_map<std::string, ProgramDesc *> &GetTypeToProgram() const;
 
  private:
  DISABLE_COPY_AND_ASSIGN(Plan);
-  std::vector<Job *> job_list_;
+  std::vector<std::shared_ptr<Job>> job_list_;
   std::unordered_map<std::string, ProgramDesc *> type_to_program_;
 };
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 0f1d9fc6782a45abfac200511dd34687f0c54ea0..4e8026fd92d03ddf907c6041aabb0ff505a27faa 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1857,14 +1857,14 @@ All parameter, weight, gradient are variables in Paddle.
         return py::cast(std::move(ret));
       });
 
-  py::class_<framework::Job>(m, "job")
+  py::class_<framework::Job, std::shared_ptr<framework::Job>>(m, "job")
       .def(py::init<const std::string &>(), py::arg("type"))
       .def("type", &framework::Job::GetJobType)
       .def("micro_batch_id", &framework::Job::GetMicroBatchId)
       .def("set_micro_batch_id", &framework::Job::SetMicroBatchId);
 
   py::class_<framework::Plan>(m, "plan")
-      .def(py::init<const std::vector<framework::Job *> &,
+      .def(py::init<const std::vector<std::shared_ptr<framework::Job>> &,
                     const std::unordered_map<std::string, framework::ProgramDesc *> &>(),
           py::arg("job_list"),
diff --git a/python/paddle/distributed/passes/__init__.py b/python/paddle/distributed/passes/__init__.py
index 8ab110e60c3b97712f3c46b1dd159e97df7b9e97..206c158769b2c9c2dcd9e319f5b375367bd8ca88 100644
--- a/python/paddle/distributed/passes/__init__.py
+++ b/python/paddle/distributed/passes/__init__.py
@@ -24,6 +24,7 @@ from .auto_parallel_data_parallel_optimization import *  # noqa: F403
 from .auto_parallel_grad_clip import *  # noqa: F403
 from .auto_parallel_supplement_explicit_dependencies import *  # noqa: F403
 from .auto_parallel_pipeline import *  # noqa: F403
+from .pipeline_scheduler_pass import *  # noqa: F403
 from .cpp_pass import *  # noqa: F403
 from .ps_trainer_pass import *  # noqa: F403
 from .ps_server_pass import *  # noqa: F403
diff --git a/python/paddle/distributed/passes/auto_parallel_pipeline.py b/python/paddle/distributed/passes/auto_parallel_pipeline.py
index b11a4c78d7ac77415c57a86f4091a3febc7961a6..a8bb35f210ad921918f5ed640c639fc6eefe99de 100644
--- a/python/paddle/distributed/passes/auto_parallel_pipeline.py
+++ b/python/paddle/distributed/passes/auto_parallel_pipeline.py
@@ -24,11 +24,14 @@ from paddle.distributed.auto_parallel.static.utils import (
     is_optimize_op,
 )
 from paddle.distributed.fleet.fleet_executor_utils import TaskNode
-from paddle.distributed.fleet.meta_optimizers.common import OpRole
 from paddle.fluid import core
-from paddle.fluid.framework import Parameter, Program
+from paddle.fluid.framework import Program
 
 from .pass_base import PassBase, register_pass
+from .pipeline_scheduler_pass import (
+    _create_program,
+    _insert_sync_for_fthenb_1f1b,
+)
__not_shape_var_type__ = [ core.VarDesc.VarType.READER, @@ -75,7 +78,7 @@ class PipelinePass(PassBase): self._cur_pp_stage = self._get_pp_stage(self._cur_rank) if self._mode == "1F1B": - self._insert_sync_ops_for_1f1b() + _insert_sync_for_fthenb_1f1b(self._program) self._task_1f1b() elif self._mode == "F-Then-B": raise NotImplementedError("F-Then-B has not been implemented") @@ -123,177 +126,6 @@ class PipelinePass(PassBase): block._sync_with_cpp() - def _insert_sync_ops_for_1f1b(self): - """ - This implementation refers to lots of Paddle/python/paddle/fluid/optimizer.py. - The difference between this function with 'PipelineOptimizer' is that - 'send_v2' op and 'recv_v2' op have been inserted in program by 'reshard'. - """ - - for block in self._program.blocks: - offset = 0 - first_optimize_index = None - for index, op in enumerate(list(block.ops)): - if is_optimize_op(op): - first_optimize_index = index - break - - # insert sync ops - for index, op in enumerate(list(block.ops)): - # NOTE: pipeline might hang when dynamic_shape is True - if op.type in ['send_v2', 'recv_v2']: - op._set_attr("dynamic_shape", False) - # set send op on comm stream - if op.type == 'send_v2': - # step1: set 'use_calc_stream' False - op._set_attr("use_calc_stream", False) - op_role = op.attr('op_role') - ring_id = op.attr('ring_id') - # step2: insert 'c_sync_calc_stream' op before 'send_v2' op - var_name = op.input_arg_names[0] - var = block.var(var_name) - block._insert_op_without_sync( - index=index + offset, - type="c_sync_calc_stream", - inputs={'X': [var]}, - outputs={'Out': [var]}, - attrs={'op_role': op_role}, - ) - offset += 1 - # step3: insert 'c_sync_comm_stream' op after 'send_v2' op or - # before the first optimize op - if int(op_role) == int(OpRole.Backward): - index = first_optimize_index + offset - new_op_role = OpRole.Optimize - else: - index = index + offset + 1 - new_op_role = OpRole.Backward - sync_comm_op = block._insert_op_without_sync( - index=index, - 
type="c_sync_comm_stream", - inputs={'X': [var]}, - outputs={'Out': [var]}, - attrs={ - 'op_role': new_op_role, - 'ring_id': ring_id, - }, - ) - # step4: If 'send_v2' op in forward parse, set 'pipeline_flag' to distinguish - # whether the 'c_sync_comm_stream' op is inserted for pipeline. - if int(op_role) == int(OpRole.Forward): - sync_comm_op._set_attr('pipeline_flag', '') - offset += 1 - block._sync_with_cpp() - - offset = 0 - backward_recv_index = None - for index, op in enumerate(block.ops): - if op.type == "recv_v2" and is_backward_op(op): - backward_recv_index = index - break - if backward_recv_index is None: - continue - - # replace 'c_sync_comm_stream' op with 'nop' op - # use nop op for gc - for index, op in enumerate(list(block.ops)): - if index >= backward_recv_index: - break - if op.type == 'c_sync_comm_stream' and op.has_attr( - 'pipeline_flag' - ): - var_name = op.output_arg_names[0] - var = block.var(var_name) - block._remove_op(index + offset, sync=False) - offset -= 1 - block._insert_op_without_sync( - index=backward_recv_index, - type="nop", - inputs={'X': [var]}, - outputs={'Out': [var]}, - attrs={'op_role': OpRole.Backward}, - ) - block._sync_with_cpp() - - def _create_param(self, dst_block, src_var): - copied_kwargs = {} - copied_kwargs['trainable'] = src_var.trainable - copied_kwargs['optimize_attr'] = src_var.optimize_attr - copied_kwargs['regularizer'] = src_var.regularizer - copied_kwargs['do_model_average'] = src_var.do_model_average - copied_kwargs['need_clip'] = src_var.need_clip - - Parameter( - block=dst_block, - type=src_var.type, - name=src_var.name, - shape=src_var.shape, - dtype=src_var.dtype, - lod_level=src_var.lod_level, - error_clip=src_var.error_clip, - stop_gradient=src_var.stop_gradient, - is_data=src_var.is_data, - belong_to_optimizer=src_var.belong_to_optimizer, - **copied_kwargs - ) - - def _create_inter(self, dst_block, src_var): - dst_block.create_var( - type=src_var.type, - name=src_var.name, - shape=src_var.shape, - 
dtype=src_var.dtype, - lod_level=src_var.lod_level, - persistable=src_var.persistable, - error_clip=src_var.error_clip, - stop_gradient=src_var.stop_gradient, - is_data=src_var.is_data, - belong_to_optimizer=src_var.belong_to_optimizer, - ) - - def _create_var( - self, src_block, dst_block, src_varname, force_create=False - ): - - if not force_create: - src_var = src_block.var(src_varname) - else: - src_var = src_block._var_recursive(src_varname) - if src_var.type in __not_shape_var_type__: - persist = getattr(src_var, 'persistable', False) - dst_block.create_var( - type=src_var.type, - name=src_var.name, - persistable=persist, - error_clip=src_var.error_clip, - stop_gradient=src_var.stop_gradient, - is_data=src_var.is_data, - belong_to_optimizer=src_var.belong_to_optimizer, - ) - else: - if isinstance(src_var, Parameter): - self._create_param(dst_block, src_var) - else: - self._create_inter(dst_block, src_var) - - def _create_program(self, src_block, dst_block, src_op, force_create=False): - dst_op_desc = dst_block.desc.append_op() - dst_op_desc.copy_from(src_op.desc) - for input_varname in src_op.input_arg_names: - if src_block.has_var(input_varname) or ( - force_create and src_block._find_var_recursive(input_varname) - ): - self._create_var( - src_block, dst_block, input_varname, force_create - ) - for output_varname in src_op.output_arg_names: - if src_block.has_var(output_varname) or ( - force_create and src_block._find_var_recursive(output_varname) - ): - self._create_var( - src_block, dst_block, output_varname, force_create - ) - def _get_pp_stage(self, rank): pp_idx = None for idx, process_mesh in enumerate(self._dist_context.process_meshes): @@ -337,13 +169,13 @@ class PipelinePass(PassBase): # split the program based on the op_role for op in src_block.ops: if is_lr_sched_op(op): - self._create_program(src_block, lr_block, op) + _create_program(src_block, lr_block, op) if is_forward_op(op): - self._create_program(src_block, fwd_block, op) + 
_create_program(src_block, fwd_block, op) elif is_backward_op(op): - self._create_program(src_block, bwd_block, op) + _create_program(src_block, bwd_block, op) elif is_optimize_op(op): - self._create_program(src_block, opt_block, op) + _create_program(src_block, opt_block, op) else: raise ValueError( "The op role: " @@ -505,11 +337,11 @@ class PipelinePass(PassBase): continue if not is_after_while_op: - self._create_program( + _create_program( src_block, strat_block, op, force_create=True ) else: - self._create_program( + _create_program( src_block, end_block, op, force_create=True ) elif ib == 1: @@ -572,7 +404,7 @@ class PipelinePass(PassBase): if op.type == "send_v2": remove_process_group(op.attr("ring_id")) continue - self._create_program( + _create_program( src_block, send_block, op, force_create=True ) continue @@ -615,7 +447,7 @@ class PipelinePass(PassBase): op.desc._rename_input( in_name, recv_vars_name[in_name] ) - self._create_program( + _create_program( src_block, recv_block, op, force_create=True ) continue diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py new file mode 100644 index 0000000000000000000000000000000000000000..6b6a7f2dab5a4bb927d68dcf5ae8c54220c676a3 --- /dev/null +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py @@ -0,0 +1,312 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from paddle.distributed.auto_parallel.static.utils import ( + is_backward_op, + is_forward_op, + is_lr_sched_op, + is_optimize_op, +) +from paddle.distributed.fleet.meta_optimizers.common import OpRole +from paddle.fluid import core +from paddle.fluid.framework import Parameter, Program + +from .pass_base import PassBase, register_pass + +__not_shape_var_type__ = [ + core.VarDesc.VarType.READER, + core.VarDesc.VarType.STEP_SCOPES, + core.VarDesc.VarType.LOD_TENSOR_ARRAY, + core.VarDesc.VarType.FEED_MINIBATCH, + core.VarDesc.VarType.FETCH_LIST, +] + + +def _create_param(dst_block, src_var): + copied_kwargs = {} + copied_kwargs['trainable'] = src_var.trainable + copied_kwargs['optimize_attr'] = src_var.optimize_attr + copied_kwargs['regularizer'] = src_var.regularizer + copied_kwargs['do_model_average'] = src_var.do_model_average + copied_kwargs['need_clip'] = src_var.need_clip + + Parameter( + block=dst_block, + type=src_var.type, + name=src_var.name, + shape=src_var.shape, + dtype=src_var.dtype, + lod_level=src_var.lod_level, + error_clip=src_var.error_clip, + stop_gradient=src_var.stop_gradient, + is_data=src_var.is_data, + belong_to_optimizer=src_var.belong_to_optimizer, + **copied_kwargs + ) + + +def _create_inter(dst_block, src_var): + dst_block.create_var( + type=src_var.type, + name=src_var.name, + shape=src_var.shape, + dtype=src_var.dtype, + lod_level=src_var.lod_level, + persistable=src_var.persistable, + error_clip=src_var.error_clip, + stop_gradient=src_var.stop_gradient, + is_data=src_var.is_data, + belong_to_optimizer=src_var.belong_to_optimizer, + ) + + +def _create_var(src_block, dst_block, src_varname, force_create=False): + if not force_create: + src_var = src_block.var(src_varname) + else: + src_var = src_block._var_recursive(src_varname) + if src_var.type in __not_shape_var_type__: + persist = getattr(src_var, 'persistable', False) + dst_block.create_var( + type=src_var.type, + name=src_var.name, + persistable=persist, + 
error_clip=src_var.error_clip, + stop_gradient=src_var.stop_gradient, + is_data=src_var.is_data, + belong_to_optimizer=src_var.belong_to_optimizer, + ) + else: + if isinstance(src_var, Parameter): + _create_param(dst_block, src_var) + else: + _create_inter(dst_block, src_var) + + +def _create_program(src_block, dst_block, src_op, force_create=False): + dst_op_desc = dst_block.desc.append_op() + dst_op_desc.copy_from(src_op.desc) + for input_varname in src_op.input_arg_names: + if src_block.has_var(input_varname) or ( + force_create and src_block._find_var_recursive(input_varname) + ): + _create_var(src_block, dst_block, input_varname, force_create) + for output_varname in src_op.output_arg_names: + if src_block.has_var(output_varname) or ( + force_create and src_block._find_var_recursive(output_varname) + ): + _create_var(src_block, dst_block, output_varname, force_create) + + +def _insert_sync_for_fthenb_1f1b(program): + """ + This implementation refers to lots of Paddle/python/paddle/fluid/optimizer.py. + The difference between this function with 'PipelineOptimizer' is that + 'send_v2' op and 'recv_v2' op have been inserted in program by 'reshard'. 
+ """ + + for block in program.blocks: + offset = 0 + first_optimize_index = None + for index, op in enumerate(list(block.ops)): + if is_optimize_op(op): + first_optimize_index = index + break + + # insert sync ops + for index, op in enumerate(list(block.ops)): + # NOTE: pipeline might hang when dynamic_shape is True + if op.type in ['send_v2', 'recv_v2']: + op._set_attr("dynamic_shape", False) + # set send op on comm stream + if op.type == 'send_v2': + # step1: set 'use_calc_stream' False + op._set_attr("use_calc_stream", False) + op_role = op.attr('op_role') + ring_id = op.attr('ring_id') + # step2: insert 'c_sync_calc_stream' op before 'send_v2' op + var_name = op.input_arg_names[0] + var = block.var(var_name) + block._insert_op_without_sync( + index=index + offset, + type="c_sync_calc_stream", + inputs={'X': [var]}, + outputs={'Out': [var]}, + attrs={'op_role': op_role}, + ) + offset += 1 + # step3: insert 'c_sync_comm_stream' op after 'send_v2' op or + # before the first optimize op + if int(op_role) == int(OpRole.Backward): + index = first_optimize_index + offset + new_op_role = OpRole.Optimize + else: + index = index + offset + 1 + new_op_role = OpRole.Backward + sync_comm_op = block._insert_op_without_sync( + index=index, + type="c_sync_comm_stream", + inputs={'X': [var]}, + outputs={'Out': [var]}, + attrs={ + 'op_role': new_op_role, + 'ring_id': ring_id, + }, + ) + # step4: If 'send_v2' op in forward parse, set 'pipeline_flag' to distinguish + # whether the 'c_sync_comm_stream' op is inserted for pipeline. 
+ if int(op_role) == int(OpRole.Forward): + sync_comm_op._set_attr('pipeline_flag', '') + offset += 1 + block._sync_with_cpp() + + offset = 0 + backward_recv_index = None + for index, op in enumerate(block.ops): + if op.type == "recv_v2" and is_backward_op(op): + backward_recv_index = index + break + if backward_recv_index is None: + continue + + # replace 'c_sync_comm_stream' op with 'nop' op + # use nop op for gc + for index, op in enumerate(list(block.ops)): + if index >= backward_recv_index: + break + if op.type == 'c_sync_comm_stream' and op.has_attr('pipeline_flag'): + var_name = op.output_arg_names[0] + var = block.var(var_name) + block._remove_op(index + offset, sync=False) + offset -= 1 + block._insert_op_without_sync( + index=backward_recv_index, + type="nop", + inputs={'X': [var]}, + outputs={'Out': [var]}, + attrs={'op_role': OpRole.Backward}, + ) + block._sync_with_cpp() + + +def _program_for_fthenb_and_1f1b(program): + lr_prog = Program() + fwd_prog = Program() + bwd_prog = Program() + opt_prog = Program() + + for idx, src_block in enumerate(program.blocks): + if idx == 0: + lr_block = lr_prog.block(0) + fwd_block = fwd_prog.block(0) + bwd_block = bwd_prog.block(0) + opt_block = opt_prog.block(0) + else: + lr_block = lr_prog._create_block(parent_idx=src_block.parent_idx) + fwd_block = fwd_prog._create_block(parent_idx=src_block.parent_idx) + bwd_block = bwd_prog._create_block(parent_idx=src_block.parent_idx) + opt_block = opt_prog._create_block(parent_idx=src_block.parent_idx) + lr_block._set_forward_block_idx(src_block.forward_block_idx) + fwd_block._set_forward_block_idx(src_block.forward_block_idx) + bwd_block._set_forward_block_idx(src_block.forward_block_idx) + opt_block._set_forward_block_idx(src_block.forward_block_idx) + + # split the program based on the op_role + for op in src_block.ops: + if is_lr_sched_op(op): + _create_program(src_block, lr_block, op) + if is_forward_op(op): + _create_program(src_block, fwd_block, op) + elif 
is_backward_op(op):
+                _create_program(src_block, bwd_block, op)
+            elif is_optimize_op(op):
+                _create_program(src_block, opt_block, op)
+            else:
+                raise ValueError(
+                    "The op role: "
+                    + str(op.attr('op_role'))
+                    + " isn't one of LRSched, Forward, Backward or Optimizer."
+                )
+
+    lr_prog._sync_with_cpp()
+    fwd_prog._sync_with_cpp()
+    bwd_prog._sync_with_cpp()
+    opt_prog._sync_with_cpp()
+
+    lr_prog._rollback()
+    fwd_prog._rollback()
+    bwd_prog._rollback()
+    opt_prog._rollback()
+
+    return {
+        "lr": lr_prog.desc,
+        "forward": fwd_prog.desc,
+        "backward": bwd_prog.desc,
+        "optimizer": opt_prog.desc,
+    }
+
+
+@register_pass("pipeline_fthenb_scheduler")
+class PipelineFThenBPass(PassBase):
+    def __init__(self):
+        super().__init__()
+
+    def _check_self(self):
+        return True
+
+    def _check_conflict(self, other_pass):
+        return True
+
+    def _create_job_list(self):
+        job_list = []
+        lr_job = core.job("lr")
+        job_list.append(lr_job)
+        for i in range(self._micro_batch_size):
+            forward_job = core.job("forward")
+            forward_job.set_micro_batch_id(i)
+            job_list.append(forward_job)
+
+        for i in range(self._micro_batch_size):
+            backward_job = core.job("backward")
+            backward_job.set_micro_batch_id(i)
+            job_list.append(backward_job)
+
+        opt_job = core.job("optimizer")
+        job_list.append(opt_job)
+        return job_list
+
+    def _apply_single_impl(self, main_program, startup_program, context):
+        self._micro_batch_size = self.get_attr("micro_batch_size")
+        self._program = main_program
+
+        _insert_sync_for_fthenb_1f1b(self._program)
+        type_to_program = _program_for_fthenb_and_1f1b(self._program)
+        job_list = self._create_job_list()
+
+        plan = core.plan(job_list, type_to_program)
+        context.set_attr("plan", plan)
diff --git a/test/standalone_executor/test_standalone_executor_fthenb_plan.py b/test/standalone_executor/test_standalone_executor_fthenb_plan.py
new file mode 100644
index 0000000000000000000000000000000000000000..6912dc0e609f8e84b3b462a0ef5966a287be9104
--- /dev/null
+++ b/test/standalone_executor/test_standalone_executor_fthenb_plan.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import unittest + +from paddle import static +from paddle.distributed.passes import PassContext, new_pass + + +class TestStandaloneExecutorFThenBPlan(unittest.TestCase): + def test_standalone_executor_fthenb_plan(self): + config = {} + config["micro_batch_size"] = 4 + pass_context = PassContext() + + startup_program = static.Program() + main_program = static.Program() + + pipeline_fthenb_pass = new_pass("pipeline_fthenb_scheduler", config) + pipeline_fthenb_pass.apply( + [main_program], [startup_program], pass_context + ) + plan = pass_context.get_attr("plan") + job_type_list = [] + for job in plan.job_list(): + job_type_list.append(job.type()) + expect_job_type_list = [ + "lr", + "forward", + "forward", + "forward", + "forward", + "backward", + "backward", + "backward", + "backward", + "optimizer", + ] + self.assertEqual(job_type_list, expect_job_type_list) + + +if __name__ == '__main__': + unittest.main()