From 5382994ee47486f3870c7ae308ef3deac81f0347 Mon Sep 17 00:00:00 2001
From: Wilber
Date: Mon, 13 Apr 2020 16:48:17 +0800
Subject: [PATCH] lite cuda support exec multi-stream. (#2949)

lite cuda support exec multi-stream
---
 lite/api/cxx_api_impl.cc                     |   6 +-
 lite/api/paddle_api.h                        |   7 +
 lite/api/paddle_use_passes.h                 |   1 +
 lite/backends/cuda/CMakeLists.txt            |   2 +
 lite/backends/cuda/context.cc                |  19 ++
 lite/backends/cuda/context.h                 | 170 ++++++++++
 lite/core/CMakeLists.txt                     |   2 +-
 lite/core/context.h                          | 107 +------
 lite/core/device_info.h                      |   7 +-
 lite/core/mir/CMakeLists.txt                 |   1 +
 lite/core/mir/generate_program_pass.cc       |  30 +-
 lite/core/mir/graph_visualize_pass.cc        |  34 +-
 lite/core/mir/multi_stream_analysis_pass.cc  | 313 +++++++++++++++++++
 lite/core/mir/multi_stream_analysis_pass.h   |  85 +++++
 lite/core/mir/node.h                         |   7 +
 lite/core/mir/pass.h                         |  25 ++
 lite/core/mir/runtime_context_assign_pass.cc |   5 +-
 lite/core/optimizer.h                        |  16 +-
 lite/core/profile/precision_profiler.h       |   7 +
 lite/core/program.cc                         |   5 +
 lite/core/program.h                          |  12 +
 21 files changed, 747 insertions(+), 114 deletions(-)
 create mode 100644 lite/backends/cuda/context.cc
 create mode 100644 lite/backends/cuda/context.h
 create mode 100644 lite/core/mir/multi_stream_analysis_pass.cc
 create mode 100644 lite/core/mir/multi_stream_analysis_pass.h

diff --git a/lite/api/cxx_api_impl.cc b/lite/api/cxx_api_impl.cc
index ccd7c98138..659ebdb0bf 100644
--- a/lite/api/cxx_api_impl.cc
+++ b/lite/api/cxx_api_impl.cc
@@ -32,12 +32,17 @@ namespace lite {
 void CxxPaddleApiImpl::Init(const lite_api::CxxConfig &config) {
   config_ = config;
   auto places = config.valid_places();
+  std::vector<std::string> passes{};
 #ifdef LITE_WITH_CUDA
   // if kCUDA is included in valid places, it should be initialized first,
   // otherwise skip this step.
   for (auto &p : places) {
     if (p.target == TARGET(kCUDA)) {
       Env<TARGET(kCUDA)>::Init();
+      if (config_.multi_stream()) {
+        passes = {"multi_stream_analysis_pass"};
+        VLOG(3) << "add pass: " << passes[0];
+      }
       break;
     }
   }
@@ -51,7 +56,6 @@ void CxxPaddleApiImpl::Init(const lite_api::CxxConfig &config) {
                            config.mlu_first_conv_std(),
                            config.mlu_input_layout());
 #endif  // LITE_WITH_MLU
-  std::vector<std::string> passes{};
   auto use_layout_preprocess_pass =
       config.model_dir().find("OPENCL_PRE_PRECESS");
   VLOG(1) << "use_layout_preprocess_pass:" << use_layout_preprocess_pass;
diff --git a/lite/api/paddle_api.h b/lite/api/paddle_api.h
index ce0f0e15d8..79ab98da79 100644
--- a/lite/api/paddle_api.h
+++ b/lite/api/paddle_api.h
@@ -136,6 +136,9 @@ class LITE_API CxxConfig : public ConfigBase {
 #ifdef LITE_WITH_X86
   int x86_math_library_math_threads_ = 1;
 #endif
+#ifdef LITE_WITH_CUDA
+  bool multi_stream_{false};
+#endif
 #ifdef LITE_WITH_MLU
   lite_api::MLUCoreVersion mlu_core_version_{lite_api::MLUCoreVersion::MLU_270};
   int mlu_core_number_{1};
@@ -171,6 +174,10 @@ class LITE_API CxxConfig : public ConfigBase {
     return x86_math_library_math_threads_;
   }
 #endif
+#ifdef LITE_WITH_CUDA
+  void set_multi_stream(bool multi_stream) { multi_stream_ = multi_stream; }
+  bool multi_stream() const { return multi_stream_; }
+#endif
 #ifdef LITE_WITH_MLU
   // set MLU core version, which is used when compiling MLU kernels
diff --git a/lite/api/paddle_use_passes.h b/lite/api/paddle_use_passes.h
index 219952bd2a..1eb5af74d2 100644
--- a/lite/api/paddle_use_passes.h
+++ b/lite/api/paddle_use_passes.h
@@ -42,6 +42,7 @@ USE_MIR_PASS(type_precision_cast_pass);
 USE_MIR_PASS(type_layout_cast_pass);
 USE_MIR_PASS(type_layout_cast_preprocess_pass);
 USE_MIR_PASS(memory_optimize_pass);
+USE_MIR_PASS(multi_stream_analysis_pass);
 USE_MIR_PASS(elementwise_mul_constant_eliminate_pass)
 USE_MIR_PASS(npu_subgraph_pass);
 USE_MIR_PASS(xpu_subgraph_pass);
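For reference, a minimal usage sketch of the new option. This is not part of the patch: everything except set_multi_stream()/multi_stream() is the pre-existing lite_api surface, and the include path, model directory, and input shape are made up for illustration.

    #include "paddle_api.h"

    void RunWithMultiStream() {
      paddle::lite_api::CxxConfig config;
      config.set_model_dir("./mobilenet_v1_opt");  // hypothetical model dir
      config.set_valid_places(
          {paddle::lite_api::Place{TARGET(kCUDA), PRECISION(kFloat)},
           paddle::lite_api::Place{TARGET(kHost), PRECISION(kFloat)}});
      // New in this patch: request the multi_stream_analysis_pass.
      config.set_multi_stream(true);

      auto predictor = paddle::lite_api::CreatePaddlePredictor(config);
      auto input = predictor->GetInput(0);
      input->Resize({1, 3, 224, 224});
      // ... fill input->mutable_data<float>() ...
      predictor->Run();
    }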
diff --git a/lite/backends/cuda/CMakeLists.txt b/lite/backends/cuda/CMakeLists.txt
index 35f5f0ce2d..0689bb706a 100644
--- a/lite/backends/cuda/CMakeLists.txt
+++ b/lite/backends/cuda/CMakeLists.txt
@@ -5,5 +5,7 @@ get_property(cuda_deps GLOBAL PROPERTY CUDA_MODULES)
 nv_library(target_wrapper_cuda SRCS target_wrapper.cc DEPS ${cuda_deps})
 nv_library(cuda_blas SRCS blas.cc DEPS ${cuda_deps})
+
+lite_cc_library(cuda_context SRCS context.cc DEPS device_info)
 add_subdirectory(math)
diff --git a/lite/backends/cuda/context.cc b/lite/backends/cuda/context.cc
new file mode 100644
index 0000000000..4bac4c442c
--- /dev/null
+++ b/lite/backends/cuda/context.cc
@@ -0,0 +1,19 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "lite/backends/cuda/context.h"
+
+namespace paddle {
+namespace lite {}  // namespace lite
+}  // namespace paddle
diff --git a/lite/backends/cuda/context.h b/lite/backends/cuda/context.h
new file mode 100644
index 0000000000..5bed30a960
--- /dev/null
+++ b/lite/backends/cuda/context.h
@@ -0,0 +1,170 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+#include "lite/backends/cuda/blas.h"
+#include "lite/backends/cuda/cuda_utils.h"
+#include "lite/backends/cuda/target_wrapper.h"
+#include "lite/core/device_info.h"
+
+namespace paddle {
+namespace lite {
+
+template <TargetType Type>
+class Context;
+
+using CUDAContext = Context<TargetType::kCUDA>;
+
+// Only works with CUDA kernels.
+template <>
+class Context<TargetType::kCUDA> {
+ public:
+  typename Env<TargetType::kCUDA>::Devs& devs =
+      Env<TargetType::kCUDA>::Global();
+  // NOTE: InitOnce should only be used by ContextScheduler
+  void InitOnce() {
+    if (devs.size() > 0) {
+      cublas_fp32_ = std::make_shared<lite::cuda::Blas<float>>();
+    } else {
+      LOG(INFO) << "No cuda device(s) found, CUDAContext init failed.";
+    }
+  }
+  void Init(int dev_id, int exec_stream_id = 0, int io_stream_id = 0) {
+    CHECK_GT(devs.size(), 0UL)
+        << "Env is not initialized or the current target does not exist!";
+    if (dev_id >= static_cast<int>(devs.size())) {
+      LOG(WARNING) << "device index exceeds the number of devices, set to "
+                      "default device(0)!";
+      device_id_ = 0;
+    } else {
+      device_id_ = dev_id;
+    }
+    if (io_stream_id >= devs[dev_id].max_stream()) {
+      LOG(WARNING) << "data stream index exceeds the maximum stream number, "
+                      "set to default stream(0)!";
+      io_stream_id = 0;
+    }
+    if (exec_stream_id >= devs[dev_id].max_stream()) {
+      LOG(WARNING) << "exec stream index exceeds the maximum stream number, "
+                      "set to default stream(0)!";
+      exec_stream_id = 0;
+    }
+
+    exec_stream_ = devs[dev_id].exec_streams()[exec_stream_id];
+    io_stream_ = devs[dev_id].io_streams()[io_stream_id];
+
+    exec_stream_id_ = exec_stream_id;
+    io_stream_id_ = io_stream_id;
+    need_sync_ = false;
+  }
+  void CopySharedTo(CUDAContext* ctx) {
+    CHECK(ctx);
+    CHECK(cublas_fp32_) << "cublas_fp32 should be set first";
+    ctx->cublas_fp32_ = cublas_fp32_;
+  }
+
+  const cudaStream_t& exec_stream() const { return exec_stream_; }
+  void SetExecStream(cudaStream_t stream) { exec_stream_ = stream; }
+
+  const cudaStream_t& io_stream() const { return io_stream_; }
+  void SetIoStream(cudaStream_t stream) { io_stream_ = stream; }
+
+  std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32() {
+    return cublas_fp32_;
+  }
+  void SetCuBlasFP32(std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32) {
+    cublas_fp32_ = cublas_fp32;
+  }
+
+  const std::vector<cudaEvent_t>& input_events() { return input_events_; }
+  void SetInputEvents(const std::vector<cudaEvent_t>& input_events) {
+    input_events_.clear();
+    input_events_.assign(input_events.begin(), input_events.end());
+  }
+
+  const std::vector<cudaEvent_t>& output_events() { return output_events_; }
+  void SetOutputEvents(const std::vector<cudaEvent_t>& output_events) {
+    output_events_.clear();
+    output_events_.assign(output_events.begin(), output_events.end());
+  }
+
+  std::vector<cudaStream_t> all_exec_streams() {
+    int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
+    return devs[dev_id].exec_streams();
+  }
+
+  void SetSyncStreams(const std::vector<int>& nums) {
+    sync_streams_.clear();
+    std::vector<cudaStream_t> exec_streams = all_exec_streams();
+    for (size_t i = 0; i < nums.size(); ++i) {
+      CHECK(nums[i] >= 0 && nums[i] < static_cast<int>(exec_streams.size()))
+          << "stream id is not valid";
+      sync_streams_.push_back(exec_streams[nums[i]]);
+    }
+    InitSyncEvents(nums.size());
+  }
+
+  void InitSyncEvents(const int num) {
+    sync_events_.clear();
+    for (int i = 0; i < num; ++i) {
+      cudaEvent_t eve;
+      TargetWrapperCuda::CreateEventWithFlags(&eve);
+      sync_events_.push_back(eve);
+    }
+  }
+
+  void SetNeedSync(bool sync) { need_sync_ = sync; }
+  bool need_sync() const { return need_sync_; }
+
+  void Sync() {
+    CHECK_EQ(sync_streams_.size(), sync_events_.size());
+    for (size_t i = 0; i < sync_events_.size(); ++i) {
+      TargetWrapperCuda::RecordEvent(sync_events_[i], sync_streams_[i]);
+      TargetWrapperCuda::StreamSync(exec_stream_, sync_events_[i]);
+    }
+  }
+
+  std::string name() const { return "CUDAContext"; }
+
+  CUDAContext& operator=(const CUDAContext& context) {
+    this->Init(
+        context.device_id_, context.exec_stream_id_, context.io_stream_id_);
+    cublas_fp32_ = const_cast<CUDAContext&>(context).cublas_fp32();
+    return *this;
+  }
+
+ private:
+  int device_id_;
+  // overall information
+  int exec_stream_id_;
+  int io_stream_id_;
+  cudaStream_t exec_stream_;
+  cudaStream_t io_stream_;
+
+  // not thread-safe, should allocate for each thread.
+  std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32_;
+
+  // kernel information
+  std::vector<cudaEvent_t> input_events_;
+  std::vector<cudaEvent_t> output_events_;
+  // multi-stream sync.
+  std::vector<cudaStream_t> sync_streams_;
+  std::vector<cudaEvent_t> sync_events_;
+  bool need_sync_;
+};
+
+}  // namespace lite
+}  // namespace paddle
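For reference, the synchronization pattern behind Sync() written with plain CUDA runtime calls. This standalone sketch assumes TargetWrapperCuda::RecordEvent/StreamSync wrap cudaEventRecord/cudaStreamWaitEvent, which matches how they are used above; WaitOnProducers is an illustrative name, not code from the patch.

    #include <cuda_runtime.h>
    #include <vector>

    // Make `consumer` wait for all work currently enqueued on `producers`,
    // without blocking the host thread.
    void WaitOnProducers(cudaStream_t consumer,
                         const std::vector<cudaStream_t>& producers) {
      for (cudaStream_t p : producers) {
        cudaEvent_t e;
        cudaEventCreateWithFlags(&e, cudaEventDisableTiming);
        cudaEventRecord(e, p);                // mark the producer's current tail
        cudaStreamWaitEvent(consumer, e, 0);  // device-side wait on the event
        cudaEventDestroy(e);                  // safe once the wait is enqueued
      }
    }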
diff --git a/lite/core/CMakeLists.txt b/lite/core/CMakeLists.txt
index 278f971b0b..6bd353a9e1 100644
--- a/lite/core/CMakeLists.txt
+++ b/lite/core/CMakeLists.txt
@@ -38,7 +38,7 @@ lite_cc_library(device_info SRCS device_info.cc DEPS tensor)
 if (LITE_WITH_ARM)
 lite_cc_library(context SRCS context.cc DEPS tensor any device_info CL_DEPS cl_context)
 else()
-lite_cc_library(context SRCS context.cc DEPS tensor any device_info eigen3 CL_DEPS cl_context)
+lite_cc_library(context SRCS context.cc DEPS tensor any device_info eigen3 CL_DEPS cl_context CUDA_DEPS cuda_context)
 endif()
 #-------------------------------------------- GET CODE META INFO ------------------------------------------
diff --git a/lite/core/context.h b/lite/core/context.h
index 061638d63f..fa38b7c81b 100644
--- a/lite/core/context.h
+++ b/lite/core/context.h
@@ -16,8 +16,7 @@
 #include "lite/utils/any.h"
 #ifdef LITE_WITH_CUDA
-#include "lite/backends/cuda/blas.h"
-#include "lite/backends/cuda/cuda_utils.h"
+#include "lite/backends/cuda/context.h"
 #endif
 #ifdef LITE_WITH_OPENCL
 #include <unordered_map>
@@ -53,7 +52,6 @@ class Context;
 
 using HostContext = Context<TargetType::kHost>;
 using X86Context = Context<TargetType::kX86>;
-using CUDAContext = Context<TargetType::kCUDA>;
 using ARMContext = Context<TargetType::kARM>;
 using NPUContext = Context<TargetType::kNPU>;
 using XPUContext = Context<TargetType::kXPU>;
@@ -286,103 +284,6 @@ class Context<TargetType::kMLU> {
 };
 #endif  // LITE_WITH_MLU
 
-#ifdef LITE_WITH_CUDA
-// Only works with CUDA kernels.
-template <>
-class Context<TargetType::kCUDA> {
- public:
-  typename Env<TargetType::kCUDA>::Devs& devs =
-      Env<TargetType::kCUDA>::Global();
-  // NOTE: InitOnce should only be used by ContextScheduler
-  void InitOnce() {
-    if (devs.size() > 0) {
-      cublas_fp32_ = std::make_shared<lite::cuda::Blas<float>>();
-    } else {
-      LOG(INFO) << "No cuda device(s) found, CUDAContext init failed.";
-    }
-  }
-  void Init(int dev_id, int exec_stream_id = 0, int io_stream_id = 0) {
-    CHECK_GT(devs.size(), 0UL)
-        << "Env is not initialized or current target is not exit!";
-    if (dev_id >= static_cast<int>(devs.size())) {
-      LOG(WARNING) << "device index exceeds the number of devices, set to "
-                      "default device(0)!";
-      device_id_ = 0;
-    } else {
-      device_id_ = dev_id;
-    }
-    if (io_stream_id >= devs[dev_id].max_stream()) {
-      LOG(WARNING) << "data stream index exceeds the maximum stream number, "
-                      "set to default stream(0)!";
-      io_stream_id = 0;
-    }
-    if (exec_stream_id >= devs[dev_id].max_stream()) {
-      LOG(WARNING) << "exec stream index exceeds the maximum stream number, "
-                      "set to default stream(0)!";
-      exec_stream_id = 0;
-    }
-
-    exec_stream_ = devs[dev_id].exec_streams()[exec_stream_id];
-    io_stream_ = devs[dev_id].io_streams()[io_stream_id];
-
-    exec_stream_id_ = exec_stream_id;
-    io_stream_id_ = io_stream_id;
-  }
-  void CopySharedTo(CUDAContext* ctx) {
-    CHECK(ctx);
-    CHECK(cublas_fp32_) << "cublas_fp32 should be set first";
-    ctx->cublas_fp32_ = cublas_fp32_;
-  }
-
-  const cudaStream_t& exec_stream() const { return exec_stream_; }
-  void SetExecStream(cudaStream_t stream) { exec_stream_ = stream; }
-
-  const cudaStream_t& io_stream() const { return io_stream_; }
-  void SetIoStream(cudaStream_t stream) { io_stream_ = stream; }
-
-  std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32() {
-    return cublas_fp32_;
-  }
-  void SetCuBlasFP32(std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32) {
-    cublas_fp32_ = cublas_fp32;
-  }
-
-  const std::vector<cudaEvent_t>& input_events() { return input_events_; }
-  void SetInputEvents(const std::vector<cudaEvent_t>& input_events) {
-    input_events_.clear();
-    input_events_.assign(input_events.begin(), input_events.end());
-  }
-
-  const std::vector<cudaEvent_t>& output_events() { return output_events_; }
-  void SetOutputEvents(const std::vector<cudaEvent_t>& output_events) {
-    output_events_.clear();
-    output_events_.assign(output_events.begin(), output_events.end());
-  }
-
-  std::string name() const { return "CUDAContext"; }
-
-  CUDAContext& operator=(const CUDAContext& context) {
-    this->Init(
-        context.device_id_, context.exec_stream_id_, context.io_stream_id_);
-    cublas_fp32_ = const_cast<CUDAContext&>(context).cublas_fp32();
-    return *this;
-  }
-
- private:
-  int device_id_;
-  // overall information
-  int exec_stream_id_;
-  int io_stream_id_;
-  cudaStream_t exec_stream_;
-  cudaStream_t io_stream_;
-
-  // not thread-safe, should allocate for each thread.
-  std::shared_ptr<lite::cuda::Blas<float>> cublas_fp32_;
-
-  // kernel information
-  std::vector<cudaEvent_t> input_events_;
-  std::vector<cudaEvent_t> output_events_;
-};
-#endif
-
 #ifdef LITE_WITH_X86
 template <>
 class Context<TargetType::kX86> {
@@ -455,7 +356,9 @@ class ContextScheduler {
     return *x;
   }
 
-  std::unique_ptr<KernelContext> NewContext(TargetType target) {
+  std::unique_ptr<KernelContext> NewContext(
+      TargetType target,
+      /*only used for cuda context*/ int exec_stream_id = 0) {
     std::unique_ptr<KernelContext> ctx(new KernelContext);
     switch (target) {
       case TARGET(kHost):
@@ -472,7 +375,7 @@ class ContextScheduler {
       case TARGET(kCUDA): {
         int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
         auto& context = ctx->As<CUDAContext>();
-        context.Init(dev_id);
+        context.Init(dev_id, exec_stream_id);
         kernel_contexts_[TargetType::kCUDA].As<CUDAContext>().CopySharedTo(
             &context);
       } break;
diff --git a/lite/core/device_info.h b/lite/core/device_info.h
index a108ae3d4b..b06eb8d944 100644
--- a/lite/core/device_info.h
+++ b/lite/core/device_info.h
@@ -159,7 +159,7 @@ class Env {
     static Devs* devs = new Devs();
     return *devs;
   }
-  static void Init(int max_stream = 4) {
+  static void Init(int max_stream = 6) {
 #ifdef LITE_WITH_MLU
     CNRT_CALL(cnrtInit(0));
 #endif
@@ -175,6 +175,7 @@ class Env {
     } else {
       LOG(INFO) << "Found " << count << " device(s)";
     }
+    CHECK_GT(max_stream, 0) << "max_stream must be greater than 0.";
     // create all device
     for (int i = 0; i < count; i++) {
       auto dev = Device<Type>(i, max_stream);
@@ -234,8 +235,8 @@ class Device {
   std::string name() { return device_prop_.name; }
   int core_num() { return device_prop_.multiProcessorCount; }
   float max_memory() { return device_prop_.totalGlobalMem / 1048576.; }
-  std::vector<cudaStream_t> exec_streams() { return exec_stream_; }
-  std::vector<cudaStream_t> io_streams() { return io_stream_; }
+  const std::vector<cudaStream_t>& exec_streams() { return exec_stream_; }
+  const std::vector<cudaStream_t>& io_streams() { return io_stream_; }
 
   int sm_version() { return sm_version_; }
   bool has_fp16() { return has_fp16_; }
diff --git a/lite/core/mir/CMakeLists.txt b/lite/core/mir/CMakeLists.txt
index 91accc907e..d036bf7988 100644
--- a/lite/core/mir/CMakeLists.txt
+++ b/lite/core/mir/CMakeLists.txt
@@ -37,6 +37,7 @@ lite_cc_library(mir_passes
     demo_pass.cc
     runtime_context_assign_pass.cc
     memory_optimize_pass.cc
+    multi_stream_analysis_pass.cc
     mlu_postprocess_pass.cc
     weight_quantization_preprocess_pass.cc
     quantized_op_attributes_inference_pass.cc
diff --git a/lite/core/mir/generate_program_pass.cc b/lite/core/mir/generate_program_pass.cc
index 76c97d2da6..d7486c0933 100644
--- a/lite/core/mir/generate_program_pass.cc
+++ b/lite/core/mir/generate_program_pass.cc
@@ -14,6 +14,7 @@
 
 #include "lite/core/mir/generate_program_pass.h"
 #include <memory>
+#include <string>
 #include <vector>
 #include "lite/core/mir/graph_visualize_pass.h"
@@ -25,10 +26,37 @@ namespace mir {
 
 void GenerateProgramPass::Apply(const std::unique_ptr<SSAGraph>& graph) {
   VLOG(4) << "final program \n" << Visualize(graph.get());
-  for (auto& item : graph->StmtTopologicalOrder()) {
+  std::vector<Node*> nodes_in_order;
+#ifdef LITE_WITH_CUDA
+  const std::string depend_pass = "multi_stream_analysis_pass";
+  const std::string attr_name = "nodes_in_order";
+  mir::Pass* pass = mir::PassManager::Global().LookUp(depend_pass);
+  if (pass->HasAttr(attr_name)) {
+    nodes_in_order = pass->GetAttr<std::vector<Node*>>(attr_name);
+  }
+#endif
+  if (nodes_in_order.empty()) {
+    nodes_in_order = graph->StmtTopologicalOrder();
+  }
+
+  for (auto& item : nodes_in_order) {
     if (item->IsStmt()) {
       auto& stmt = item->AsStmt();
       VLOG(4) << stmt;
+#ifdef LITE_WITH_CUDA
+      if (stmt.kernels().front()->target() == TargetType::kCUDA) {
+        stmt.kernels()
+            .front()
+            ->mutable_context()
+            ->As<CUDAContext>()
+            .SetNeedSync(stmt.need_sync_);
+        stmt.kernels()
+            .front()
+            ->mutable_context()
+            ->As<CUDAContext>()
+            .SetSyncStreams(stmt.sync_streams_);
+      }
+#endif
       insts_.emplace_back(stmt.op(), std::move(stmt.kernels().front()));
     }
   }
diff --git a/lite/core/mir/graph_visualize_pass.cc b/lite/core/mir/graph_visualize_pass.cc
index a32c9c05f6..55b7a00456 100644
--- a/lite/core/mir/graph_visualize_pass.cc
+++ b/lite/core/mir/graph_visualize_pass.cc
@@ -85,7 +85,23 @@ std::string Visualize(mir::SSAGraph* graph) {
     if (!node->IsStmt()) continue;
     auto op_info = node->AsStmt().op_info();
     auto op_type = op_info->Type();
-    std::string op_name = string_format("%s%d", op_type.c_str(), op_idx++);
+    std::string op_name;
+    if (node->AsStmt().need_sync_) {
+      std::ostringstream oss;
+      for (size_t i = 0; i < node->AsStmt().sync_streams_.size(); ++i) {
+        oss << std::to_string(node->AsStmt().sync_streams_[i]);
+        if (i != node->AsStmt().sync_streams_.size() - 1) {
+          oss << ",";
+        }
+      }
+      op_name = string_format("%s%d, stream=%d, sync_streams={%s}",
+                              op_type.c_str(),
+                              op_idx++,
+                              node->AsStmt().stream_id_,
+                              oss.str().c_str());
+    } else {
+      op_name = string_format("%s%d", op_type.c_str(), op_idx++);
+    }
 
     // Add its input&output variables as the Dot nodes
     dot.AddNode(op_name,
                 {Dot::Attr("shape", "box"),
@@ -93,7 +109,13 @@ std::string Visualize(mir::SSAGraph* graph) {
                  Dot::Attr("color", "black"),
                  Dot::Attr("fillcolor", "yellow")});
     for (auto& x : node->inlinks) {
-      auto var_name = x->AsArg().name;
+      std::string var_name;
+      if (x->AsArg().lane != -1) {
+        var_name = string_format(
+            "%s, lane=%d", x->AsArg().name.c_str(), x->AsArg().lane);
+      } else {
+        var_name = x->AsArg().name;
+      }
       if (!exists_var_names.count(var_name)) {
         dot.AddNode(var_name, {});
         exists_var_names.insert(var_name);
@@ -101,7 +123,13 @@ std::string Visualize(mir::SSAGraph* graph) {
       dot.AddEdge(var_name, op_name, {});
     }
     for (auto& x : node->outlinks) {
-      auto var_name = x->AsArg().name;
+      std::string var_name;
+      if (x->AsArg().lane != -1) {
+        var_name = string_format(
+            "%s, lane=%d", x->AsArg().name.c_str(), x->AsArg().lane);
+      } else {
+        var_name = x->AsArg().name;
+      }
       if (!exists_var_names.count(var_name)) {
         dot.AddNode(var_name, {});
         exists_var_names.insert(var_name);
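For illustration only (the op and variable names below are made up), the annotated Visualize() output labels nodes like this:

    fc0, stream=1, sync_streams={0,2}   <- op on stream 1 that must wait on streams 0 and 2
    conv2d1                             <- op with no cross-stream inputs keeps the old label
    conv2d_0.tmp_0, lane=1              <- variable tagged with the stream (lane) that produces it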
diff --git a/lite/core/mir/multi_stream_analysis_pass.cc b/lite/core/mir/multi_stream_analysis_pass.cc
new file mode 100644
index 0000000000..46454a1fc3
--- /dev/null
+++ b/lite/core/mir/multi_stream_analysis_pass.cc
@@ -0,0 +1,313 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "lite/core/mir/multi_stream_analysis_pass.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "lite/core/device_info.h"
+#include "lite/core/mir/graph_visualize_pass.h"
+#include "lite/core/mir/pass_registry.h"
+#include "lite/core/type_system.h"
+
+namespace paddle {
+namespace lite {
+namespace mir {
+
+void MultiStreamAnalysisPass::CleanUp() {
+  exec_ops_.clear();
+  wait_que_.clear();
+  wait_que_cpu_.clear();
+  std::queue<Node*> empty_queue;
+  while (!exec_que_.empty()) {
+    exec_que_.pop();
+  }
+  ops_in_streams_.clear();
+  resources_.clear();
+  map_arg_to_lane_.clear();
+  op_types_set_.clear();
+  io_copy_once_num_ = 0;
+}
+
+void MultiStreamAnalysisPass::Init(SSAGraph* graph) {
+  // If not cleaned up, state left over from a previous run would carry over.
+  CleanUp();
+  for (auto& op_node : graph->StmtTopologicalOrder()) {
+    if (op_node->IsStmt()) {
+      // Set all outputs of the op to the inaccessible state.
+      auto outputs = op_node->outlinks;
+      for (Node* node : outputs) {
+        CHECK(node->IsArg());
+        auto& arg = node->AsArg();
+        if (!resources_.count(arg.name)) {
+          resources_[arg.name] = false;
+        }
+      }
+      // Set the weight inputs of the op to the accessible state.
+      auto inputs = op_node->inlinks;
+      for (Node* node : inputs) {
+        CHECK(node->IsArg());
+        auto& arg = node->AsArg();
+        if (arg.is_weight || arg.is_persist) {
+          resources_[arg.name] = true;
+        }
+      }
+
+      // feed and io_copy_once ops have no dependencies and can be launched
+      // directly. Other ops are put into the waiting queue.
+      if (op_node->AsStmt().op_type() == "feed" ||
+          op_node->AsStmt().op_type() == "io_copy_once") {
+        exec_que_.push(op_node);
+      } else {
+        auto tgt = op_node->AsStmt().kernels().front()->target();
+        if (tgt == TargetType::kCUDA) {
+          wait_que_.push_back(op_node);
+        } else {
+          wait_que_cpu_.push_back(op_node);
+        }
+      }
+      op_types_set_.insert(op_node->AsStmt().op_type());
+    }
+  }
+
+  // Set the stream id according to the number of feed ops, and set the
+  // outputs of the feed ops to the accessible state.
+  int lane = 0;
+  auto nodes = graph->inputs();
+  ops_in_streams_.resize(max_stream_);
+
+  for (auto& node : nodes) {
+    std::string::size_type idx = node->AsArg().name.find("feed");
+    if (idx != std::string::npos) {
+      for (auto& feed_ops : node->outlinks) {
+        if (feed_ops->AsStmt().op_type() == "feed") {
+          // The feed op doesn't need to wait for a sync.
+          feed_ops->AsStmt().need_sync_ = false;
+          CHECK_EQ(static_cast<int>(feed_ops->outlinks.size()), 1)
+              << "feed op must have one output.";
+          for (auto& var : feed_ops->outlinks) {
+            var->AsArg().lane = lane;
+            map_arg_to_lane_[var->AsArg().name] = lane;
+            resources_[var->AsArg().name] = true;
+          }
+          feed_ops->AsStmt().stream_id_ = lane;
+          ops_in_streams_[lane].push_back(feed_ops);
+          ++lane;
+          if (lane >= max_stream_) {
+            lane = 0;
+          }
+        }
+      }
+    }
+    // Put all io_copy_once ops in the first stream.
+    for (auto& io_copy_once_ops : node->outlinks) {
+      if (io_copy_once_ops->AsStmt().op_type() == "io_copy_once") {
+        ops_in_streams_[0].push_back(io_copy_once_ops);
+        io_copy_once_ops->AsStmt().stream_id_ = 0;
+        io_copy_once_ops->AsStmt().need_sync_ = false;
+        ++io_copy_once_num_;
+      }
+    }
+  }
+}
+
+bool MultiStreamAnalysisPass::CheckOpSupport() {
+  std::unordered_set<std::string> invalid_op = {
+      "while", "conditional_block", "conditional_block_infer", "graph_op"};
+  for (auto& op_type : op_types_set_) {
+    if (invalid_op.count(op_type)) {
+      LOG(INFO) << "multi_stream_analysis_pass doesn't support " << op_type
+                << ", just return.";
+      return false;
+    }
+  }
+  return true;
+}
+
+bool MultiStreamAnalysisPass::IsPrepared(Node* stmt_node) {
+  // feed ops are prepared during Init().
+  std::string op_name = stmt_node->AsStmt().op_type();
+  if (op_name == "feed") {
+    return true;
+  }
+
+  // Check whether all of the op's inputs are accessible.
+  std::vector<std::string> args;
+  for (auto* ins : stmt_node->inlinks) {
+    args.push_back(ins->AsArg().name);
+  }
+  return CheckAccess(args);
+}
+
+bool MultiStreamAnalysisPass::CheckAccess(
+    const std::vector<std::string>& args) {
+  if (args.size() == 0) {
+    return true;
+  }
+  for (auto& name : args) {
+    if (resources_[name]) {
+      continue;
+    } else {
+      return false;
+    }
+  }
+  return true;
+}
+
+int MultiStreamAnalysisPass::SelectStreamId(const std::vector<int>& lanes) {
+  if (lanes.size() == 0) {
+    return 0;
+  }
+
+  int res = lanes[0];
+  int exclude_io_copy_once_num = ops_in_streams_[0].size() - io_copy_once_num_;
+  int min_num = lanes[0] == 0 ? exclude_io_copy_once_num
+                              : ops_in_streams_[lanes[0]].size();
+  for (size_t i = 1; i < lanes.size(); ++i) {
+    int ith_num = lanes[i] == 0 ? exclude_io_copy_once_num
+                                : ops_in_streams_[lanes[i]].size();
+    if (ith_num < min_num) {
+      res = lanes[i];
+      min_num = ith_num;
+    }
+  }
+
+  return res;
+}
+
+void MultiStreamAnalysisPass::Launch(Node* stmt_node) {
+  // Record the op launch order.
+  exec_que_.push(stmt_node);
+  std::vector<int> lanes;
+  for (auto& in_arg : stmt_node->inlinks) {
+    // Weight parameters are not bound to a stream id, so just skip them.
+    if (in_arg->AsArg().is_weight || in_arg->AsArg().is_persist) {
+      continue;
+    }
+
+    if (std::find(lanes.begin(), lanes.end(), in_arg->AsArg().lane) ==
+        lanes.end()) {
+      lanes.push_back(in_arg->AsArg().lane);
+    }
+  }
+
+  int stream_id = SelectStreamId(lanes);
+
+  // If the inputs of the op live on multiple streams, the op needs to be
+  // synchronized with the other streams.
+  if (lanes.size() > 1) {
+    for (size_t i = 0; i < lanes.size(); ++i) {
+      if (lanes[i] != stream_id) {
+        stmt_node->AsStmt().sync_streams_.push_back(lanes[i]);
+      }
+    }
+    stmt_node->AsStmt().need_sync_ = true;
+  }
+  // io_copy nodes are inserted across devices and need to be synced.
+  if (stmt_node->AsStmt().op_type() == "io_copy") {
+    stmt_node->AsStmt().need_sync_ = true;
+  }
+  stmt_node->AsStmt().stream_id_ = stream_id;
+
+  // Set the output lanes and set the outputs of the op to be accessible.
+  for (auto& out_arg : stmt_node->outlinks) {
+    out_arg->AsArg().lane = stream_id;
+    resources_[out_arg->AsArg().name] = true;
+  }
+  ops_in_streams_[stream_id].push_back(stmt_node);
+}
+
+void MultiStreamAnalysisPass::Apply(const std::unique_ptr<SSAGraph>& graph) {
+#ifdef LITE_WITH_CUDA
+  typename Env<TargetType::kCUDA>::Devs& devs =
+      Env<TargetType::kCUDA>::Global();
+  int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
+  max_stream_ = devs[dev_id].max_stream();
+#else
+  LOG(FATAL) << "Please re-compile by setting the cmake flag LITE_WITH_CUDA=ON";
+#endif
+
+  // Find the correct launch sequence for the ops.
+  Init(graph.get());
+  bool is_valid = CheckOpSupport();
+  if (!is_valid) {
+    return;
+  }
+  size_t prev_size;
+
+  while (!(this->wait_que_.empty() && this->wait_que_cpu_.empty())) {
+    prev_size = this->wait_que_.size() + this->wait_que_cpu_.size();
+    // Launch the accessible CUDA kernels and remove them from the wait queue.
+    for (auto it = this->wait_que_.begin(); it != this->wait_que_.end();) {
+      if (IsPrepared(*it)) {
+        Launch(*it);
+        it = wait_que_.erase(it);
+      } else {
+        ++it;
+      }
+    }
+    // Launch the accessible CPU kernels and remove them from the wait queue.
+    for (auto cpu_it = this->wait_que_cpu_.begin();
+         cpu_it != this->wait_que_cpu_.end();) {
+      if (IsPrepared(*cpu_it)) {
+        Launch(*cpu_it);
+        cpu_it = wait_que_cpu_.erase(cpu_it);
+      } else {
+        ++cpu_it;
+      }
+    }
+
+    if (this->wait_que_.size() + this->wait_que_cpu_.size() == prev_size) {
+      LOG(FATAL) << "network topology error: some ops can never be launched!";
+    }
+  }
+
+  // Record the execution order of the ops.
+  while (!exec_que_.empty()) {
+    auto* node = exec_que_.front();
+    exec_ops_.push_back(node);
+    VLOG(4) << node->AsStmt().op_type()
+            << " stream: " << node->AsStmt().stream_id_
+            << ", sync: " << node->AsStmt().need_sync_;
+    if (node->AsStmt().need_sync_) {
+      for (size_t i = 0; i < node->AsStmt().sync_streams_.size(); ++i) {
+        VLOG(4) << " " << node->AsStmt().sync_streams_[i];
+      }
+    }
+    exec_que_.pop();
+  }
+
+  // Set the attribute, which is used to pass the result to later passes.
+  const std::string attr_name{"nodes_in_order"};
+  SetAttr<std::vector<Node*>>(attr_name, &exec_ops_);
+
+  LOG(INFO) << "stream " << 0 << " has "
+            << ops_in_streams_[0].size() - io_copy_once_num_
+            << " ops. (exclude io_copy_once).";
+  for (size_t i = 1; i < ops_in_streams_.size(); ++i) {
+    LOG(INFO) << "stream " << i << " has " << ops_in_streams_[i].size()
+              << " ops.";
+  }
+}
+
+}  // namespace mir
+}  // namespace lite
+}  // namespace paddle
+
+REGISTER_MIR_PASS(multi_stream_analysis_pass,
+                  paddle::lite::mir::MultiStreamAnalysisPass)
+    .BindTargets({TARGET(kCUDA)});
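The stream-selection policy above can be read in isolation: among the streams ("lanes") that already hold an op's inputs, pick the one with the fewest scheduled ops, and record every other input lane as a stream to sync on. A simplified standalone rendering for illustration (PickLane and ops_per_stream are made-up names, not code from the patch):

    #include <vector>

    // Return the least-loaded lane among the lanes that hold the op's inputs.
    int PickLane(const std::vector<int>& input_lanes,
                 const std::vector<int>& ops_per_stream) {
      if (input_lanes.empty()) return 0;
      int best = input_lanes[0];
      for (int lane : input_lanes) {
        if (ops_per_stream[lane] < ops_per_stream[best]) best = lane;
      }
      return best;
    }

    // Example: inputs live on lanes {0, 2}; stream 0 already holds 5 ops and
    // stream 2 holds 2, so PickLane returns 2 and lane 0 goes into
    // sync_streams_, meaning the op must wait on stream 0 before running.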
diff --git a/lite/core/mir/multi_stream_analysis_pass.h b/lite/core/mir/multi_stream_analysis_pass.h
new file mode 100644
index 0000000000..37a7feca3a
--- /dev/null
+++ b/lite/core/mir/multi_stream_analysis_pass.h
@@ -0,0 +1,85 @@
+// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <list>
+#include <memory>
+#include <queue>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "lite/core/kernel.h"
+#include "lite/core/mir/pass.h"
+
+namespace paddle {
+namespace lite {
+namespace mir {
+
+/*
+ * MultiStreamAnalysisPass finds a correct launch sequence for all ops.
+ * Ideally, the order consists of many asynchronous ops and only a small
+ * number of synchronous ops.
+ */
+class MultiStreamAnalysisPass : public StmtPass {
+ public:
+  void Apply(const std::unique_ptr<SSAGraph>& graph) override;
+
+ private:
+  // Init the resource list. Set all ops except feed to the inaccessible
+  // state, and set the stream ids according to the number of inputs.
+  void Init(SSAGraph* graph);
+
+  // Clean the state of all member variables.
+  void CleanUp();
+
+  // After launching, unlock the output resources of the op.
+  void Launch(Node* stmt_node);
+
+  // If all inputs of an op are accessible, the op is considered to be in the
+  // prepared state.
+  bool IsPrepared(Node* stmt_node);
+
+  // Determine whether all inputs of the op are accessible.
+  bool CheckAccess(const std::vector<std::string>& args);
+
+  // The logic of selecting a stream:
+  // 1. Make the number of ops on each stream as close as possible.
+  // 2. The selected stream must be one of the streams that hold the input
+  //    args.
+  int SelectStreamId(const std::vector<int>& lanes);
+
+  // Check whether all of the model's ops are supported. If an unsupported op
+  // is encountered, bail out.
+  bool CheckOpSupport();
+
+ private:
+  std::list<Node*> wait_que_;
+  std::list<Node*> wait_que_cpu_;
+  std::queue<Node*> exec_que_;
+  std::vector<Node*> exec_ops_;
+  std::vector<std::vector<Node*>> ops_in_streams_;
+  std::unordered_map<std::string, bool> resources_;
+  std::unordered_map<std::string, int> map_arg_to_lane_;
+  int max_stream_;
+  int io_copy_once_num_;
+  std::unordered_set<std::string> op_types_set_;
+};
+
+}  // namespace mir
+}  // namespace lite
+}  // namespace paddle
diff --git a/lite/core/mir/node.h b/lite/core/mir/node.h
index 45b15812fa..ae7b112d91 100644
--- a/lite/core/mir/node.h
+++ b/lite/core/mir/node.h
@@ -80,6 +80,12 @@ class Node {
 
     // Description.
     std::string desc;
+
+    // for cuda multi-stream
+    bool need_sync_{false};
+    int stream_id_{0};
+    // The streams that need to be synced, excluding stream_id_.
+    std::vector<int> sync_streams_{};
   };
 
   struct Arg {
@@ -93,6 +99,7 @@ class Node {
     // if the need more than one tool operator(eg. io_copy layout calib), the
    // argument between them should be persist to make sure it's only run once
     bool is_persist{false};
+    int lane{-1};
   };
 
   Arg& AsArg(const std::string& name, int id);
diff --git a/lite/core/mir/pass.h b/lite/core/mir/pass.h
index 4e8c8be292..64f2db82c0 100644
--- a/lite/core/mir/pass.h
+++ b/lite/core/mir/pass.h
@@ -17,9 +17,11 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <unordered_map>
 
 #include "lite/core/mir/node.h"
 #include "lite/core/mir/ssa_graph.h"
+#include "lite/utils/varient.h"
 
 namespace paddle {
 namespace lite {
@@ -121,6 +123,27 @@ class Pass {
 
   virtual ~Pass() = default;
 
+  bool HasAttr(const std::string& attr_name) const {
+    return pass_attrs_.count(attr_name) > 0;
+  }
+
+  // Set an attribute from a pointer. The pass itself takes ownership of the
+  // stored attribute.
+  template <typename AttrType>
+  void SetAttr(const std::string& attr_name, const AttrType* attr) {
+    VLOG(4) << "Setting the attribute " << attr_name << " for the pass "
+            << name_;
+    pass_attrs_[attr_name].set(*attr);
+  }
+
+  // Get a reference to the attribute previously set.
+  template <typename AttrType>
+  const AttrType& GetAttr(const std::string& attr_name) const {
+    CHECK(pass_attrs_.count(attr_name))
+        << attr_name << " attr is not registered for pass " << name_;
+    return pass_attrs_.at(attr_name).get<AttrType>();
+  }
+
 private:
   const Kind kind_;
   std::string name_;
@@ -128,6 +151,8 @@ class Pass {
   std::set<TargetType> bound_targets_;
   std::set<TargetType> excluded_targets_;
   std::unordered_map<std::string, std::set<lite_api::DataLayoutType>>
       bound_kernels_;
+  std::unordered_map<std::string, variant<Node, std::vector<Node*>>>
+      pass_attrs_;
 };
 
 // Different kinds.
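Taken together, the new attribute API forms a producer/consumer pair. A condensed sketch of the two sides exactly as this patch wires them up (error handling elided; the names match the code above):

    // Producer, at the end of MultiStreamAnalysisPass::Apply():
    SetAttr<std::vector<Node*>>("nodes_in_order", &exec_ops_);

    // Consumer, in GenerateProgramPass::Apply():
    mir::Pass* pass =
        mir::PassManager::Global().LookUp("multi_stream_analysis_pass");
    if (pass && pass->HasAttr("nodes_in_order")) {
      auto nodes = pass->GetAttr<std::vector<Node*>>("nodes_in_order");
    }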
diff --git a/lite/core/mir/runtime_context_assign_pass.cc b/lite/core/mir/runtime_context_assign_pass.cc
index 3cbe602f31..5b6f968484 100644
--- a/lite/core/mir/runtime_context_assign_pass.cc
+++ b/lite/core/mir/runtime_context_assign_pass.cc
@@ -45,9 +45,10 @@ class RuntimeContextAssignPass : public StmtPass {
           inst.picked_kernel().target()));
     }
 #else
-    inst.picked_kernel().SetContext(
-        ContextScheduler::Global().NewContext(inst.picked_kernel().target()));
+    int stream_id = inst.stream_id_;
+    inst.picked_kernel().SetContext(ContextScheduler::Global().NewContext(
+        inst.picked_kernel().target(), stream_id));
 #endif
   }
 }
diff --git a/lite/core/optimizer.h b/lite/core/optimizer.h
index 80c2bd553f..5004be79af 100644
--- a/lite/core/optimizer.h
+++ b/lite/core/optimizer.h
@@ -127,7 +127,21 @@ class Optimizer {
          "memory_optimize_pass"}};
 
     if (passes.size() == 1) {
-      passes_local.push_back(passes[0]);
+      // multi_stream_analysis_pass must run before
+      // runtime_context_assign_pass.
+      const std::string msa_pass{"multi_stream_analysis_pass"};
+      const std::string depend_pass{"runtime_context_assign_pass"};
+      if (passes[0] == msa_pass) {
+        auto iter =
+            std::find(passes_local.begin(), passes_local.end(), depend_pass);
+        if (iter != passes_local.end()) {
+          passes_local.insert(iter, msa_pass);
+        } else {
+          CHECK(false) << "Cannot find " << depend_pass;
+        }
+      } else {
+        passes_local.push_back(passes[0]);
+      }
     }
     RunPasses(passes_local);
   } else {
diff --git a/lite/core/profile/precision_profiler.h b/lite/core/profile/precision_profiler.h
index e72d1f54ee..ee581bf5e1 100644
--- a/lite/core/profile/precision_profiler.h
+++ b/lite/core/profile/precision_profiler.h
@@ -178,6 +178,13 @@ class PrecisionProfiler {
         write_result_to_file&& write_tensorfile<int32_t>(in, name);
         return;
       }
+      case PRECISION(kInt64): {
+        auto ptr = in->data<int64_t>();
+        *mean = compute_mean<int64_t>(ptr, in->numel());
+        *std_dev = compute_standard_deviation<int64_t>(
+            ptr, in->numel(), true, *mean);
+        return;
+      }
       default:
         *mean = -333333333333;
         *std_dev = -33333333333;
diff --git a/lite/core/program.cc b/lite/core/program.cc
index ff900c0e23..1193e3c84f 100644
--- a/lite/core/program.cc
+++ b/lite/core/program.cc
@@ -145,6 +145,11 @@ void RuntimeProgram::Run() {
   for (auto& inst : instructions_) {
 #ifndef LITE_WITH_FPGA
     if (inst.is_feed_fetch_op()) continue;
+#endif
+#ifdef LITE_WITH_CUDA
+    if (inst.need_sync()) {
+      inst.Sync();
+    }
 #endif
     inst.Run();
 #ifdef LITE_WITH_PRECISION_PROFILE
diff --git a/lite/core/program.h b/lite/core/program.h
index c845a17c52..9d5fef7c03 100644
--- a/lite/core/program.h
+++ b/lite/core/program.h
@@ -108,6 +108,18 @@ struct Instruction {
 
   bool is_feed_fetch_op() const { return is_feed_fetch_op_; }
 
+#ifdef LITE_WITH_CUDA
+  bool need_sync() const {
+    if (kernel_->target() == TargetType::kCUDA) {
+      return kernel_->mutable_context()->As<CUDAContext>().need_sync();
+    } else {
+      // The io_copy kernel has already synced, so CPU kernels need no sync.
+      return false;
+    }
+  }
+  void Sync() const { kernel_->mutable_context()->As<CUDAContext>().Sync(); }
+#endif
+
 #ifdef LITE_WITH_PROFILE
   void set_profiler(profile::Profiler* profiler) {
     profiler_ = profiler;
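Downstream of this patch, a CUDA kernel simply picks up whatever stream its instruction was assigned by runtime_context_assign_pass. A hypothetical kernel body for illustration (MyKernelCompute and gpu_work are placeholders, not part of the patch; the ctx_/As<CUDAContext>() pattern follows the kernel code above):

    void MyKernelCompute::Run() {
      auto& ctx = this->ctx_->template As<CUDAContext>();
      cudaStream_t stream = ctx.exec_stream();  // stream chosen per instruction
      // Launch device work on the assigned stream so that independent ops
      // scheduled on different streams can overlap:
      // gpu_work<<<grid, block, 0, stream>>>(...);
    }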