提交 d71694a9 编写于 作者: M mindspore-ci-bot 提交者: Gitee

!5007 [MS][LITE]fix the issue of parallel executor

Merge pull request !5007 from zhaizhiqiang/master
@@ -14,6 +14,7 @@
 * limitations under the License.
 */
#include <new>
#include <utility>
#include "src/runtime/parallel_executor.h"
using mindspore::predict::ThreadPool;
using mindspore::predict::TvmEnv;
@@ -25,25 +26,15 @@ ParallelExecutor::~ParallelExecutor() {
}
// Prepare the executor for a run: allocate the worker thread pool and
// configure its size/affinity.
// @param kernels  the graph's kernels (unused here; kept for the Executor
//                 interface — dependency bookkeeping happens in Run()).
// @return RET_OK on success, RET_ERROR if the pool cannot be allocated.
int ParallelExecutor::Prepare(std::vector<mindspore::kernel::LiteKernel *> &kernels) {
  // Use nothrow new: a plain new would throw std::bad_alloc instead of
  // returning nullptr, making the check below dead code. With nothrow the
  // allocation-failure branch is actually reachable.
  pool = new (std::nothrow) ThreadPool();
  if (pool == nullptr) {
    MS_LOG(ERROR) << "Memory error: fail to new ThreadPool";
    return RET_ERROR;
  }
  pool->ConfigMaxThreadNum(MAX_THREAD_NUM);
  // NO_BIND: workers are not pinned to specific cores.
  pool->ConfigThreadPool(NO_BIND, MAX_THREAD_NUM);
  return RET_OK;
}
// Seed the ready queue: every kernel whose counter in refCount has reached
// zero is moved out of the map into readyKernels, and the per-kernel result
// buffer is resized to match the batch that will be launched next.
// NOTE(review): the `kernels` parameter is not used in this body.
// NOTE(review): refCount is populated from out_kernels().size() in Prepare()
// above — counting a kernel's consumers rather than its producers looks
// inverted (readiness normally depends on in_kernels completing); confirm.
void ParallelExecutor::PrepareReadyKernels(const std::vector<mindspore::kernel::LiteKernel *> &kernels) {
  for (auto iter = refCount.begin(); iter != refCount.end();) {
    if (iter->second == 0) {
      // Counter hit zero: schedule the kernel and drop its map entry.
      // erase() returns the next valid iterator, so iteration stays safe.
      readyKernels.emplace_back(iter->first);
      iter = refCount.erase(iter);
    } else {
      iter++;
    }
  }
  // One result slot per kernel in the upcoming LaunchWork batch.
  results.resize(readyKernels.size());
}
static int RunKernel(int index, TvmEnv *env, void *data) { static int RunKernel(int index, TvmEnv *env, void *data) {
ParallelExecutor *executor = reinterpret_cast<ParallelExecutor *>(data); ParallelExecutor *executor = reinterpret_cast<ParallelExecutor *>(data);
auto kernel = executor->GetReadyKernel(index); auto kernel = executor->GetReadyKernel(index);
...@@ -83,27 +74,49 @@ int ParallelExecutor::Run(std::vector<tensor::Tensor *> &in_tensors, std::vector ...@@ -83,27 +74,49 @@ int ParallelExecutor::Run(std::vector<tensor::Tensor *> &in_tensors, std::vector
} }
kernel::LiteKernelUtil::InitTensorRefCount(kernels); kernel::LiteKernelUtil::InitTensorRefCount(kernels);
PrepareReadyKernels(kernels); for (auto kernel : kernels) {
if (kernel->in_kernels().size() == 0) {
readyKernels.emplace_back(kernel);
continue;
}
refCount[kernel] = kernel->in_kernels().size();
}
std::vector<kernel::LiteKernel *> newReadyKernels;
while (readyKernels.size() > 0) { while (readyKernels.size() > 0) {
results.resize(readyKernels.size(), RET_OK);
pool->LaunchWork(RunKernel, this, readyKernels.size()); pool->LaunchWork(RunKernel, this, readyKernels.size());
if (std::find_if(results.begin(), results.end(), [](const int &ret) { return (ret != 0); }) != results.end()) { if (std::find_if(results.begin(), results.end(), [](const int &ret) { return (ret != 0); }) != results.end()) {
return RET_ERROR; return RET_ERROR;
} }
for (auto completedKernel : readyKernels) { newReadyKernels.clear();
for (auto out : completedKernel->out_kernels()) { for (auto completed : readyKernels) {
for (auto out : completed->out_kernels()) {
auto iter = refCount.find(out); auto iter = refCount.find(out);
if (iter == refCount.end()) { if (iter == refCount.end()) {
continue; continue;
} }
(iter->second)--; (iter->second)--;
if (iter->second <= 0) { if (iter->second <= 0) {
newReadyKernels.emplace_back(iter->first);
refCount.erase(iter); refCount.erase(iter);
} }
} }
for (auto input_kernel : completed->in_kernels()) {
MS_ASSERT(input_kernel != nullptr);
if (input_kernel->is_model_output()) {
continue;
}
auto ret = input_kernel->DecOutTensorRefCount();
if (0 != ret) {
MS_LOG(WARNING) << "DecOutTensorRefCount for kernel" << completed->name() << " failed";
return -1;
}
}
} }
readyKernels.clear(); readyKernels.clear();
PrepareReadyKernels(kernels); readyKernels = std::move(newReadyKernels);
} }
return RET_OK; return RET_OK;
......
...@@ -39,9 +39,6 @@ class ParallelExecutor : public Executor { ...@@ -39,9 +39,6 @@ class ParallelExecutor : public Executor {
inline kernel::LiteKernel *GetReadyKernel(const int index) { return readyKernels.at(index); } inline kernel::LiteKernel *GetReadyKernel(const int index) { return readyKernels.at(index); }
inline void SetResult(const int index, const int result) { results.at(index) = result; } inline void SetResult(const int index, const int result) { results.at(index) = result; }
private:
void PrepareReadyKernels(const std::vector<kernel::LiteKernel *> &kernels);
private: private:
predict::ThreadPool *pool; predict::ThreadPool *pool;
std::unordered_map<kernel::LiteKernel *, size_t> refCount; std::unordered_map<kernel::LiteKernel *, size_t> refCount;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册