提交 7e6c912d 编写于 作者: D dolphin8

multithread executor

上级 5401bcdf
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <string>
#include <unordered_map>
#include <vector>
#include "framework/operator.h"
namespace paddle_mobile {
class depCore {
public:
template <typename Dtype>
void analysisDep(
const std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>& ops) {
std::unordered_map<std::string, int> vars;
size_t nop = ops.size();
for (size_t i = 0; i < nop; i++) {
const auto& op = ops[i];
for (const auto& kv : op->Outputs()) {
for (const auto& v : kv.second) {
vars[v] = i;
}
}
}
deps.resize(nop);
next.resize(nop);
for (size_t i = 0; i < nop; i++) {
const auto& op = ops[i];
for (const auto& kv : op->Inputs()) {
for (const auto& v : kv.second) {
if (vars.find(v) == vars.end()) {
continue;
}
int di = vars[v];
if (di == i) {
continue;
}
if (std::find(deps[i].begin(), deps[i].end(), di) != deps[i].end()) {
continue;
}
deps[i].push_back(di);
next[di].push_back(i);
}
}
}
}
const std::vector<int>& getNext(int i) { return next[i]; }
const std::vector<int>& getDeps(int i) { return deps[i]; }
std::vector<std::vector<int>> deps;
std::vector<std::vector<int>> next;
};
} // namespace paddle_mobile
#endif
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class ThreadPool {
public:
static ThreadPool& getThreadPool();
static int getThreadPoolThreadId();
explicit ThreadPool(size_t);
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
int getTid(const std::thread::id& id) {
for (int i = 0; i < workers.size(); i++) {
if (workers[i].get_id() == id) {
return i;
}
}
return -1;
}
private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
// for (;;) {
// if (this->stop || !this->tasks.empty()) {
// break;
// }
// lock.unlock();
// lock.lock();
// }
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
// if(stop)
// throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) worker.join();
}
ThreadPool& ThreadPool::getThreadPool() {
static ThreadPool threadPool(3);
return threadPool;
}
int ThreadPool::getThreadPoolThreadId() {
return getThreadPool().getTid(std::this_thread::get_id());
}
......@@ -12,15 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "io.h"
#include "io/io.h"
#include <vector>
#define PADDLE_MOBILE_PROFILE
#ifdef PADDLE_MOBILE_PROFILE
#include <algorithm>
#include <ctime>
#include <unordered_map>
#endif
#include "common/enforce.h"
#include "common/log.h"
#include "framework/framework.pb-c.h"
......@@ -31,6 +24,12 @@ limitations under the License. */
#include "framework/program/var_desc.h"
#include "framework/scope.h"
#include "framework/tensor.h"
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <algorithm>
#include <queue>
#include <utility>
#include "common/threadpool.h"
#endif
namespace paddle_mobile {
using framework::Variable;
......@@ -142,8 +141,6 @@ const framework::Program<Dtype, P> Loader<Dtype, P>::LoadProgram(
}
}
// originProgramDesc->Description("program: ");
if (optimize) {
framework::ProgramOptimize program_optimize;
program.optimizeProgram =
......@@ -164,7 +161,6 @@ template class Loader<FPGA, Precision::FP32>;
template class Loader<GPU_MALI, Precision::FP32>;
#pragma mark - executor
template <typename Dtype, Precision P>
Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
bool use_optimize)
......@@ -178,6 +174,9 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
variable_ptr[0].SetValue<int>(batch_size);
const std::vector<std::shared_ptr<framework::BlockDesc>> blocks =
to_predict_program_->Blocks();
#ifdef PADDLE_EXECUTOR_MULTITHREAD
depManager.resize(blocks.size());
#endif
for (int i = 0; i < blocks.size(); ++i) {
std::shared_ptr<framework::BlockDesc> block_desc = blocks[i];
std::vector<std::shared_ptr<framework::OpDesc>> ops = block_desc->Ops();
......@@ -188,8 +187,10 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
op->Type(), op->GetInputs(), op->GetOutputs(), op->GetAttrMap(),
program_.scope);
op_base->InferShape();
ops_of_block_[*block_desc.get()].push_back(op_base);
#ifdef PADDLE_EXECUTOR_MULTITHREAD
depManager[i].analysisDep(ops_of_block_[*block_desc.get()]);
#endif
}
}
if (program_.is_commbine) {
......@@ -350,48 +351,132 @@ std::shared_ptr<framework::Tensor> Executor<Dtype, P>::Predict(
feed_tensor->ShareDataWith(t);
std::shared_ptr<framework::BlockDesc> to_predict_block =
to_predict_program_->Block(0);
auto &ops = ops_of_block_[*to_predict_block.get()];
#ifdef PADDLE_MOBILE_PROFILE
std::unordered_map<std::string, clock_t> _profile;
std::vector<ProfInfo> profile(ops.size());
#endif
for (int j = 0; j < ops_of_block_[*to_predict_block.get()].size(); ++j) {
auto op = ops_of_block_[*to_predict_block.get()][j];
#ifdef PADDLE_EXECUTOR_MULTITHREAD
std::mutex m;
std::condition_variable cv;
std::queue<int> next;
next.push(0);
int rsize = ops.size();
std::vector<int> status(rsize, 0);
auto &threadPool = ThreadPool::getThreadPool();
auto &dep = depManager[0];
auto finishF = [&ops, &m, &cv, &next, &status, &rsize, &dep](int opi) {
std::lock_guard<std::mutex> lk(m);
rsize--;
status[opi] = 2;
for (int i : dep.getNext(opi)) {
bool ok = true;
for (int j : dep.getDeps(i)) {
if (status[j] != 2) {
ok = false;
break;
}
}
if (ok && (status[i] == 0)) {
next.push(i);
}
}
cv.notify_one();
};
for (;;) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&next, &rsize] { return rsize == 0 || !next.empty(); });
if (rsize == 0) {
break;
}
while (next.size() > 0) {
int opi = next.front();
next.pop();
status[opi] = 1;
threadPool.enqueue([opi, &ops, &finishF, &profile] {
auto &op = ops[opi];
#ifdef PADDLE_MOBILE_PROFILE
_profile[op->Type()] -= clock();
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
profile[opi].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
profile[opi].tid = ThreadPool::getThreadPoolThreadId();
#endif
op->Run();
ops[opi]->Run();
#ifdef PADDLE_MOBILE_PROFILE
_profile[op->Type()] += clock();
clock_gettime(CLOCK_MONOTONIC, &ts);
profile[opi].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
#endif
finishF(opi);
});
}
}
#else
for (int i = 0; i < ops.size(); i++) {
#ifdef PADDLE_MOBILE_PROFILE
{
std::cout << "====================[ profile ]======================\n";
using prof_t = std::pair<std::string, clock_t>;
std::vector<prof_t> _tprofile(_profile.begin(), _profile.end());
clock_t _ptotal = 0;
for (auto const &p : _tprofile) {
_ptotal += p.second;
}
auto compf = [](const prof_t &a, const prof_t &b) {
return a.second > b.second;
};
std::sort(_tprofile.begin(), _tprofile.end(), compf);
_tprofile.push_back(std::make_pair("total", _ptotal));
for (auto const &p : _tprofile) {
printf("%-16s\t%-10.0f\t%-.4f\n", p.first.c_str(), (float)p.second,
(float)p.second / _ptotal * 100.0);
}
std::cout << "====================[---------]======================\n";
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
profile[i].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
#endif
ops[i]->Run();
#ifdef PADDLE_MOBILE_PROFILE
clock_gettime(CLOCK_MONOTONIC, &ts);
profile[i].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
#endif
}
#endif
auto ops = ops_of_block_[*to_predict_program_->Block(0)];
auto last_op = ops.rbegin();
auto output_map = (*last_op)->Outputs();
std::vector<std::string> out_keys = (*last_op)->GetOutKeys();
PADDLE_MOBILE_ENFORCE(out_keys.size() > 0, "the last op contains no output");
framework::LoDTensor *output_tensor =
framework::GetVarValue<framework::LoDTensor>(out_keys[0], output_map,
*(program_.scope));
#ifdef PADDLE_MOBILE_PROFILE
#ifdef PADDLE_EXECUTOR_MULTITHREAD
// TODO expose profile info as an interface, user can get them to analysis
// the performance of their deepnet.
FILE *df = fopen("net.dot", "w");
fprintf(df, "digraph {\n");
for (int i = 0; i < ops.size(); i++) {
for (int j : dep.getNext(i)) {
fprintf(df, "op_%d -> op_%d\n", i, j);
}
}
for (int i = 0; i < ops.size(); i++) {
fprintf(df, "op_%d[label=\"%s (%d)\"]\n", i, ops[i]->Type().c_str(), i);
}
fprintf(df, "}\n");
fclose(df);
#endif
FILE *pf = fopen("profile.out", "w");
std::unordered_map<std::string, uint64_t> _tp;
for (int i = 0; i < profile.size(); i++) {
const auto &pInfo = profile[i];
uint64_t timeCost = pInfo.runEnd - pInfo.runBegin;
_tp[ops[i]->Type()] += timeCost;
fprintf(pf, "%d\t%s\t%d\t%llu\t%llu\t%llu\n", i, ops[i]->Type().c_str(),
pInfo.tid, pInfo.runBegin, pInfo.runEnd, timeCost);
}
fclose(pf);
printf("====================[ profile ]======================\n");
using prof_t = std::pair<std::string, uint64_t>;
std::vector<prof_t> _tv(_tp.begin(), _tp.end());
uint64_t _ptotal = 0;
for (auto const &p : _tv) {
_ptotal += p.second;
}
auto compf = [](const prof_t &a, const prof_t &b) {
return a.second > b.second;
};
std::sort(_tv.begin(), _tv.end(), compf);
_tv.push_back(std::make_pair("total", _ptotal));
for (auto const &p : _tv) {
printf("%-16s\t%-10.0f\t%-2.4f\n", p.first.c_str(), (float)p.second,
(float)p.second / _ptotal * 100.0);
}
printf("====================[---------]======================\n");
#endif
return std::shared_ptr<framework::Tensor>(output_tensor);
}
template <typename Dtype, Precision P>
......
......@@ -18,12 +18,17 @@ limitations under the License. */
#include <memory>
#include <string>
#include <vector>
#include "common/types.h"
#include "framework/lod_tensor.h"
#include "framework/operator.h"
#include "framework/program/program.h"
#include "framework/tensor.h"
#ifdef PADDLE_EXECUTOR_MULTITHREAD
#include <condition_variable>
#include <mutex>
#include <thread>
#include "common/depCore.h"
#endif
namespace paddle_mobile {
......@@ -92,6 +97,16 @@ class Executor {
std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>>
ops_of_block_;
bool use_optimize_ = false;
#ifdef PADDLE_EXECUTOR_MULTITHREAD
std::vector<depCore> depManager;
#endif
#ifdef PADDLE_MOBILE_PROFILE
struct ProfInfo {
int tid = 0;
uint64_t runBegin = 0UL;
uint64_t runEnd = 0UL;
};
#endif
};
} // namespace paddle_mobile
#!/usr/bin/env sh
cat <<EOF
<html>
<head>
<style>
html, body {
position: absolute;
width: 100%;
height: 100%;
margin: 0;
}
div.timeview {
width: 100%;
position: relative;
overflow: scroll;
}
ul {
position: absolute;
margin: 0;
list-style:none;
padding: 0;
margin: 0;
}
li {
height: 15px;
position: absolute;
background: blue;
}
li:nth-child(odd) {
background: blue;
}
li:nth-child(even) {
background: rebeccapurple;
}
ul.timeline {
z-index: -1;
}
ul.timeline li {
position: relative;
height: 15px;
width: 100%;
}
ul.timeline li:nth-child(odd) {
background: beige;
}
ul.timeline li:nth-child(even) {
background: antiquewhite;
}
</style>
</head>
<body>
<div class="timeview">
<ul>
EOF
min=$(awk 'NR==1{min=$4} NR>1{if($4 < min) min=$4} END{print min}' $1)
max=$(awk 'NR==1{max=$5} NR>1{if($5 > max) max=$5} END{print max}' $1)
sort $1 -k1,1n | awk -v max="$max" -v min="$min" '
BEGIN {
total = max - min
}
{
opid = $1
optype = $2
tid = $3
cb = $4
ce = $5
cl = $6
sum += $4 - $3
print "<li class=\"timeline\"" \
" data-opid=\"" opid "\"" \
" data-optype=\"" optype "\"" \
" data-tid=\"" tid "\"" \
" data-begin=\"" cb "\"" \
" data-end=\"" ce "\"" \
"></li>"
}
'
cat <<EOF
</ul>
</div>
<pre>
EOF
echo "==================[ profile ]==================="
cat $1 | awk '
NR>1{
optype = $2
sum += $5 - $4
count[$2] += $6
}
END {
for (t in count) {
msg = sprintf("%-16s\t%-10d\t%-.4f", t, count[t], count[t]*100 / sum);
print msg
}
}' | sort -k2,2nr
cat $1 | awk '
NR>1{
sum += $5 - $4
}
END {
msg = sprintf("%-16s\t%-10d\t%-.4f", "total", sum, 100);
print msg
}'
cat <<EOF
</pre>
<script>
const min= $min;
const max= $max;
const px_per_nanosecond = 1/1000000;
const scale = px_per_nanosecond;
const li = document.querySelectorAll('li');
const thread = new Set();
for (let i = 0; i < li.length; i++) {
const prof = li[i].dataset;
li[i].style.width = (prof.end - prof.begin)*scale + 'px';
li[i].style.left = (prof.begin - min)*scale + 'px';
li[i].style.top = prof.tid * 15 + 'px';
thread.add(prof.tid);
}
const ul = document.createElement('ul');
ul.classList.add('timeline');
ul.style.width = (max - min)*scale + 'px';
thread.forEach(i => {
const l = document.createElement('li');
ul.appendChild(l);
});
const timeview = document.querySelector('.timeview');
timeview.appendChild(ul);
timeview.style.height = thread.size * 15 + 'px';
</script>
</body>
</html>
EOF
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册