未验证 提交 9b5f6140 编写于 作者: L lzydev 提交者: GitHub

Optimize the memory in the case of using `pipeline` strategy and new executor (#56397)

* optimize the memory

* fix bug in static_build.cc

* fix bug when using logging

* change the static_build

* fix bug in windows

* fix code accordding to review
上级 5dc7ff04
...@@ -38,6 +38,7 @@ std::set<std::string> OpsCanSkipedFakeAllocInStaticBuild = { ...@@ -38,6 +38,7 @@ std::set<std::string> OpsCanSkipedFakeAllocInStaticBuild = {
"create_py_reader", "create_py_reader",
"depend", "depend",
"fetch_v2", "fetch_v2",
"send_v2",
"nop"}; "nop"};
std::set<std::string> StaticBuildBlackList = { std::set<std::string> StaticBuildBlackList = {
...@@ -47,6 +48,14 @@ std::set<std::string> StaticBuildBlackList = { ...@@ -47,6 +48,14 @@ std::set<std::string> StaticBuildBlackList = {
"run_program" /*: to handle scope output*/, "run_program" /*: to handle scope output*/,
"sparse_sparse_coo_tensor" /*: to handle sparse output*/}; "sparse_sparse_coo_tensor" /*: to handle sparse output*/};
// TODO(lizhiyu): This operator list is only for pipeline strategy temporarily.
std::set<std::string> SkipCheckForPipelineTempList = {"c_broadcast",
"c_allreduce_sum",
"c_allgather",
"layer_norm",
"recv_v2",
"reshape2_grad",
"c_identity"};
namespace paddle { namespace paddle {
namespace framework { namespace framework {
namespace interpreter { namespace interpreter {
...@@ -63,6 +72,10 @@ bool BlockCanBeStaticBuilt(const framework::BlockDesc& block) { ...@@ -63,6 +72,10 @@ bool BlockCanBeStaticBuilt(const framework::BlockDesc& block) {
std::set<std::pair<std::string, KernelCode>> invalid_ops; std::set<std::pair<std::string, KernelCode>> invalid_ops;
for (auto& op : block.AllOps()) { for (auto& op : block.AllOps()) {
auto op_type = op->Type(); auto op_type = op->Type();
if (SkipCheckForPipelineTempList.find(op_type) !=
SkipCheckForPipelineTempList.end()) {
continue;
}
const framework::OpInfo& info = OpInfoMap::Instance().Get(op_type); const framework::OpInfo& info = OpInfoMap::Instance().Get(op_type);
auto op_base = auto op_base =
info.Creator()(op_type, op->Inputs(), op->Outputs(), op->GetAttrMap()); info.Creator()(op_type, op->Inputs(), op->Outputs(), op->GetAttrMap());
......
...@@ -30,6 +30,9 @@ ...@@ -30,6 +30,9 @@
#endif #endif
#include "paddle/fluid/platform/cuda_graph_with_memory_pool.h" #include "paddle/fluid/platform/cuda_graph_with_memory_pool.h"
#include "paddle/phi/backends/device_manager.h" #include "paddle/phi/backends/device_manager.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -1187,6 +1190,18 @@ void ProgramInterpreter::RecordStreamForGC(const Instruction& instr) { ...@@ -1187,6 +1190,18 @@ void ProgramInterpreter::RecordStreamForGC(const Instruction& instr) {
gpuStream_t stream = gpuStream_t stream =
reinterpret_cast<const phi::GPUContext&>(instr.DeviceContext()).stream(); reinterpret_cast<const phi::GPUContext&>(instr.DeviceContext()).stream();
// TODO(lizhiyu): Only analyse the 'send_v2' for GPT pp strategy right now.
// To support all the operators for communicating in the future.
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
auto operator_base_ptr = instr.OpBase();
if ((operator_base_ptr->Type() == "send_v2") &&
(operator_base_ptr->Attr<bool>("use_calc_stream") == false)) {
stream = platform::NCCLCommContext::Instance()
.Get(operator_base_ptr->Attr<int>("ring_id"),
instr.DeviceContext().GetPlace())
->stream();
}
#endif
auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) { auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) {
auto allocation = tensor.Holder(); auto allocation = tensor.Holder();
if (allocation == nullptr) { if (allocation == nullptr) {
......
...@@ -81,6 +81,8 @@ class ProgramInterpreter : public InterpreterBaseImpl { ...@@ -81,6 +81,8 @@ class ProgramInterpreter : public InterpreterBaseImpl {
hookfuncs_ = hookfuncs; hookfuncs_ = hookfuncs;
} }
bool IsStaticBuild() const { return static_build_; }
private: private:
// build graph // build graph
void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes); void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "paddle/fluid/framework/new_executor/feed_fetch_utils.h" #include "paddle/fluid/framework/new_executor/feed_fetch_utils.h"
#include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h" #include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
#include "paddle/fluid/framework/new_executor/program_interpreter.h"
#include "paddle/fluid/platform/profiler/event_tracing.h" #include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/fluid/ir/transforms/pd_op_to_kernel_pass.h" #include "paddle/fluid/ir/transforms/pd_op_to_kernel_pass.h"
...@@ -103,6 +104,22 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place, ...@@ -103,6 +104,22 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
micro_batch_scopes_[micro_batch_id], micro_batch_scopes_[micro_batch_id],
execution_config)); execution_config));
interpretercores_.back()->SetCopyProgram(program); interpretercores_.back()->SetCopyProgram(program);
// NOTE(lizhiyu): Now we only check backward subprogram. After static
// build strategy is completely, we should
// check all the program in the PP strategy.
if (job_type == "backward" && jobs.size() > 1) {
PADDLE_ENFORCE_EQ(static_cast<const ProgramInterpreter*>(
interpretercores_.back()->Impl())
->IsStaticBuild(),
true,
phi::errors::InvalidArgument(
"When using pipeline strategy in auto "
"prarallelism with new executor, "
"the backward subprogram must be builded in real "
"static build mode, but it can not "
"be staticly builded in this case. You can "
"enable 'GLOG_v=1' to obtain log information."));
}
} }
} }
} }
......
...@@ -38,15 +38,13 @@ def _remove_and_get_optimizer_op(main_program, dist_context): ...@@ -38,15 +38,13 @@ def _remove_and_get_optimizer_op(main_program, dist_context):
# 2 mv optimizer op from global program to tmp block # 2 mv optimizer op from global program to tmp block
# 3 del the op from dist_context # 3 del the op from dist_context
main_block = main_program.global_block() main_block = main_program.global_block()
temp_block = main_program._create_block() optimize_ops_block = paddle.static.Program().global_block()
removed_op_idx = [] removed_op_idx = []
optimize_ops_desc = []
for idx, op in enumerate(main_block.ops): for idx, op in enumerate(main_block.ops):
if is_optimize_op(op): if is_optimize_op(op):
# append optimizer op to tmp block # append optimizer op to tmp block
new_op_desc = temp_block.desc.append_op() new_op_desc = optimize_ops_block.desc.append_op()
new_op_desc.copy_from(op.desc) new_op_desc.copy_from(op.desc)
optimize_ops_desc.append(new_op_desc)
removed_op_idx.append(idx) removed_op_idx.append(idx)
# del op from dist_context # del op from dist_context
...@@ -57,7 +55,7 @@ def _remove_and_get_optimizer_op(main_program, dist_context): ...@@ -57,7 +55,7 @@ def _remove_and_get_optimizer_op(main_program, dist_context):
main_block._remove_op(idx, sync=False) main_block._remove_op(idx, sync=False)
main_block._sync_with_cpp() main_block._sync_with_cpp()
return optimize_ops_desc return optimize_ops_block
def _get_gm_cond_var(main_program, k_steps, dist_context): def _get_gm_cond_var(main_program, k_steps, dist_context):
...@@ -228,7 +226,7 @@ def _create_cond_block_and_update_optimizer( ...@@ -228,7 +226,7 @@ def _create_cond_block_and_update_optimizer(
cond_var, cond_var,
new_params_to_grads: List[Tuple[Any, Any]], new_params_to_grads: List[Tuple[Any, Any]],
grad_to_gradient_merge: Dict[str, str], grad_to_gradient_merge: Dict[str, str],
optimize_ops_desc: List[Any], optimize_ops_block,
k_steps, k_steps,
avg, avg,
): ):
...@@ -255,7 +253,8 @@ def _create_cond_block_and_update_optimizer( ...@@ -255,7 +253,8 @@ def _create_cond_block_and_update_optimizer(
new_grad.op._set_attr(OP_ROLE_KEY, OpRole.Optimize) new_grad.op._set_attr(OP_ROLE_KEY, OpRole.Optimize)
# append optimizer ops # append optimizer ops
for op_desc in optimize_ops_desc: for opt_op_idx in range(optimize_ops_block.desc.op_size()):
op_desc = optimize_ops_block.desc.op(opt_op_idx)
new_op_desc = cur_block.desc.append_op() new_op_desc = cur_block.desc.append_op()
new_op_desc.copy_from(op_desc) new_op_desc.copy_from(op_desc)
...@@ -305,7 +304,9 @@ def parse_program( ...@@ -305,7 +304,9 @@ def parse_program(
main_program, startup_program, params_grads, k_steps, avg, dist_context main_program, startup_program, params_grads, k_steps, avg, dist_context
): ):
# 1 remove optimizer_op from main_program # 1 remove optimizer_op from main_program
optimize_ops_desc = _remove_and_get_optimizer_op(main_program, dist_context) optimize_ops_block = _remove_and_get_optimizer_op(
main_program, dist_context
)
# back to block 0 # back to block 0
main_program._rollback() main_program._rollback()
...@@ -327,7 +328,7 @@ def parse_program( ...@@ -327,7 +328,7 @@ def parse_program(
cond_var, cond_var,
new_params_to_grads, new_params_to_grads,
grad_to_gradient_merge, grad_to_gradient_merge,
optimize_ops_desc, optimize_ops_block,
k_steps, k_steps,
avg, avg,
) )
......
...@@ -230,7 +230,9 @@ def get_skip_gc_vars(program_list: List[Program]): ...@@ -230,7 +230,9 @@ def get_skip_gc_vars(program_list: List[Program]):
# NOTE(Ruibiao): Some vars maybe be the arguements of conditional_block op but no-need-buffer in the actual subblock, should not add them to the required_vars. # NOTE(Ruibiao): Some vars maybe be the arguements of conditional_block op but no-need-buffer in the actual subblock, should not add them to the required_vars.
if op.type == "conditional_block": if op.type == "conditional_block":
continue continue
# NOTE(lizhiyu): In the PP, 'nop','c_sync_comm_stream' don't need to hold memory, because we have add special code for 'send_v2' when recording stream for gc.
if op.type == "nop" or op.type == "c_sync_comm_stream":
continue
op_info = OpInOutInfo() op_info = OpInOutInfo()
op_info.build_info(op) op_info.build_info(op)
for arg_name in op.input_arg_names + op.output_arg_names: for arg_name in op.input_arg_names + op.output_arg_names:
......
...@@ -12,11 +12,16 @@ ...@@ -12,11 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from paddle.distributed.auto_parallel.static.utils import get_logger
from paddle.fluid import core from paddle.fluid import core
from .pass_base import PassBase from .pass_base import PassBase
from .pass_utils import get_skip_gc_vars from .pass_utils import get_skip_gc_vars
_logger = get_logger(logging.INFO)
class PipelinePassBase(PassBase): class PipelinePassBase(PassBase):
def __init__(self): def __init__(self):
...@@ -60,6 +65,11 @@ class PipelinePassBase(PassBase): ...@@ -60,6 +65,11 @@ class PipelinePassBase(PassBase):
type_to_gc_vars = {} type_to_gc_vars = {}
for type, gc_var in zip(type_list, gc_vars_list): for type, gc_var in zip(type_list, gc_vars_list):
type_to_gc_vars[type] = gc_var type_to_gc_vars[type] = gc_var
_logger.info(f"The skip_gc_vars : {gc_vars_list}")
if "backward" in type_to_gc_vars:
assert (
len(type_to_gc_vars["backward"]) == 0
), f"When enabling pipeline parallelism stategy, the skip_gc_vars_set for backward subprogram must be empty, but it is {type_to_gc_vars['backward']}."
for job in job_list: for job in job_list:
job.set_skip_gc_vars(type_to_gc_vars[job.type()]) job.set_skip_gc_vars(type_to_gc_vars[job.type()])
......
...@@ -50,6 +50,7 @@ class PipelineFThenBPass(PipelinePassBase): ...@@ -50,6 +50,7 @@ class PipelineFThenBPass(PipelinePassBase):
job_list.append(backward_job) job_list.append(backward_job)
opt_job = core.Job("optimizer") opt_job = core.Job("optimizer")
opt_job.set_micro_batch_id(0)
job_list.append(opt_job) job_list.append(opt_job)
return job_list return job_list
...@@ -105,6 +106,7 @@ class Pipeline1F1BPass(PipelinePassBase): ...@@ -105,6 +106,7 @@ class Pipeline1F1BPass(PipelinePassBase):
backward_micro_batch_id += 1 backward_micro_batch_id += 1
opt_job = core.Job("optimizer") opt_job = core.Job("optimizer")
opt_job.set_micro_batch_id(0)
job_list.append(opt_job) job_list.append(opt_job)
return job_list return job_list
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册