Commit 5382994e authored by Wilber, committed by GitHub

lite cuda: support multi-stream execution. (#2949)

Support executing CUDA kernels on multiple streams: add a multi_stream_analysis_pass, a CxxConfig::set_multi_stream switch, and cross-stream synchronization in CUDAContext.
Parent 77f5911c
......@@ -32,12 +32,17 @@ namespace lite {
void CxxPaddleApiImpl::Init(const lite_api::CxxConfig &config) {
config_ = config;
auto places = config.valid_places();
std::vector<std::string> passes{};
#ifdef LITE_WITH_CUDA
// If kCUDA is included in the valid places, the CUDA Env must be initialized
// first; otherwise this step is skipped.
for (auto &p : places) {
if (p.target == TARGET(kCUDA)) {
Env<TARGET(kCUDA)>::Init();
if (config_.multi_stream()) {
passes = {"multi_stream_analysis_pass"};
VLOG(3) << "add pass: " << passes[0];
}
break;
}
}
......@@ -51,7 +56,6 @@ void CxxPaddleApiImpl::Init(const lite_api::CxxConfig &config) {
config.mlu_first_conv_std(),
config.mlu_input_layout());
#endif // LITE_WITH_MLU
std::vector<std::string> passes{};
auto use_layout_preprocess_pass =
config.model_dir().find("OPENCL_PRE_PRECESS");
VLOG(1) << "use_layout_preprocess_pass:" << use_layout_preprocess_pass;
......
......@@ -136,6 +136,9 @@ class LITE_API CxxConfig : public ConfigBase {
#ifdef LITE_WITH_X86
int x86_math_library_math_threads_ = 1;
#endif
#ifdef LITE_WITH_CUDA
bool multi_stream_{false};
#endif
#ifdef LITE_WITH_MLU
lite_api::MLUCoreVersion mlu_core_version_{lite_api::MLUCoreVersion::MLU_270};
int mlu_core_number_{1};
......@@ -171,6 +174,10 @@ class LITE_API CxxConfig : public ConfigBase {
return x86_math_library_math_threads_;
}
#endif
#ifdef LITE_WITH_CUDA
void set_multi_stream(bool multi_stream) { multi_stream_ = multi_stream; }
bool multi_stream() const { return multi_stream_; }
#endif
#ifdef LITE_WITH_MLU
// set MLU core version, which is used when compiling MLU kernels
......
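As a usage sketch (not part of this diff), the new config switch would typically be used as below; the predictor-creation call and the exact Place list follow the usual Paddle-Lite CxxConfig flow and are assumptions here:
// Hedged usage sketch: enable CUDA multi-stream execution via CxxConfig.
#include <memory>
#include <string>
#include "paddle_api.h"  // paddle::lite_api
using namespace paddle::lite_api;  // NOLINT
std::shared_ptr<PaddlePredictor> BuildMultiStreamPredictor(
    const std::string& model_dir) {
  CxxConfig config;
  config.set_model_dir(model_dir);
  // kCUDA must be in valid_places so Env<kCUDA>::Init() runs (see the Init
  // change above).
  config.set_valid_places({Place{TARGET(kCUDA), PRECISION(kFloat)},
                           Place{TARGET(kHost), PRECISION(kFloat)}});
  // New in this commit: request the multi_stream_analysis_pass.
  config.set_multi_stream(true);
  return CreatePaddlePredictor<CxxConfig>(config);
}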
......@@ -42,6 +42,7 @@ USE_MIR_PASS(type_precision_cast_pass);
USE_MIR_PASS(type_layout_cast_pass);
USE_MIR_PASS(type_layout_cast_preprocess_pass);
USE_MIR_PASS(memory_optimize_pass);
USE_MIR_PASS(multi_stream_analysis_pass);
USE_MIR_PASS(elementwise_mul_constant_eliminate_pass)
USE_MIR_PASS(npu_subgraph_pass);
USE_MIR_PASS(xpu_subgraph_pass);
......
......@@ -5,5 +5,7 @@ get_property(cuda_deps GLOBAL PROPERTY CUDA_MODULES)
nv_library(target_wrapper_cuda SRCS target_wrapper.cc DEPS ${cuda_deps})
nv_library(cuda_blas SRCS blas.cc DEPS ${cuda_deps})
lite_cc_library(cuda_context SRCS context.cc DEPS device_info)
add_subdirectory(math)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "lite/backends/cuda/context.h"
namespace paddle {
namespace lite {} // namespace lite
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "lite/backends/cuda/blas.h"
#include "lite/backends/cuda/cuda_utils.h"
#include "lite/backends/cuda/target_wrapper.h"
#include "lite/core/device_info.h"
namespace paddle {
namespace lite {
template <TargetType Type>
class Context;
using CUDAContext = Context<TargetType::kCUDA>;
// Only works with CUDA kernels.
template <>
class Context<TargetType::kCUDA> {
public:
typename Env<TargetType::kCUDA>::Devs& devs =
Env<TargetType::kCUDA>::Global();
// NOTE: InitOnce should only be used by ContextScheduler
void InitOnce() {
if (devs.size() > 0) {
cublas_fp32_ = std::make_shared<lite::cuda::Blas<float>>();
} else {
LOG(INFO) << "No cuda device(s) found, CUDAContext init failed.";
}
}
void Init(int dev_id, int exec_stream_id = 0, int io_stream_id = 0) {
CHECK_GT(devs.size(), 0UL)
<< "Env is not initialized or current target is not exit!";
if (dev_id >= static_cast<int>(devs.size())) {
LOG(WARNING) << "device index exceeds the number of devices, set to "
"default device(0)!";
device_id_ = 0;
} else {
device_id_ = dev_id;
}
if (io_stream_id >= devs[dev_id].max_stream()) {
LOG(WARNING) << "data stream index exceeds the maximum stream number, "
"set to default stream(0)!";
io_stream_id = 0;
}
if (exec_stream_id >= devs[dev_id].max_stream()) {
LOG(WARNING) << "exec stream index exceeds the maximum stream number, "
"set to default stream(0)!";
exec_stream_id = 0;
}
exec_stream_ = devs[dev_id].exec_streams()[exec_stream_id];
io_stream_ = devs[dev_id].io_streams()[io_stream_id];
exec_stream_id_ = exec_stream_id;
io_stream_id_ = io_stream_id;
need_sync_ = false;
}
void CopySharedTo(CUDAContext* ctx) {
CHECK(ctx);
CHECK(cublas_fp32_) << "cublas_fp32 should be set first";
ctx->cublas_fp32_ = cublas_fp32_;
}
const cudaStream_t& exec_stream() const { return exec_stream_; }
void SetExecStream(cudaStream_t stream) { exec_stream_ = stream; }
const cudaStream_t& io_stream() const { return io_stream_; }
void SetIoStream(cudaStream_t stream) { io_stream_ = stream; }
std::shared_ptr<cuda::Blas<float>> cublas_fp32() { return cublas_fp32_; }
void SetCuBlasFP32(std::shared_ptr<cuda::Blas<float>> cublas_fp32) {
cublas_fp32_ = cublas_fp32;
}
const std::vector<cudaEvent_t>& input_events() { return input_events_; }
void SetInputEvents(const std::vector<cudaEvent_t>& input_events) {
input_events_.clear();
input_events_.assign(input_events.begin(), input_events.end());
}
const std::vector<cudaEvent_t>& output_events() { return output_events_; }
void SetOutputEvents(const std::vector<cudaEvent_t>& output_events) {
output_events_.clear();
output_events_.assign(output_events.begin(), output_events.end());
}
std::vector<cudaStream_t> all_exec_streams() {
int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
return devs[dev_id].exec_streams();
}
void SetSyncStreams(const std::vector<int>& nums) {
sync_streams_.clear();
std::vector<cudaStream_t> exec_streams = all_exec_streams();
for (size_t i = 0; i < nums.size(); ++i) {
CHECK(nums[i] >= 0 && nums[i] < static_cast<int>(exec_streams.size()))
<< "streams id is not valid";
sync_streams_.push_back(exec_streams[nums[i]]);
}
InitSyncEvents(nums.size());
}
void InitSyncEvents(const int num) {
sync_events_.clear();
for (int i = 0; i < num; ++i) {
cudaEvent_t eve;
TargetWrapperCuda::CreateEventWithFlags(&eve);
sync_events_.push_back(eve);
}
}
void SetNeedSync(bool sync) { need_sync_ = sync; }
bool need_sync() const { return need_sync_; }
void Sync() {
CHECK_EQ(sync_streams_.size(), sync_events_.size());
for (size_t i = 0; i < sync_events_.size(); ++i) {
TargetWrapperCuda::RecordEvent(sync_events_[i], sync_streams_[i]);
TargetWrapperCuda::StreamSync(exec_stream_, sync_events_[i]);
}
}
std::string name() const { return "CUDAContext"; }
CUDAContext& operator=(const CUDAContext& context) {
this->Init(
context.device_id_, context.exec_stream_id_, context.io_stream_id_);
cublas_fp32_ = const_cast<CUDAContext&>(context).cublas_fp32();
return *this;
}
private:
int device_id_;
// overall information
int exec_stream_id_;
int io_stream_id_;
cudaStream_t exec_stream_;
cudaStream_t io_stream_;
// not thread-safe, should allocate for each thread.
std::shared_ptr<cuda::Blas<float>> cublas_fp32_;
// kernel information
std::vector<cudaEvent_t> input_events_;
std::vector<cudaEvent_t> output_events_;
// multi stream sync.
std::vector<cudaStream_t> sync_streams_;
std::vector<cudaEvent_t> sync_events_;
bool need_sync_;
};
} // namespace lite
} // namespace paddle
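The Sync() path above records an event on each dependency stream and makes exec_stream_ wait on it. For reference, a minimal sketch of the same pattern in raw CUDA runtime calls, which the TargetWrapperCuda helpers are assumed to wrap, looks like this:
// Hedged sketch: make `exec_stream` wait for work already queued on `dep_streams`,
// mirroring CUDAContext::Sync() with plain CUDA runtime calls (assumed mapping:
// RecordEvent -> cudaEventRecord, StreamSync -> cudaStreamWaitEvent).
#include <cuda_runtime.h>
#include <vector>
void WaitOnDependencyStreams(cudaStream_t exec_stream,
                             const std::vector<cudaStream_t>& dep_streams,
                             const std::vector<cudaEvent_t>& events) {
  for (size_t i = 0; i < dep_streams.size(); ++i) {
    cudaEventRecord(events[i], dep_streams[i]);      // mark progress on the producer stream
    cudaStreamWaitEvent(exec_stream, events[i], 0);  // consumer waits on the device; host not blocked
  }
}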
......@@ -38,7 +38,7 @@ lite_cc_library(device_info SRCS device_info.cc DEPS tensor)
if (LITE_WITH_ARM)
lite_cc_library(context SRCS context.cc DEPS tensor any device_info CL_DEPS cl_context)
else()
lite_cc_library(context SRCS context.cc DEPS tensor any device_info eigen3 CL_DEPS cl_context)
lite_cc_library(context SRCS context.cc DEPS tensor any device_info eigen3 CL_DEPS cl_context CUDA_DEPS cuda_context)
endif()
#-------------------------------------------- GET CODE META INFO ------------------------------------------
......
......@@ -16,8 +16,7 @@
#include "lite/utils/any.h"
#ifdef LITE_WITH_CUDA
#include "lite/backends/cuda/blas.h"
#include "lite/backends/cuda/cuda_utils.h"
#include "lite/backends/cuda/context.h"
#endif
#ifdef LITE_WITH_OPENCL
#include <unordered_map>
......@@ -53,7 +52,6 @@ class Context;
using HostContext = Context<TargetType::kHost>;
using X86Context = Context<TargetType::kX86>;
using CUDAContext = Context<TargetType::kCUDA>;
using ARMContext = Context<TargetType::kARM>;
using NPUContext = Context<TargetType::kNPU>;
using XPUContext = Context<TargetType::kXPU>;
......@@ -286,103 +284,6 @@ class Context<TargetType::kMLU> {
};
#endif // LITE_WITH_MLU
#ifdef LITE_WITH_CUDA
// Only works with CUDA kernels.
template <>
class Context<TargetType::kCUDA> {
public:
typename Env<TargetType::kCUDA>::Devs& devs =
Env<TargetType::kCUDA>::Global();
// NOTE: InitOnce should only be used by ContextScheduler
void InitOnce() {
if (devs.size() > 0) {
cublas_fp32_ = std::make_shared<lite::cuda::Blas<float>>();
} else {
LOG(INFO) << "No cuda device(s) found, CUDAContext init failed.";
}
}
void Init(int dev_id, int exec_stream_id = 0, int io_stream_id = 0) {
CHECK_GT(devs.size(), 0UL)
<< "Env is not initialized or current target is not exit!";
if (dev_id >= static_cast<int>(devs.size())) {
LOG(WARNING) << "device index exceeds the number of devices, set to "
"default device(0)!";
device_id_ = 0;
} else {
device_id_ = dev_id;
}
if (io_stream_id >= devs[dev_id].max_stream()) {
LOG(WARNING) << "data stream index exceeds the maximum stream number, "
"set to default stream(0)!";
io_stream_id = 0;
}
if (exec_stream_id >= devs[dev_id].max_stream()) {
LOG(WARNING) << "exec stream index exceeds the maximum stream number, "
"set to default stream(0)!";
exec_stream_id = 0;
}
exec_stream_ = devs[dev_id].exec_streams()[exec_stream_id];
io_stream_ = devs[dev_id].io_streams()[io_stream_id];
exec_stream_id_ = exec_stream_id;
io_stream_id_ = io_stream_id;
}
void CopySharedTo(CUDAContext* ctx) {
CHECK(ctx);
CHECK(cublas_fp32_) << "cublas_fp32 should be set first";
ctx->cublas_fp32_ = cublas_fp32_;
}
const cudaStream_t& exec_stream() const { return exec_stream_; }
void SetExecStream(cudaStream_t stream) { exec_stream_ = stream; }
const cudaStream_t& io_stream() const { return io_stream_; }
void SetIoStream(cudaStream_t stream) { io_stream_ = stream; }
std::shared_ptr<cuda::Blas<float>> cublas_fp32() { return cublas_fp32_; }
void SetCuBlasFP32(std::shared_ptr<cuda::Blas<float>> cublas_fp32) {
cublas_fp32_ = cublas_fp32;
}
const std::vector<cudaEvent_t>& input_events() { return input_events_; }
void SetInputEvents(const std::vector<cudaEvent_t>& input_events) {
input_events_.clear();
input_events_.assign(input_events.begin(), input_events.end());
}
const std::vector<cudaEvent_t>& output_events() { return output_events_; }
void SetOutputEvents(const std::vector<cudaEvent_t>& output_events) {
output_events_.clear();
output_events_.assign(output_events.begin(), output_events.end());
}
std::string name() const { return "CUDAContext"; }
CUDAContext& operator=(const CUDAContext& context) {
this->Init(
context.device_id_, context.exec_stream_id_, context.io_stream_id_);
cublas_fp32_ = const_cast<CUDAContext&>(context).cublas_fp32();
return *this;
}
private:
int device_id_;
// overall information
int exec_stream_id_;
int io_stream_id_;
cudaStream_t exec_stream_;
cudaStream_t io_stream_;
// not thread-safe, should allocate for each thread.
std::shared_ptr<cuda::Blas<float>> cublas_fp32_;
// kernel information
std::vector<cudaEvent_t> input_events_;
std::vector<cudaEvent_t> output_events_;
};
#endif
#ifdef LITE_WITH_X86
template <>
class Context<TargetType::kX86> {
......@@ -455,7 +356,9 @@ class ContextScheduler {
return *x;
}
std::unique_ptr<KernelContext> NewContext(TargetType target) {
std::unique_ptr<KernelContext> NewContext(
TargetType target,
/*only used for cuda context*/ int exec_stream_id = 0) {
std::unique_ptr<KernelContext> ctx(new KernelContext);
switch (target) {
case TARGET(kHost):
......@@ -472,7 +375,7 @@ class ContextScheduler {
case TARGET(kCUDA): {
int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
auto& context = ctx->As<CUDAContext>();
context.Init(dev_id);
context.Init(dev_id, exec_stream_id);
kernel_contexts_[TargetType::kCUDA].As<CUDAContext>().CopySharedTo(
&context);
} break;
......
......@@ -159,7 +159,7 @@ class Env {
static Devs* devs = new Devs();
return *devs;
}
static void Init(int max_stream = 4) {
static void Init(int max_stream = 6) {
#ifdef LITE_WITH_MLU
CNRT_CALL(cnrtInit(0));
#endif
......@@ -175,6 +175,7 @@ class Env {
} else {
LOG(INFO) << "Found " << count << " device(s)";
}
CHECK_GT(max_stream, 0) << "max_stream must be greater than 0.";
// create all device
for (int i = 0; i < count; i++) {
auto dev = Device<Type>(i, max_stream);
......@@ -234,8 +235,8 @@ class Device<TARGET(kCUDA)> {
std::string name() { return device_prop_.name; }
int core_num() { return device_prop_.multiProcessorCount; }
float max_memory() { return device_prop_.totalGlobalMem / 1048576.; }
std::vector<cudaStream_t> exec_streams() { return exec_stream_; }
std::vector<cudaStream_t> io_streams() { return io_stream_; }
const std::vector<cudaStream_t>& exec_streams() { return exec_stream_; }
const std::vector<cudaStream_t>& io_streams() { return io_stream_; }
int sm_version() { return sm_version_; }
bool has_fp16() { return has_fp16_; }
......
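Env<kCUDA>::Init(max_stream), now defaulting to 6, pre-creates a fixed pool of exec and io streams per device, which the contexts above select by id. A sketch of what such a pool amounts to in raw CUDA (an assumption, since Device's implementation is not part of this diff):
// Hedged sketch: a per-device pool of `max_stream` streams, as Device<kCUDA>
// is assumed to create during Env<kCUDA>::Init(max_stream).
#include <cuda_runtime.h>
#include <vector>
std::vector<cudaStream_t> CreateStreamPool(int max_stream) {
  std::vector<cudaStream_t> streams(max_stream);
  for (auto& s : streams) {
    // Non-blocking streams do not synchronize with the legacy default stream.
    cudaStreamCreateWithFlags(&s, cudaStreamNonBlocking);
  }
  return streams;
}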
......@@ -37,6 +37,7 @@ lite_cc_library(mir_passes
demo_pass.cc
runtime_context_assign_pass.cc
memory_optimize_pass.cc
multi_stream_analysis_pass.cc
mlu_postprocess_pass.cc
weight_quantization_preprocess_pass.cc
quantized_op_attributes_inference_pass.cc
......
......@@ -14,6 +14,7 @@
#include "lite/core/mir/generate_program_pass.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "lite/core/mir/graph_visualize_pass.h"
......@@ -25,10 +26,37 @@ namespace mir {
void GenerateProgramPass::Apply(const std::unique_ptr<SSAGraph>& graph) {
VLOG(4) << "final program \n" << Visualize(graph.get());
for (auto& item : graph->StmtTopologicalOrder()) {
std::vector<Node*> nodes_in_order;
#ifdef LITE_WITH_CUDA
const std::string depend_pass = "multi_stream_analysis_pass";
const std::string attr_name = "nodes_in_order";
mir::Pass* pass = mir::PassManager::Global().LookUp(depend_pass);
if (pass->HasAttr(attr_name)) {
nodes_in_order = pass->GetAttr<std::vector<Node*>>(attr_name);
}
#endif
if (nodes_in_order.empty()) {
nodes_in_order = graph->StmtTopologicalOrder();
}
for (auto& item : nodes_in_order) {
if (item->IsStmt()) {
auto& stmt = item->AsStmt();
VLOG(4) << stmt;
#ifdef LITE_WITH_CUDA
if (stmt.kernels().front()->target() == TargetType::kCUDA) {
stmt.kernels()
.front()
->mutable_context()
->As<CUDAContext>()
.SetNeedSync(stmt.need_sync_);
stmt.kernels()
.front()
->mutable_context()
->As<CUDAContext>()
.SetSyncStreams(stmt.sync_streams_);
}
#endif
insts_.emplace_back(stmt.op(), std::move(stmt.kernels().front()));
}
}
......
......@@ -85,7 +85,23 @@ std::string Visualize(mir::SSAGraph* graph) {
if (!node->IsStmt()) continue;
auto op_info = node->AsStmt().op_info();
auto op_type = op_info->Type();
std::string op_name = string_format("%s%d", op_type.c_str(), op_idx++);
std::string op_name;
if (node->AsStmt().need_sync_) {
std::ostringstream oss;
for (size_t i = 0; i < node->AsStmt().sync_streams_.size(); ++i) {
oss << std::to_string(node->AsStmt().sync_streams_[i]);
if (i != node->AsStmt().sync_streams_.size() - 1) {
oss << ",";
}
}
op_name = string_format("%s%d, stream=%d, sync_streams={%s}",
op_type.c_str(),
op_idx++,
node->AsStmt().stream_id_,
oss.str().c_str());
} else {
op_name = string_format("%s%d", op_type.c_str(), op_idx++);
}
// Add its input&output variables as the Dot nodes
dot.AddNode(op_name,
{Dot::Attr("shape", "box"),
......@@ -93,7 +109,13 @@ std::string Visualize(mir::SSAGraph* graph) {
Dot::Attr("color", "black"),
Dot::Attr("fillcolor", "yellow")});
for (auto& x : node->inlinks) {
auto var_name = x->AsArg().name;
std::string var_name;
if (x->AsArg().lane != -1) {
var_name = string_format(
"%s, lane=%d", x->AsArg().name.c_str(), x->AsArg().lane);
} else {
var_name = x->AsArg().name;
}
if (!exists_var_names.count(var_name)) {
dot.AddNode(var_name, {});
exists_var_names.insert(var_name);
......@@ -101,7 +123,13 @@ std::string Visualize(mir::SSAGraph* graph) {
dot.AddEdge(var_name, op_name, {});
}
for (auto& x : node->outlinks) {
auto var_name = x->AsArg().name;
std::string var_name;
if (x->AsArg().lane != -1) {
var_name = string_format(
"%s, lane=%d", x->AsArg().name.c_str(), x->AsArg().lane);
} else {
var_name = x->AsArg().name;
}
if (!exists_var_names.count(var_name)) {
dot.AddNode(var_name, {});
exists_var_names.insert(var_name);
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "lite/core/mir/multi_stream_analysis_pass.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "lite/core/device_info.h"
#include "lite/core/mir/graph_visualize_pass.h"
#include "lite/core/mir/pass_registry.h"
#include "lite/core/type_system.h"
namespace paddle {
namespace lite {
namespace mir {
void MultiStreamAnalysisPass::CleanUp() {
exec_ops_.clear();
wait_que_.clear();
wait_que_cpu_.clear();
std::queue<int> empty_queue;
while (!exec_que_.empty()) {
exec_que_.pop();
}
ops_in_streams_.clear();
resources_.clear();
map_arg_to_lane_.clear();
op_types_set_.clear();
io_copy_once_num_ = 0;
}
void MultiStreamAnalysisPass::Init(SSAGraph* graph) {
// If the state is not cleaned up, a later run of this pass (e.g. after a
// predictor clone) would overlay the previous state.
CleanUp();
for (auto& op_node : graph->StmtTopologicalOrder()) {
if (op_node->IsStmt()) {
// Set all outputs of op to inaccessible state.
auto outputs = op_node->outlinks;
for (Node* node : outputs) {
CHECK(node->IsArg());
auto& arg = node->AsArg();
if (!resources_.count(arg.name)) {
resources_[arg.name] = false;
}
}
// Set the weight input of op to be accessible.
auto inputs = op_node->inlinks;
for (Node* node : inputs) {
CHECK(node->IsArg());
auto& arg = node->AsArg();
if (arg.is_weight || arg.is_persist) {
resources_[arg.name] = true;
}
}
// feed and io_copy_once ops have no dependencies and can be launched
// directly. Other ops are put into the waiting queue.
if (op_node->AsStmt().op_type() == "feed" ||
op_node->AsStmt().op_type() == "io_copy_once") {
exec_que_.push(op_node);
} else {
auto tgt = op_node->AsStmt().kernels().front()->target();
if (tgt == TargetType::kCUDA) {
wait_que_.push_back(op_node);
} else {
wait_que_cpu_.push_back(op_node);
}
}
op_types_set_.insert(op_node->AsStmt().op_type());
}
}
// Set the stream id according to the number of feed ops, and set the output
// of the feed op to be accessible.
int lane = 0;
auto nodes = graph->inputs();
ops_in_streams_.resize(max_stream_);
for (auto& node : nodes) {
std::string::size_type idx = node->AsArg().name.find("feed");
if (idx != std::string::npos) {
for (auto& feed_ops : node->outlinks) {
if (feed_ops->AsStmt().op_type() == "feed") {
// feed op doesn't need to wait for sync.
feed_ops->AsStmt().need_sync_ = false;
CHECK_EQ(static_cast<int>(feed_ops->outlinks.size()), 1)
<< "feed op must have one output.";
for (auto& var : feed_ops->outlinks) {
var->AsArg().lane = lane;
map_arg_to_lane_[var->AsArg().name] = lane;
resources_[var->AsArg().name] = true;
}
feed_ops->AsStmt().stream_id_ = lane;
ops_in_streams_[lane].push_back(feed_ops);
++lane;
if (lane >= max_stream_) {
lane = 0;
}
}
}
}
// Put all io_copy_once ops in the first stream.
for (auto& io_copy_once_ops : node->outlinks) {
if (io_copy_once_ops->AsStmt().op_type() == "io_copy_once") {
ops_in_streams_[0].push_back(io_copy_once_ops);
io_copy_once_ops->AsStmt().stream_id_ = 0;
io_copy_once_ops->AsStmt().need_sync_ = false;
++io_copy_once_num_;
}
}
}
}
bool MultiStreamAnalysisPass::CheckOpSupport() {
std::unordered_set<std::string> invalid_op = {
"while", "conditional_block", "conditional_block_infer", "graph_op"};
for (auto& op_type : op_types_set_) {
if (invalid_op.count(op_type)) {
LOG(INFO) << "multi_stream_analysis_pass don't support " << op_type
<< ", just return.";
return false;
}
}
return true;
}
bool MultiStreamAnalysisPass::IsPrepared(Node* stmt_node) {
// feed ops are prepared during Init.
std::string op_name = stmt_node->AsStmt().op_type();
if (op_name == "feed") {
return true;
}
// Check whether all of the op's inputs are accessible.
std::vector<std::string> args;
for (auto* ins : stmt_node->inlinks) {
args.push_back(ins->AsArg().name);
}
return CheckAccess(args);
}
bool MultiStreamAnalysisPass::CheckAccess(
const std::vector<std::string>& args) {
if (args.size() == 0) {
return true;
}
for (auto& name : args) {
if (resources_[name]) {
continue;
} else {
return false;
}
}
return true;
}
int MultiStreamAnalysisPass::SelectStreamId(const std::vector<int>& lanes) {
if (lanes.size() == 0) {
return 0;
}
int res = lanes[0];
int exclude_io_copy_once_num = ops_in_streams_[0].size() - io_copy_once_num_;
int min_num = lanes[0] == 0 ? exclude_io_copy_once_num
: ops_in_streams_[lanes[0]].size();
for (size_t i = 1; i < lanes.size(); ++i) {
int ith_num = lanes[i] == 0 ? exclude_io_copy_once_num
: ops_in_streams_[lanes[i]].size();
if (ith_num < min_num) {
res = lanes[i];
min_num = ith_num;
}
}
return res;
}
void MultiStreamAnalysisPass::Launch(Node* stmt_node) {
// record ops launch order.
exec_que_.push(stmt_node);
std::vector<int> lanes;
for (auto& in_arg : stmt_node->inlinks) {
// Weight parameter does not involve stream id, so just skip it.
if (in_arg->AsArg().is_weight || in_arg->AsArg().is_persist) {
continue;
}
if (std::find(lanes.begin(), lanes.end(), in_arg->AsArg().lane) ==
lanes.end()) {
lanes.push_back(in_arg->AsArg().lane);
}
}
int stream_id = SelectStreamId(lanes);
// If the op's inputs come from more than one stream, the streams other than
// the selected one need to be synchronized.
if (lanes.size() > 1) {
for (size_t i = 0; i < lanes.size(); ++i) {
if (lanes[i] != stream_id) {
stmt_node->AsStmt().sync_streams_.push_back(lanes[i]);
}
}
stmt_node->AsStmt().need_sync_ = true;
}
// io_copy nodes are inserted at device boundaries and need to be synced.
if (stmt_node->AsStmt().op_type() == "io_copy") {
stmt_node->AsStmt().need_sync_ = true;
}
stmt_node->AsStmt().stream_id_ = stream_id;
// set output lane and set the output of op to be accessible.
for (auto& out_arg : stmt_node->outlinks) {
out_arg->AsArg().lane = stream_id;
resources_[out_arg->AsArg().name] = true;
}
ops_in_streams_[stream_id].push_back(stmt_node);
}
void MultiStreamAnalysisPass::Apply(const std::unique_ptr<SSAGraph>& graph) {
#ifdef LITE_WITH_CUDA
typename Env<TargetType::kCUDA>::Devs& devs =
Env<TargetType::kCUDA>::Global();
int dev_id = TargetWrapper<TargetType::kCUDA>::GetCurDevice();
max_stream_ = devs[dev_id].max_stream();
#else
LOG(FATAL) << "Please re-compile by setting the cmake flag LITE_WITH_CUDA=ON";
#endif
// Find the correct launch sequence for the ops.
Init(graph.get());
bool is_valid = CheckOpSupport();
if (!is_valid) {
return;
}
size_t prev_size;
while (!(this->wait_que_.empty() && this->wait_que_cpu_.empty())) {
prev_size = this->wait_que_.size() + this->wait_que_cpu_.size();
// Launch the accessible CUDA kernels and remove them from the wait queue.
for (auto it = this->wait_que_.begin(); it != this->wait_que_.end();) {
if (IsPrepared(*it)) {
Launch(*it);
it = wait_que_.erase(it);
} else {
++it;
}
}
// Launch the accessible CPU kernels and remove them from the wait queue.
for (auto cpu_it = this->wait_que_cpu_.begin();
cpu_it != this->wait_que_cpu_.end();) {
if (IsPrepared(*cpu_it)) {
Launch(*cpu_it);
cpu_it = wait_que_cpu_.erase(cpu_it);
} else {
++cpu_it;
}
}
if (this->wait_que_.size() + this->wait_que_cpu_.size() == prev_size) {
LOG(FATAL) << "network topo error!";
}
}
// Get exec ops order.
while (!exec_que_.empty()) {
auto* node = exec_que_.front();
exec_ops_.push_back(node);
VLOG(4) << node->AsStmt().op_type()
<< " stream: " << node->AsStmt().stream_id_
<< ", sync: " << node->AsStmt().need_sync_;
if (node->AsStmt().need_sync_) {
for (size_t i = 0; i < node->AsStmt().sync_streams_.size(); ++i) {
VLOG(4) << " " << node->AsStmt().sync_streams_[i];
}
}
exec_que_.pop();
}
// Set the attribute used to pass the launch order to later passes.
const std::string attr_name{"nodes_in_order"};
SetAttr<std::vector<Node*>>(attr_name, &exec_ops_);
LOG(INFO) << "stream " << 0 << " has "
<< ops_in_streams_[0].size() - io_copy_once_num_
<< " ops. (exclude io_copy_once).";
for (size_t i = 1; i < ops_in_streams_.size(); ++i) {
LOG(INFO) << "stream " << i << " has " << ops_in_streams_[i].size()
<< " ops.";
}
}
} // namespace mir
} // namespace lite
} // namespace paddle
REGISTER_MIR_PASS(multi_stream_analysis_pass,
paddle::lite::mir::MultiStreamAnalysisPass)
.BindTargets({TARGET(kCUDA)});
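To make the load-balancing rule in SelectStreamId explicit, here is a small self-contained sketch of the same policy (the io_copy_once adjustment for stream 0 is left out for brevity):
// Hedged sketch: among the candidate lanes (the streams that produce this
// op's inputs), pick the one that currently holds the fewest ops.
#include <vector>
int PickLane(const std::vector<int>& candidate_lanes,
             const std::vector<int>& ops_per_stream) {
  if (candidate_lanes.empty()) return 0;
  int best = candidate_lanes[0];
  for (int lane : candidate_lanes) {
    if (ops_per_stream[lane] < ops_per_stream[best]) best = lane;
  }
  return best;
}
// Example: candidate_lanes = {0, 2}, ops_per_stream = {5, 3, 2}  ->  lane 2 is chosen.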
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <list>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "lite/core/kernel.h"
#include "lite/core/mir/pass.h"
namespace paddle {
namespace lite {
namespace mir {
/*
* MultiStreamAnalysisPass finds the correct launch sequence for all ops.
* Ideally, most ops run asynchronously on separate streams and only a small
* number of synchronization points are needed.
*/
class MultiStreamAnalysisPass : public StmtPass {
public:
void Apply(const std::unique_ptr<SSAGraph>& graph) override;
private:
// Init the resource list. Set all ops except feed to the inaccessible state and
// assign stream ids according to the number of graph inputs.
void Init(SSAGraph* graph);
// Clean state information of all member variables.
void CleanUp();
// After launching, unlock the output resources of op.
void Launch(Node* stmt_node);
// If all inputs of an op are accessible, the op is considered to be in the
// prepared state
bool IsPrepared(Node* stmt_node);
// Determine if all inputs of op are accessible.
bool CheckAccess(const std::vector<std::string>& args);
// The logic for selecting a stream:
// 1. Keep the number of ops on each stream as balanced as possible.
// 2. The selected stream must be one of the streams used by the input args.
int SelectStreamId(const std::vector<int>& lanes);
// Check whether all ops in the model are supported. If an unsupported op is
// encountered, the pass bails out.
bool CheckOpSupport();
private:
std::list<Node*> wait_que_;
std::list<Node*> wait_que_cpu_;
std::queue<Node*> exec_que_;
std::vector<Node*> exec_ops_;
std::vector<std::vector<Node*>> ops_in_streams_;
std::unordered_map<std::string, bool> resources_;
std::unordered_map<std::string, int> map_arg_to_lane_;
int max_stream_;
int io_copy_once_num_;
std::unordered_set<std::string> op_types_set_;
};
} // namespace mir
} // namespace lite
} // namespace paddle
......@@ -80,6 +80,12 @@ class Node {
// Description.
std::string desc;
// for cuda multi stream
bool need_sync_{false};
int stream_id_{0};
// streams which need to be synced, excluding stream_id_
std::vector<int> sync_streams_{};
};
struct Arg {
......@@ -93,6 +99,7 @@ class Node {
// if more than one tool operator is needed (e.g. io_copy, layout, calib), the
// argument between them should be persistent to make sure it's only run once
bool is_persist{false};
int lane{-1};
};
Arg& AsArg(const std::string& name, int id);
......
......@@ -17,9 +17,11 @@
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "lite/core/mir/node.h"
#include "lite/core/mir/ssa_graph.h"
#include "lite/utils/varient.h"
namespace paddle {
namespace lite {
......@@ -121,6 +123,27 @@ class Pass {
virtual ~Pass() = default;
bool HasAttr(const std::string& attr_name) const {
return pass_attrs_.count(attr_name) > 0;
}
// Set the attribute from a pointer. The pass itself stores and owns its copy of
// the attribute.
template <typename AttrType>
void SetAttr(const std::string& attr_name, const AttrType* attr) {
VLOG(4) << "Setting the attribute " << attr_name << " for the pass "
<< name_;
pass_attrs_[attr_name].set<const AttrType>(*attr);
}
// Get a reference to the attribute previously set.
template <typename AttrType>
const AttrType& GetAttr(const std::string& attr_name) const {
CHECK(pass_attrs_.count(attr_name))
<< attr_name << " attr not register for pass " << name_;
return pass_attrs_.at(attr_name).get<const AttrType>();
}
private:
const Kind kind_;
std::string name_;
......@@ -128,6 +151,8 @@ class Pass {
std::set<TargetType> bound_targets_;
std::set<TargetType> excluded_targets_;
std::unordered_map<std::string, std::set<lite_api::Place>> bound_kernels_;
std::unordered_map<std::string, variant<Node, std::vector<Node*>>>
pass_attrs_;
};
// Different kinds.
......
......@@ -45,9 +45,10 @@ class RuntimeContextAssignPass : public StmtPass {
inst.picked_kernel().target()));
}
#else
inst.picked_kernel().SetContext(
ContextScheduler::Global().NewContext(inst.picked_kernel().target()));
int stream_id = inst.stream_id_;
inst.picked_kernel().SetContext(ContextScheduler::Global().NewContext(
inst.picked_kernel().target(), stream_id));
#endif
}
}
......
......@@ -127,7 +127,21 @@ class Optimizer {
"memory_optimize_pass"}};
if (passes.size() == 1) {
passes_local.push_back(passes[0]);
// multi_stream_analysis_pass must run before
// runtime_context_assign_pass.
const std::string msa_pass{"multi_stream_analysis_pass"};
const std::string depend_pass{"runtime_context_assign_pass"};
if (passes[0] == msa_pass) {
auto iter =
std::find(passes_local.begin(), passes_local.end(), depend_pass);
if (iter != passes_local.end()) {
passes_local.insert(iter, msa_pass);
} else {
CHECK(false) << "Not find " << depend_pass;
}
} else {
passes_local.push_back(passes[0]);
}
}
RunPasses(passes_local);
} else {
......
......@@ -178,6 +178,13 @@ class PrecisionProfiler {
write_result_to_file&& write_tensorfile<int32_t>(in, name);
return;
}
case PRECISION(kInt64): {
auto ptr = in->data<int64_t>();
*mean = compute_mean<int64_t>(ptr, in->numel());
*std_dev = compute_standard_deviation<int64_t>(
ptr, in->numel(), true, *mean);
return;
}
default:
*mean = -333333333333;
*std_dev = -33333333333;
......
......@@ -145,6 +145,11 @@ void RuntimeProgram::Run() {
for (auto& inst : instructions_) {
#ifndef LITE_WITH_FPGA
if (inst.is_feed_fetch_op()) continue;
#endif
#ifdef LITE_WITH_CUDA
if (inst.need_sync()) {
inst.Sync();
}
#endif
inst.Run();
#ifdef LITE_WITH_PRECISION_PROFILE
......
......@@ -108,6 +108,18 @@ struct Instruction {
bool is_feed_fetch_op() const { return is_feed_fetch_op_; }
#ifdef LITE_WITH_CUDA
bool need_sync() const {
if (kernel_->target() == TargetType::kCUDA) {
return kernel_->mutable_context()->As<CUDAContext>().need_sync();
} else {
// The io_copy kernel has already synced, so CPU kernels don't need to sync.
return false;
}
}
void Sync() const { kernel_->mutable_context()->As<CUDAContext>().Sync(); }
#endif
#ifdef LITE_WITH_PROFILE
void set_profiler(profile::Profiler* profiler) {
profiler_ = profiler;
......