From 9b5f614062645be6c839687dfea26ba3702ba196 Mon Sep 17 00:00:00 2001 From: lzydev Date: Tue, 22 Aug 2023 15:03:48 +0800 Subject: [PATCH] Optimize the memory in the case of using `pipeline` strategy and new executor (#56397) * optimize the memory * fix bug in static_build.cc * fix bug when using logging * change the static_build * fix bug in windows * fix code accordding to review --- .../new_executor/interpreter/static_build.cc | 13 +++++++++++++ .../new_executor/program_interpreter.cc | 15 +++++++++++++++ .../new_executor/program_interpreter.h | 2 ++ .../new_executor/standalone_executor.cc | 17 +++++++++++++++++ .../passes/auto_parallel_gradient_merge.py | 19 ++++++++++--------- .../paddle/distributed/passes/pass_utils.py | 4 +++- .../distributed/passes/pipeline_pass_base.py | 10 ++++++++++ .../passes/pipeline_scheduler_pass.py | 2 ++ 8 files changed, 72 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpreter/static_build.cc b/paddle/fluid/framework/new_executor/interpreter/static_build.cc index 10d75f1be6f..10f244b535c 100644 --- a/paddle/fluid/framework/new_executor/interpreter/static_build.cc +++ b/paddle/fluid/framework/new_executor/interpreter/static_build.cc @@ -38,6 +38,7 @@ std::set OpsCanSkipedFakeAllocInStaticBuild = { "create_py_reader", "depend", "fetch_v2", + "send_v2", "nop"}; std::set StaticBuildBlackList = { @@ -47,6 +48,14 @@ std::set StaticBuildBlackList = { "run_program" /*: to handle scope output*/, "sparse_sparse_coo_tensor" /*: to handle sparse output*/}; +// TODO(lizhiyu): This operator list is only for pipeline strategy temporarily. 
+std::set SkipCheckForPipelineTempList = {"c_broadcast", + "c_allreduce_sum", + "c_allgather", + "layer_norm", + "recv_v2", + "reshape2_grad", + "c_identity"}; namespace paddle { namespace framework { namespace interpreter { @@ -63,6 +72,10 @@ bool BlockCanBeStaticBuilt(const framework::BlockDesc& block) { std::set> invalid_ops; for (auto& op : block.AllOps()) { auto op_type = op->Type(); + if (SkipCheckForPipelineTempList.find(op_type) != + SkipCheckForPipelineTempList.end()) { + continue; + } const framework::OpInfo& info = OpInfoMap::Instance().Get(op_type); auto op_base = info.Creator()(op_type, op->Inputs(), op->Outputs(), op->GetAttrMap()); diff --git a/paddle/fluid/framework/new_executor/program_interpreter.cc b/paddle/fluid/framework/new_executor/program_interpreter.cc index e288804e09a..c31e0355f8d 100644 --- a/paddle/fluid/framework/new_executor/program_interpreter.cc +++ b/paddle/fluid/framework/new_executor/program_interpreter.cc @@ -30,6 +30,9 @@ #endif #include "paddle/fluid/platform/cuda_graph_with_memory_pool.h" #include "paddle/phi/backends/device_manager.h" +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/platform/device/gpu/nccl_helper.h" +#endif namespace paddle { namespace framework { @@ -1187,6 +1190,18 @@ void ProgramInterpreter::RecordStreamForGC(const Instruction& instr) { gpuStream_t stream = reinterpret_cast(instr.DeviceContext()).stream(); +// TODO(lizhiyu): Only analyse the 'send_v2' for GPT pp strategy right now. +// To support all the operators for communicating in the future. 
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + auto operator_base_ptr = instr.OpBase(); + if ((operator_base_ptr->Type() == "send_v2") && + (operator_base_ptr->Attr("use_calc_stream") == false)) { + stream = platform::NCCLCommContext::Instance() + .Get(operator_base_ptr->Attr("ring_id"), + instr.DeviceContext().GetPlace()) + ->stream(); + } +#endif auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) { auto allocation = tensor.Holder(); if (allocation == nullptr) { diff --git a/paddle/fluid/framework/new_executor/program_interpreter.h b/paddle/fluid/framework/new_executor/program_interpreter.h index b258106cba9..29ec71059c7 100644 --- a/paddle/fluid/framework/new_executor/program_interpreter.h +++ b/paddle/fluid/framework/new_executor/program_interpreter.h @@ -81,6 +81,8 @@ class ProgramInterpreter : public InterpreterBaseImpl { hookfuncs_ = hookfuncs; } + bool IsStaticBuild() const { return static_build_; } + private: // build graph void Convert(std::vector* op_func_nodes); diff --git a/paddle/fluid/framework/new_executor/standalone_executor.cc b/paddle/fluid/framework/new_executor/standalone_executor.cc index 0e1d2de6bed..d0b93b83c6f 100644 --- a/paddle/fluid/framework/new_executor/standalone_executor.cc +++ b/paddle/fluid/framework/new_executor/standalone_executor.cc @@ -15,6 +15,7 @@ #include "paddle/fluid/framework/new_executor/feed_fetch_utils.h" #include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h" +#include "paddle/fluid/framework/new_executor/program_interpreter.h" #include "paddle/fluid/platform/profiler/event_tracing.h" #include "paddle/fluid/ir/transforms/pd_op_to_kernel_pass.h" @@ -103,6 +104,22 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place, micro_batch_scopes_[micro_batch_id], execution_config)); interpretercores_.back()->SetCopyProgram(program); + // NOTE(lizhiyu): Now we only check backward subprogram. 
After the static + // build strategy is complete, we should + // check all the programs in the PP strategy. + if (job_type == "backward" && jobs.size() > 1) { + PADDLE_ENFORCE_EQ(static_cast( + interpretercores_.back()->Impl()) + ->IsStaticBuild(), + true, + phi::errors::InvalidArgument( + "When using pipeline strategy in auto " + "prarallelism with new executor, " + "the backward subprogram must be builded in real " + "static build mode, but it can not " + "be staticly builded in this case. You can " + "enable 'GLOG_v=1' to obtain log information.")); + } } } } diff --git a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py index 21816c34ee4..68bda168469 100644 --- a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py +++ b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py @@ -38,15 +38,13 @@ def _remove_and_get_optimizer_op(main_program, dist_context): # 2 mv optimizer op from global program to tmp block # 3 del the op from dist_context main_block = main_program.global_block() - temp_block = main_program._create_block() + optimize_ops_block = paddle.static.Program().global_block() removed_op_idx = [] - optimize_ops_desc = [] for idx, op in enumerate(main_block.ops): if is_optimize_op(op): # append optimizer op to tmp block - new_op_desc = temp_block.desc.append_op() + new_op_desc = optimize_ops_block.desc.append_op() new_op_desc.copy_from(op.desc) - optimize_ops_desc.append(new_op_desc) removed_op_idx.append(idx) # del op from dist_context @@ -57,7 +55,7 @@ def _remove_and_get_optimizer_op(main_program, dist_context): main_block._remove_op(idx, sync=False) main_block._sync_with_cpp() - return optimize_ops_desc + return optimize_ops_block def _get_gm_cond_var(main_program, k_steps, dist_context): @@ -228,7 +226,7 @@ def _create_cond_block_and_update_optimizer( cond_var, new_params_to_grads: List[Tuple[Any, Any]], grad_to_gradient_merge: Dict[str, str], - 
optimize_ops_desc: List[Any], + optimize_ops_block, k_steps, avg, ): @@ -255,7 +253,8 @@ def _create_cond_block_and_update_optimizer( new_grad.op._set_attr(OP_ROLE_KEY, OpRole.Optimize) # append optimizer ops - for op_desc in optimize_ops_desc: + for opt_op_idx in range(optimize_ops_block.desc.op_size()): + op_desc = optimize_ops_block.desc.op(opt_op_idx) new_op_desc = cur_block.desc.append_op() new_op_desc.copy_from(op_desc) @@ -305,7 +304,9 @@ def parse_program( main_program, startup_program, params_grads, k_steps, avg, dist_context ): # 1 remove optimizer_op from main_program - optimize_ops_desc = _remove_and_get_optimizer_op(main_program, dist_context) + optimize_ops_block = _remove_and_get_optimizer_op( + main_program, dist_context + ) # back to block 0 main_program._rollback() @@ -327,7 +328,7 @@ def parse_program( cond_var, new_params_to_grads, grad_to_gradient_merge, - optimize_ops_desc, + optimize_ops_block, k_steps, avg, ) diff --git a/python/paddle/distributed/passes/pass_utils.py b/python/paddle/distributed/passes/pass_utils.py index 7fba8b76b4a..8e601f74710 100644 --- a/python/paddle/distributed/passes/pass_utils.py +++ b/python/paddle/distributed/passes/pass_utils.py @@ -230,7 +230,9 @@ def get_skip_gc_vars(program_list: List[Program]): # NOTE(Ruibiao): Some vars maybe be the arguements of conditional_block op but no-need-buffer in the actual subblock, should not add them to the required_vars. if op.type == "conditional_block": continue - + # NOTE(lizhiyu): In the PP, 'nop','c_sync_comm_stream' don't need to hold memory, because we have add special code for 'send_v2' when recording stream for gc. 
+ if op.type == "nop" or op.type == "c_sync_comm_stream": + continue op_info = OpInOutInfo() op_info.build_info(op) for arg_name in op.input_arg_names + op.output_arg_names: diff --git a/python/paddle/distributed/passes/pipeline_pass_base.py b/python/paddle/distributed/passes/pipeline_pass_base.py index 14dce9065af..c18a215012e 100644 --- a/python/paddle/distributed/passes/pipeline_pass_base.py +++ b/python/paddle/distributed/passes/pipeline_pass_base.py @@ -12,11 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +from paddle.distributed.auto_parallel.static.utils import get_logger from paddle.fluid import core from .pass_base import PassBase from .pass_utils import get_skip_gc_vars +_logger = get_logger(logging.INFO) + class PipelinePassBase(PassBase): def __init__(self): @@ -60,6 +65,11 @@ class PipelinePassBase(PassBase): type_to_gc_vars = {} for type, gc_var in zip(type_list, gc_vars_list): type_to_gc_vars[type] = gc_var + _logger.info(f"The skip_gc_vars : {gc_vars_list}") + if "backward" in type_to_gc_vars: + assert ( + len(type_to_gc_vars["backward"]) == 0 + ), f"When enabling pipeline parallelism stategy, the skip_gc_vars_set for backward subprogram must be empty, but it is {type_to_gc_vars['backward']}." 
for job in job_list: job.set_skip_gc_vars(type_to_gc_vars[job.type()]) diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py index 9914b6e517c..dc45e73f77c 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py @@ -50,6 +50,7 @@ class PipelineFThenBPass(PipelinePassBase): job_list.append(backward_job) opt_job = core.Job("optimizer") + opt_job.set_micro_batch_id(0) job_list.append(opt_job) return job_list @@ -105,6 +106,7 @@ class Pipeline1F1BPass(PipelinePassBase): backward_micro_batch_id += 1 opt_job = core.Job("optimizer") + opt_job.set_micro_batch_id(0) job_list.append(opt_job) return job_list -- GitLab