From 251f68e7a3f3686b11cbbc2eb72d3f2f22cbba36 Mon Sep 17 00:00:00 2001
From: xiongkun
Date: Mon, 8 Nov 2021 09:52:53 +0800
Subject: [PATCH] Add Support for OperatorBase in new executor (#36945)

* add scope as a member
* functions complete
* fix bugs: garbage collector
* deal with unknown variable holder
* add
* add unittest for operator_base
* code format
---
 .../framework/new_executor/interpretercore.cc |  20 ++-
 .../interpretercore_garbage_collector.cc      |   9 +
 .../new_executor/interpretercore_util.cc      | 167 +++++++++++-------
 .../new_executor/new_executor_defs.h          |  23 ++-
 .../test_standalone_controlflow.py            | 131 ++++++++++++++
 5 files changed, 274 insertions(+), 76 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py

diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 5b00214566c..cb6fb8c1a7f 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -316,13 +316,18 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   auto place = instr_node.DeviceContext().GetPlace();
   VLOG(4) << place << " " << op->DebugStringEx(global_scope_);
 
+  auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
   {
     platform::RecordEvent infershape_event("InferShape");
-    static_cast<const framework::OperatorWithKernel*>(instr_node.OpBase())
-        ->InferShape(instr_node.InnerInferShapeContext().get());
+    // If the op is a plain OperatorBase, InferShape does nothing.
+    if (op_with_kernel != nullptr)
+      op_with_kernel->InferShape(instr_node.InnerInferShapeContext().get());
   }
 
-  if (FLAGS_new_executor_use_inplace) {
+  // TODO(xiongkun03): does OperatorBase support inplace?
+  if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
     for (auto& pair : instr_node.InplaceInfo()) {
       const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
       auto* out =
@@ -334,7 +339,10 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   }
   {
     platform::RecordEvent compute_event("Compute");
-    instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
+    if (op_with_kernel == nullptr)
+      instr_node.OpBase()->Run(*global_scope_->GetScope(), place_);
+    else
+      instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
   }
 
   VLOG(3) << place << " " << op->DebugStringEx(global_scope_);
@@ -357,7 +365,9 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   // for debug nan/inf
   if (FLAGS_check_nan_inf) {
     VLOG(4) << "Check nan/inf";
-    framework::details::CheckOpHasNanOrInf(*op, *global_scope_, place);
+    framework::details::CheckOpHasNanOrInf(
+        *op, *global_scope_,
+        place);  // TODO(xiongkun03) change it to the inner scope.
   }
 }
 
diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
index 2ae84d9dcdd..3025e23445e 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
@@ -58,6 +58,11 @@ void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var,
                                           const platform::DeviceContext* ctx) {
   if (var->IsType<LoDTensor>()) {
     Add(var->GetMutable<LoDTensor>()->MoveMemoryHolder(), event, ctx);
+  } else if (var->IsType<
+                 operators::reader::
+                     OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
+    // var->Clear(); // TODO(xiongkun03) can we clear directly? Why must we
+    // use the Add interface?
   } else if (var->IsType<SelectedRows>()) {
     Add(var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder(),
         event, ctx);
@@ -66,6 +71,10 @@ void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var,
     for (auto& t : *tensor_arr) {
       Add(t.MoveMemoryHolder(), event, ctx);
     }
+  } else if (var->IsType<std::vector<Scope*>>()) {
+    // NOTE(@xiongkun03): conditional_op / while_op will create a STEP_SCOPE;
+    // refer to executor.cc to see what the old garbage collector does.
+    // Do nothing here, because the sub scope will be deleted by the
+    // sub-executor.
   } else {
     PADDLE_THROW(platform::errors::Unimplemented(
         "The variable(%s) is not supported in eager deletion.",
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index ba328cf1a58..cc5ed6eff16 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -229,6 +229,28 @@ void apply_device_guard(const OperatorBase* op_base,
   }
 }
 
+void deal_operator_base(const platform::Place& place,
+                        const VariableScope* var_scope, OperatorBase* op_base,
+                        OpFuncNode* op_func_node) {
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  auto* dev_ctx = pool.Get(place);
+  // Inputs and outputs are already prepared; set the remaining attributes.
+  op_func_node->operator_base_ = op_base;
+  op_func_node->type_ = OpFuncType::kQueueSync;  // always sync
+  op_func_node->kernel_func_ = nullptr;
+  op_base->Run(*var_scope->GetScope(), place);  // Run without data transform.
+
+  std::unordered_set<int> no_data_transform_index;
+  for (auto& it : op_func_node->input_index) {
+    for (auto& id : it.second) {
+      no_data_transform_index.emplace(id);
+    }
+  }
+  op_func_node->no_data_transform_index =
+      no_data_transform_index;  // no input index needs data transform
+  op_func_node->dev_ctx_ = dev_ctx;
+}
+
 // the return value is whether data transform is needed for this var
 bool need_place_transform_for_var(const OpKernelType& kernel_type_for_var,
                                   const OpKernelType& expected_kernel_key) {
@@ -429,80 +451,89 @@ void build_op_func_list(const platform::Place& place,
     OpFuncNode op_func_node;
     op_func_node.input_index = ins_name2id;
     op_func_node.output_index = outs_name2id;
-    // construct RuntimeContext and analyze KernelType
-    RuntimeContext runtime_context({}, {});
-    runtime_context.inputs.swap(ins_map);
-    runtime_context.outputs.swap(outs_map);
-    InterpretercoreInferShapeContext infer_shape_ctx(*op_base, runtime_context);
-    // TODO(Aurelius84): In case of control flow ops, they are NOT inherited
-    // from OperatorWithKernel.
-    static_cast<const framework::OperatorWithKernel*>(op_base)->InferShape(
-        &infer_shape_ctx);
-    auto kernels_iter = all_op_kernels.find(op->Type());
-    PADDLE_ENFORCE_NE(
-        kernels_iter, all_op_kernels.end(),
-        platform::errors::Unavailable(
-            "There are no kernels which are registered in the %s operator.",
-            op->Type()));
-
-    OpKernelMap& kernels = kernels_iter->second;
-
-    platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
-    auto* dev_ctx = pool.Get(place);
-    Scope scope;
-    auto expected_kernel_key =
-        dynamic_cast<const framework::OperatorWithKernel*>(op_base)
-            ->GetExpectedKernelType(
-                ExecutionContext(*op_base, scope, *dev_ctx, runtime_context));
-
-    // consider device_guard()
-    apply_device_guard(
-        op_base, place,
-        &expected_kernel_key);  // change device by the device_guard()
-    VLOG(3) << "expected_kernel_key : " << expected_kernel_key;
-
-    // step 3. apply data transforms and insert memory ops
-    VariableValueMap& ins_map_temp = runtime_context.inputs;
-    std::vector<OpFuncNode> copy_op_to_insert;
-    // NOTE(xiongkun03): assign op_base here to reduce the parameter number
-    // of apply_data_transform.
-    op_func_node.operator_base_ = op_base;
-    copy_op_to_insert = apply_data_transform(
-        expected_kernel_key, place, ins_map_temp, var_scope, op_func_node);
-    for (auto& item : copy_op_to_insert) {
-      vec_func_list->push_back(item);
-    }
-    // step 4. run op kernel
-    VLOG(3) << op_base->Type()
-            << " : expected_kernel_key : " << expected_kernel_key;
-
-    if (platform::is_gpu_place(expected_kernel_key.place_)) {
-      op_func_node.type_ = OpFuncType::kQueueAsync;
-    } else if (platform::is_cpu_place(expected_kernel_key.place_)) {
-      op_func_node.type_ = OpFuncType::kQueueSync;
+
+    if (dynamic_cast<const framework::OperatorWithKernel*>(op_base) ==
+        nullptr) {
+      // op is not an OperatorWithKernel, so run OperatorBase::Run() directly
+      deal_operator_base(place, var_scope, op_base, &op_func_node);
     } else {
-      PADDLE_THROW(platform::errors::Fatal("Unsupported current place %s",
-                                           expected_kernel_key.place_));
-    }
+      // construct RuntimeContext and analyze KernelType
+      RuntimeContext runtime_context({}, {});
+      runtime_context.inputs.swap(ins_map);
+      runtime_context.outputs.swap(outs_map);
+      InterpretercoreInferShapeContext infer_shape_ctx(*op_base,
+                                                       runtime_context);
+      // TODO(Aurelius84): In case of control flow ops, they are NOT inherited
+      // from OperatorWithKernel.
+      static_cast<const framework::OperatorWithKernel*>(op_base)->InferShape(
+          &infer_shape_ctx);
+      auto kernels_iter = all_op_kernels.find(op->Type());
+      PADDLE_ENFORCE_NE(
+          kernels_iter, all_op_kernels.end(),
+          platform::errors::Unavailable(
+              "There are no kernels which are registered in the %s operator.",
+              op->Type()));
+
+      OpKernelMap& kernels = kernels_iter->second;
+
+      platform::DeviceContextPool& pool =
+          platform::DeviceContextPool::Instance();
+      auto* dev_ctx = pool.Get(place);
+      Scope scope;
+      auto expected_kernel_key =
+          dynamic_cast<const framework::OperatorWithKernel*>(op_base)
+              ->GetExpectedKernelType(
+                  ExecutionContext(*op_base, scope, *dev_ctx, runtime_context));
+
+      // consider device_guard()
+      apply_device_guard(
+          op_base, place,
+          &expected_kernel_key);  // change device by the device_guard()
+      VLOG(3) << "expected_kernel_key : " << expected_kernel_key;
+
+      // step 3. apply data transforms and insert memory ops
+      VariableValueMap& ins_map_temp = runtime_context.inputs;
+      std::vector<OpFuncNode> copy_op_to_insert;
+      // NOTE(xiongkun03): assign op_base here to reduce the parameter number
+      // of apply_data_transform.
+      op_func_node.operator_base_ = op_base;
+      copy_op_to_insert = apply_data_transform(
+          expected_kernel_key, place, ins_map_temp, var_scope, op_func_node);
+      for (auto& item : copy_op_to_insert) {
+        vec_func_list->push_back(item);
+      }
+      // step 4. run op kernel
+      VLOG(3) << op_base->Type()
+              << " : expected_kernel_key : " << expected_kernel_key;
+
+      if (platform::is_gpu_place(expected_kernel_key.place_)) {
+        op_func_node.type_ = OpFuncType::kQueueAsync;
+      } else if (platform::is_cpu_place(expected_kernel_key.place_)) {
+        op_func_node.type_ = OpFuncType::kQueueSync;
+      } else {
+        PADDLE_THROW(platform::errors::Fatal("Unsupported current place %s",
+                                             expected_kernel_key.place_));
+      }
+      if (!(expected_kernel_key.place_ == dev_ctx->GetPlace())) {
+        dev_ctx = pool.Get(expected_kernel_key.place_);
+      }
+      op_func_node.dev_ctx_ = dev_ctx;
 
-    if (!(expected_kernel_key.place_ == dev_ctx->GetPlace())) {
-      dev_ctx = pool.Get(expected_kernel_key.place_);
-    }
-    op_func_node.dev_ctx_ = dev_ctx;
+      auto exec_ctx =
+          ExecutionContext(*op_base, scope, *dev_ctx, runtime_context);
 
-    auto exec_ctx =
-        ExecutionContext(*op_base, scope, *dev_ctx, runtime_context);
+      auto kernel_iter = kernels.find(expected_kernel_key);
+      PADDLE_ENFORCE_NE(
+          kernel_iter, kernels.end(),
+          platform::errors::NotFound(
+              "Operator (%s) does not have kernel for %s.", op->Type(),
+              KernelTypeToString(expected_kernel_key)));
 
-    auto kernel_iter = kernels.find(expected_kernel_key);
-    PADDLE_ENFORCE_NE(kernel_iter, kernels.end(),
-                      platform::errors::NotFound(
-                          "Operator (%s) does not have kernel for %s.",
-                          op->Type(), KernelTypeToString(expected_kernel_key)));
+      op_func_node.kernel_func_ = OpKernelComputeFunc(kernel_iter->second);
+      op_func_node.kernel_func_(exec_ctx);
+    }
 
-    op_func_node.kernel_func_ = OpKernelComputeFunc(kernel_iter->second);
-    op_func_node.kernel_func_(exec_ctx);
     vec_func_list->push_back(op_func_node);
-    // gc---------------------------------------------------------------------------
     auto iter = unused_var_map.find(op_base);
     if (iter == unused_var_map.end()) {
diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h
index 580a0ed8d5f..0432aa33d7d 100644
--- a/paddle/fluid/framework/new_executor/new_executor_defs.h
+++ b/paddle/fluid/framework/new_executor/new_executor_defs.h
@@ -472,6 +472,12 @@ struct VariableMetaInfo {
 };
 
 // TODO(zhiqiu): Maybe we need to add rwlock for VariableScope?
+
+// NOTE(xiongkun03): With a Scope as a member of VariableScope, we don't need
+// ScopeBase. The Scope manages the variables, and VariableScope is just a
+// quick-access mechanism.
 class VariableScope : public ScopeBase {
  public:
   VariableScope() {
@@ -482,7 +488,10 @@ class VariableScope : public ScopeBase {
     info.var_ref_count_ = 0;
     info.vardesc_ = nullptr;
     vec_meta_info_.push_back(info);
+    scope_ptr_.reset(new Scope());
   }
+  const Scope* GetScope() const { return scope_ptr_.get(); }
+
   Variable* FindVar(const std::string& name) const {
     auto it = name2id_.find(name);
     if (it != name2id_.end()) {
@@ -540,11 +549,14 @@ class VariableScope : public ScopeBase {
 
   void AddVar(const std::string& name, VarDesc* var_desc) {  // NOLINT
     name2id_[name] = VarSize();
-    auto v = new Variable();
+    auto v = scope_ptr_->Var(name);
     if (nullptr == var_desc) {
       v->GetMutable<LoDTensor>();
     } else {
-      InitializeVariable(v, var_desc->GetType());
+      InitializeVariable(
+          v, var_desc->GetType());  // The Scope does not initialize the
+                                    // newly created variable.
     }
     var_list_.push_back(v);
@@ -555,8 +567,12 @@ class VariableScope : public ScopeBase {
   }
 
   void AddVar(const std::string& name, Variable& var) {  // NOLINT
+    // must copy.
+ VLOG(4) << "Add variable: " << name << " through AddVar()"; + auto v = scope_ptr_->Var(name); + *v = var; name2id_[name] = VarSize(); - var_list_.push_back(&var); + var_list_.push_back(v); VariableMetaInfo info; info.var_ref_count_ = 0; @@ -595,6 +611,7 @@ class VariableScope : public ScopeBase { std::vector var_list_; std::map name2id_; std::vector vec_meta_info_; + std::unique_ptr scope_ptr_; }; class NextInstruction { diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py new file mode 100644 index 00000000000..7c1497a4853 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py @@ -0,0 +1,131 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import unittest +import paddle +from paddle.fluid import core +from paddle.fluid.core import StandaloneExecutor +import paddle.fluid as fluid +from paddle.fluid.framework import Program, program_guard +import paddle.fluid.layers as layers + +import numpy as np + +paddle.enable_static() + + +# test the compatibility of new executor: run old +# and new executor twice and check the result. 
+# Please override _get_feed() and build_program().
+class TestCompatibility(unittest.TestCase):
+    def setUp(self):
+        self.place = paddle.CUDAPlace(0) if core.is_compiled_with_cuda(
+        ) else paddle.CPUPlace()
+        self.iter_run = 4
+
+    def _get_feed(self):
+        """ Return the feeds. """
+        return None
+
+    def build_program(self):
+        def true_func():
+            return layers.fill_constant(
+                shape=[1, 2], dtype='int32', value=1), layers.fill_constant(
+                    shape=[2, 3], dtype='bool', value=True)
+
+        def false_func():
+            return layers.fill_constant(
+                shape=[3, 4], dtype='float32', value=3), layers.fill_constant(
+                    shape=[4, 5], dtype='int64', value=2)
+
+        main_program = Program()
+        startup_program = Program()
+        with program_guard(main_program, startup_program):
+            x = layers.fill_constant(shape=[1], dtype='float32', value=0.1)
+            y = layers.fill_constant(shape=[1], dtype='float32', value=0.23)
+            pred = layers.less_than(x, y)
+            out = layers.cond(pred, true_func, false_func)
+            # out is a tuple containing 2 tensors
+        return main_program, startup_program, out
+
+    def _run(self, feed):
+        paddle.seed(2020)
+
+        main_program, startup_program, fetch_vars = self.build_program()
+
+        exe = paddle.static.Executor(self.place)
+        exe.run(startup_program)
+        ret = []
+        for i in range(self.iter_run):
+            ret.append(exe.run(main_program, feed=feed, fetch_list=fetch_vars))
+        return ret
+
+    def run_raw_executor(self, feed):
+        out = self._run(feed)
+        print("GT:", out)
+        return out
+
+    def run_new_executor(self, feed):
+        os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = '1'
+        out = self._run(feed)
+        del os.environ['FLAGS_USE_STANDALONE_EXECUTOR']
+        print("New:", out)
+        return out
+
+    def test_with_feed(self):
+        feed = self._get_feed()
+        res = self.run_new_executor(feed)
+        gt = self.run_raw_executor(feed)
+        for x, y in zip(gt, res):
+            if isinstance(x, list):
+                for tx, ty in zip(x, y):
+                    self.assertTrue(np.array_equal(tx, ty))
+            elif isinstance(x, np.ndarray):
+                self.assertTrue(np.array_equal(x, y))
+            else:
+                raise Exception("Not Implemented!")
+
+
+class TestWhile(TestCompatibility):
+    def _get_feed(self):
+        """ Return the feeds. """
+        return None
+
+    def build_program(self):
+        def cond(i, ten):
+            return i < ten
+
+        def body(i, ten):
+            i = i + 1
+            return [i, ten]
+
+        main_program = paddle.static.default_main_program()
+        startup_program = paddle.static.default_startup_program()
+        with paddle.static.program_guard(main_program, startup_program):
+            i = paddle.full(
+                shape=[1], fill_value=0, dtype='int64')  # loop counter
+            ten = paddle.full(
+                shape=[1], fill_value=10, dtype='int64')  # loop length
+            i, ten = paddle.static.nn.while_loop(cond, body, [i, ten])
+        return main_program, startup_program, i
+
+
+if __name__ == "__main__":
+    unittest.main()
--
GitLab
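For readers who want to exercise the new code path by hand, here is a minimal sketch in the spirit of the test above. It assumes a Paddle build that contains this patch; FLAGS_USE_STANDALONE_EXECUTOR is the same environment toggle the test uses, and the expected-output comment is an assumption, not a captured log:

```python
# Minimal sketch: run a control-flow program under the standalone (new)
# executor. cond() lowers to conditional_block, a plain OperatorBase without
# a kernel, which the new executor could not run before this patch.
import os
os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = '1'  # same toggle as the test

import paddle
import paddle.fluid.layers as layers

paddle.enable_static()

main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
    x = layers.fill_constant(shape=[1], dtype='float32', value=0.1)
    y = layers.fill_constant(shape=[1], dtype='float32', value=0.23)
    # The predicate is true, so the first branch should be taken.
    out = layers.cond(layers.less_than(x, y),
                      lambda: layers.fill_constant(
                          shape=[1], dtype='float32', value=1.0),
                      lambda: layers.fill_constant(
                          shape=[1], dtype='float32', value=0.0))

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup)
# Expected (assumption): [array([1.], dtype=float32)]
print(exe.run(main, fetch_list=[out]))
```

Setting the flag at runtime mirrors run_new_executor() in the test; deleting the variable restores the old executor path.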