未验证 提交 d8a4e3ef 编写于 作者: D dolphin8 提交者: GitHub

Merge pull request #441 from dolphin8/develop

fix #442 multithread executor
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <string>
#include <unordered_map>
#include <vector>
#include "framework/operator.h"
namespace paddle_mobile {
class depCore {
public:
template <typename Dtype>
void analysisDep(
const std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>& ops) {
std::unordered_map<std::string, int> vars;
size_t nop = ops.size();
for (size_t i = 0; i < nop; i++) {
const auto& op = ops[i];
for (const auto& kv : op->Outputs()) {
for (const auto& v : kv.second) {
vars[v] = i;
}
}
}
deps.resize(nop);
next.resize(nop);
for (size_t i = 0; i < nop; i++) {
const auto& op = ops[i];
for (const auto& kv : op->Inputs()) {
for (const auto& v : kv.second) {
if (vars.find(v) == vars.end()) {
continue;
}
int di = vars[v];
if (di == i) {
continue;
}
if (std::find(deps[i].begin(), deps[i].end(), di) != deps[i].end()) {
continue;
}
deps[i].push_back(di);
next[di].push_back(i);
}
}
}
}
const std::vector<int>& getNext(int i) { return next[i]; }
const std::vector<int>& getDeps(int i) { return deps[i]; }
std::vector<std::vector<int>> deps;
std::vector<std::vector<int>> next;
};
} // namespace paddle_mobile
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
namespace paddle_mobile {
class ThreadPool {
public:
static ThreadPool& getThreadPool();
static int getThreadPoolThreadId();
explicit ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
int getTid(const std::thread::id& id) {
for (int i = 0; i < workers.size(); i++) {
if (workers[i].get_id() == id) {
return i;
}
}
return -1;
}
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
// for (;;) {
// if (this->stop || !this->tasks.empty()) {
// break;
// }
// lock.unlock();
// lock.lock();
// }
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
// if(stop)
// throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) worker.join();
}
ThreadPool& ThreadPool::getThreadPool() {
static ThreadPool threadPool(3);
return threadPool;
}
int ThreadPool::getThreadPoolThreadId() {
return getThreadPool().getTid(std::this_thread::get_id());
}
} // namespace paddle_mobile
...@@ -12,15 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,15 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "io.h" #include "io/io.h"
#include <vector> #include <vector>
#define PADDLE_MOBILE_PROFILE
#ifdef PADDLE_MOBILE_PROFILE
#include <algorithm>
#include <ctime>
#include <unordered_map>
#endif
#include "common/enforce.h" #include "common/enforce.h"
#include "common/log.h" #include "common/log.h"
#include "framework/framework.pb-c.h" #include "framework/framework.pb-c.h"
...@@ -31,6 +24,12 @@ limitations under the License. */ ...@@ -31,6 +24,12 @@ limitations under the License. */
#include "framework/program/var_desc.h" #include "framework/program/var_desc.h"
#include "framework/scope.h" #include "framework/scope.h"
#include "framework/tensor.h" #include "framework/tensor.h"
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <algorithm>
#include <queue>
#include <utility>
#include "common/threadpool.h"
#endif
namespace paddle_mobile { namespace paddle_mobile {
using framework::Variable; using framework::Variable;
...@@ -142,8 +141,6 @@ const framework::Program<Dtype, P> Loader<Dtype, P>::LoadProgram( ...@@ -142,8 +141,6 @@ const framework::Program<Dtype, P> Loader<Dtype, P>::LoadProgram(
} }
} }
// originProgramDesc->Description("program: ");
if (optimize) { if (optimize) {
framework::ProgramOptimize program_optimize; framework::ProgramOptimize program_optimize;
program.optimizeProgram = program.optimizeProgram =
...@@ -164,7 +161,6 @@ template class Loader<FPGA, Precision::FP32>; ...@@ -164,7 +161,6 @@ template class Loader<FPGA, Precision::FP32>;
template class Loader<GPU_MALI, Precision::FP32>; template class Loader<GPU_MALI, Precision::FP32>;
#pragma mark - executor #pragma mark - executor
template <typename Dtype, Precision P> template <typename Dtype, Precision P>
Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size, Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
bool use_optimize) bool use_optimize)
...@@ -178,6 +174,9 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size, ...@@ -178,6 +174,9 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
variable_ptr[0].SetValue<int>(batch_size); variable_ptr[0].SetValue<int>(batch_size);
const std::vector<std::shared_ptr<framework::BlockDesc>> blocks = const std::vector<std::shared_ptr<framework::BlockDesc>> blocks =
to_predict_program_->Blocks(); to_predict_program_->Blocks();
#ifdef PADDLE_EXECUTOR_MULTITHREAD
depManager.resize(blocks.size());
#endif
for (int i = 0; i < blocks.size(); ++i) { for (int i = 0; i < blocks.size(); ++i) {
std::shared_ptr<framework::BlockDesc> block_desc = blocks[i]; std::shared_ptr<framework::BlockDesc> block_desc = blocks[i];
std::vector<std::shared_ptr<framework::OpDesc>> ops = block_desc->Ops(); std::vector<std::shared_ptr<framework::OpDesc>> ops = block_desc->Ops();
...@@ -188,8 +187,10 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size, ...@@ -188,8 +187,10 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
op->Type(), op->GetInputs(), op->GetOutputs(), op->GetAttrMap(), op->Type(), op->GetInputs(), op->GetOutputs(), op->GetAttrMap(),
program_.scope); program_.scope);
op_base->InferShape(); op_base->InferShape();
ops_of_block_[*block_desc.get()].push_back(op_base); ops_of_block_[*block_desc.get()].push_back(op_base);
#ifdef PADDLE_EXECUTOR_MULTITHREAD
depManager[i].analysisDep(ops_of_block_[*block_desc.get()]);
#endif
} }
} }
if (program_.is_commbine) { if (program_.is_commbine) {
...@@ -350,48 +351,132 @@ std::shared_ptr<framework::Tensor> Executor<Dtype, P>::Predict( ...@@ -350,48 +351,132 @@ std::shared_ptr<framework::Tensor> Executor<Dtype, P>::Predict(
feed_tensor->ShareDataWith(t); feed_tensor->ShareDataWith(t);
std::shared_ptr<framework::BlockDesc> to_predict_block = std::shared_ptr<framework::BlockDesc> to_predict_block =
to_predict_program_->Block(0); to_predict_program_->Block(0);
auto &ops = ops_of_block_[*to_predict_block.get()];
#ifdef PADDLE_MOBILE_PROFILE #ifdef PADDLE_MOBILE_PROFILE
std::unordered_map<std::string, clock_t> _profile; std::vector<ProfInfo> profile(ops.size());
#endif #endif
for (int j = 0; j < ops_of_block_[*to_predict_block.get()].size(); ++j) { #ifdef PADDLE_EXECUTOR_MULTITHREAD
auto op = ops_of_block_[*to_predict_block.get()][j]; std::mutex m;
std::condition_variable cv;
std::queue<int> next;
next.push(0);
int rsize = ops.size();
std::vector<int> status(rsize, 0);
auto &threadPool = ThreadPool::getThreadPool();
auto &dep = depManager[0];
auto finishF = [&ops, &m, &cv, &next, &status, &rsize, &dep](int opi) {
std::lock_guard<std::mutex> lk(m);
rsize--;
status[opi] = 2;
for (int i : dep.getNext(opi)) {
bool ok = true;
for (int j : dep.getDeps(i)) {
if (status[j] != 2) {
ok = false;
break;
}
}
if (ok && (status[i] == 0)) {
next.push(i);
}
}
cv.notify_one();
};
for (;;) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&next, &rsize] { return rsize == 0 || !next.empty(); });
if (rsize == 0) {
break;
}
while (next.size() > 0) {
int opi = next.front();
next.pop();
status[opi] = 1;
threadPool.enqueue([opi, &ops, &finishF, &profile] {
auto &op = ops[opi];
#ifdef PADDLE_MOBILE_PROFILE #ifdef PADDLE_MOBILE_PROFILE
_profile[op->Type()] -= clock(); struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
profile[opi].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
profile[opi].tid = ThreadPool::getThreadPoolThreadId();
#endif #endif
op->Run(); ops[opi]->Run();
#ifdef PADDLE_MOBILE_PROFILE #ifdef PADDLE_MOBILE_PROFILE
_profile[op->Type()] += clock(); clock_gettime(CLOCK_MONOTONIC, &ts);
profile[opi].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
#endif #endif
finishF(opi);
});
}
} }
#else
for (int i = 0; i < ops.size(); i++) {
#ifdef PADDLE_MOBILE_PROFILE #ifdef PADDLE_MOBILE_PROFILE
{ struct timespec ts;
std::cout << "====================[ profile ]======================\n"; clock_gettime(CLOCK_MONOTONIC, &ts);
using prof_t = std::pair<std::string, clock_t>; profile[i].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
std::vector<prof_t> _tprofile(_profile.begin(), _profile.end()); #endif
clock_t _ptotal = 0; ops[i]->Run();
for (auto const &p : _tprofile) { #ifdef PADDLE_MOBILE_PROFILE
_ptotal += p.second; clock_gettime(CLOCK_MONOTONIC, &ts);
} profile[i].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
auto compf = [](const prof_t &a, const prof_t &b) { #endif
return a.second > b.second;
};
std::sort(_tprofile.begin(), _tprofile.end(), compf);
_tprofile.push_back(std::make_pair("total", _ptotal));
for (auto const &p : _tprofile) {
printf("%-16s\t%-10.0f\t%-.4f\n", p.first.c_str(), (float)p.second,
(float)p.second / _ptotal * 100.0);
}
std::cout << "====================[---------]======================\n";
} }
#endif #endif
auto ops = ops_of_block_[*to_predict_program_->Block(0)];
auto last_op = ops.rbegin(); auto last_op = ops.rbegin();
auto output_map = (*last_op)->Outputs(); auto output_map = (*last_op)->Outputs();
std::vector<std::string> out_keys = (*last_op)->GetOutKeys(); std::vector<std::string> out_keys = (*last_op)->GetOutKeys();
PADDLE_MOBILE_ENFORCE(out_keys.size() > 0, "the last op contains no output"); PADDLE_MOBILE_ENFORCE(out_keys.size() > 0, "the last op contains no output");
framework::LoDTensor *output_tensor = framework::LoDTensor *output_tensor =
framework::GetVarValue<framework::LoDTensor>(out_keys[0], output_map, framework::GetVarValue<framework::LoDTensor>(out_keys[0], output_map,
*(program_.scope)); *(program_.scope));
#ifdef PADDLE_MOBILE_PROFILE
#ifdef PADDLE_EXECUTOR_MULTITHREAD
// TODO expose profile info as an interface, user can get them to analysis
// the performance of their deepnet.
FILE *df = fopen("net.dot", "w");
fprintf(df, "digraph {\n");
for (int i = 0; i < ops.size(); i++) {
for (int j : dep.getNext(i)) {
fprintf(df, "op_%d -> op_%d\n", i, j);
}
}
for (int i = 0; i < ops.size(); i++) {
fprintf(df, "op_%d[label=\"%s (%d)\"]\n", i, ops[i]->Type().c_str(), i);
}
fprintf(df, "}\n");
fclose(df);
#endif
FILE *pf = fopen("profile.out", "w");
std::unordered_map<std::string, uint64_t> _tp;
for (int i = 0; i < profile.size(); i++) {
const auto &pInfo = profile[i];
uint64_t timeCost = pInfo.runEnd - pInfo.runBegin;
_tp[ops[i]->Type()] += timeCost;
fprintf(pf, "%d\t%s\t%d\t%llu\t%llu\t%llu\n", i, ops[i]->Type().c_str(),
pInfo.tid, pInfo.runBegin, pInfo.runEnd, timeCost);
}
fclose(pf);
printf("====================[ profile ]======================\n");
using prof_t = std::pair<std::string, uint64_t>;
std::vector<prof_t> _tv(_tp.begin(), _tp.end());
uint64_t _ptotal = 0;
for (auto const &p : _tv) {
_ptotal += p.second;
}
auto compf = [](const prof_t &a, const prof_t &b) {
return a.second > b.second;
};
std::sort(_tv.begin(), _tv.end(), compf);
_tv.push_back(std::make_pair("total", _ptotal));
for (auto const &p : _tv) {
printf("%-16s\t%-10.0f\t%-2.4f\n", p.first.c_str(), (float)p.second,
(float)p.second / _ptotal * 100.0);
}
printf("====================[---------]======================\n");
#endif
return std::shared_ptr<framework::Tensor>(output_tensor); return std::shared_ptr<framework::Tensor>(output_tensor);
} }
template <typename Dtype, Precision P> template <typename Dtype, Precision P>
......
...@@ -18,12 +18,17 @@ limitations under the License. */ ...@@ -18,12 +18,17 @@ limitations under the License. */
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
#include "common/types.h" #include "common/types.h"
#include "framework/lod_tensor.h" #include "framework/lod_tensor.h"
#include "framework/operator.h" #include "framework/operator.h"
#include "framework/program/program.h" #include "framework/program/program.h"
#include "framework/tensor.h" #include "framework/tensor.h"
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <condition_variable>
#include <mutex>
#include <thread>
#include "common/depCore.h"
#endif
namespace paddle_mobile { namespace paddle_mobile {
...@@ -92,6 +97,16 @@ class Executor { ...@@ -92,6 +97,16 @@ class Executor {
std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>> std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>>
ops_of_block_; ops_of_block_;
bool use_optimize_ = false; bool use_optimize_ = false;
#ifdef PADDLE_EXECUTOR_MULTITHREAD
std::vector<depCore> depManager;
#endif
#ifdef PADDLE_MOBILE_PROFILE
struct ProfInfo {
int tid = 0;
uint64_t runBegin = 0UL;
uint64_t runEnd = 0UL;
};
#endif
}; };
} // namespace paddle_mobile } // namespace paddle_mobile
#!/usr/bin/env sh
cat <<EOF
<html>
<head>
<style>
html, body {
position: absolute;
width: 100%;
height: 100%;
margin: 0;
}
div.timeview {
width: 100%;
position: relative;
overflow: scroll;
}
ul {
position: absolute;
margin: 0;
list-style:none;
padding: 0;
margin: 0;
}
li {
height: 15px;
position: absolute;
background: blue;
}
li:nth-child(odd) {
background: blue;
}
li:nth-child(even) {
background: rebeccapurple;
}
ul.timeline {
z-index: -1;
}
ul.timeline li {
position: relative;
height: 15px;
width: 100%;
}
ul.timeline li:nth-child(odd) {
background: beige;
}
ul.timeline li:nth-child(even) {
background: antiquewhite;
}
</style>
</head>
<body>
<div class="timeview">
<ul>
EOF
min=$(awk 'NR==1{min=$4} NR>1{if($4 < min) min=$4} END{print min}' $1)
max=$(awk 'NR==1{max=$5} NR>1{if($5 > max) max=$5} END{print max}' $1)
sort $1 -k1,1n | awk -v max="$max" -v min="$min" '
BEGIN {
total = max - min
}
{
opid = $1
optype = $2
tid = $3
cb = $4
ce = $5
cl = $6
sum += $4 - $3
print "<li class=\"timeline\"" \
" data-opid=\"" opid "\"" \
" data-optype=\"" optype "\"" \
" data-tid=\"" tid "\"" \
" data-begin=\"" cb "\"" \
" data-end=\"" ce "\"" \
"></li>"
}
'
cat <<EOF
</ul>
</div>
<pre>
EOF
echo "==================[ profile ]==================="
cat $1 | awk '
NR>1{
optype = $2
sum += $5 - $4
count[$2] += $6
}
END {
for (t in count) {
msg = sprintf("%-16s\t%-10d\t%-.4f", t, count[t], count[t]*100 / sum);
print msg
}
}' | sort -k2,2nr
cat $1 | awk '
NR>1{
sum += $5 - $4
}
END {
msg = sprintf("%-16s\t%-10d\t%-.4f", "total", sum, 100);
print msg
}'
cat <<EOF
</pre>
<script>
const min= $min;
const max= $max;
const px_per_nanosecond = 1/1000000;
const scale = px_per_nanosecond;
const li = document.querySelectorAll('li');
const thread = new Set();
for (let i = 0; i < li.length; i++) {
const prof = li[i].dataset;
li[i].style.width = (prof.end - prof.begin)*scale + 'px';
li[i].style.left = (prof.begin - min)*scale + 'px';
li[i].style.top = prof.tid * 15 + 'px';
thread.add(prof.tid);
}
const ul = document.createElement('ul');
ul.classList.add('timeline');
ul.style.width = (max - min)*scale + 'px';
thread.forEach(i => {
const l = document.createElement('li');
ul.appendChild(l);
});
const timeview = document.querySelector('.timeview');
timeview.appendChild(ul);
timeview.style.height = thread.size * 15 + 'px';
</script>
</body>
</html>
EOF
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册