diff --git a/src/common/depCore.h b/src/common/depCore.h
new file mode 100644
index 0000000000000000000000000000000000000000..3d7948cc835e9c4f1ac4670c6bd600028f2f48c1
--- /dev/null
+++ b/src/common/depCore.h
@@ -0,0 +1,68 @@
+/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+#include <algorithm>
+#include <unordered_map>
+#include <vector>
+#include "framework/operator.h"
+
+namespace paddle_mobile {
+
+class depCore {
+ public:
+  template <typename Dtype>
+  void analysisDep(
+      const std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>&
+          ops) {
+    // map each output variable to the index of the op that produces it
+    std::unordered_map<std::string, int> vars;
+    size_t nop = ops.size();
+    for (size_t i = 0; i < nop; i++) {
+      const auto& op = ops[i];
+      for (const auto& kv : op->Outputs()) {
+        for (const auto& v : kv.second) {
+          vars[v] = i;
+        }
+      }
+    }
+    deps.resize(nop);
+    next.resize(nop);
+    // an op depends on every op that produces one of its inputs
+    for (size_t i = 0; i < nop; i++) {
+      const auto& op = ops[i];
+      for (const auto& kv : op->Inputs()) {
+        for (const auto& v : kv.second) {
+          if (vars.find(v) == vars.end()) {
+            continue;
+          }
+          int di = vars[v];
+          if (di == i) {
+            continue;
+          }
+          if (std::find(deps[i].begin(), deps[i].end(), di) != deps[i].end()) {
+            continue;
+          }
+          deps[i].push_back(di);
+          next[di].push_back(i);
+        }
+      }
+    }
+  }
+  const std::vector<int>& getNext(int i) { return next[i]; }
+  const std::vector<int>& getDeps(int i) { return deps[i]; }
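+  // deps[i] holds the ops that op_i consumes results from; next[i] holds the
+  // ops unblocked by op_i. Example: if op_0 writes variable "conv_out" and
+  // op_2 reads it, then deps[2] contains 0 and next[0] contains 2, so the
+  // executor only schedules op_2 after op_0 has finished.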
+  std::vector<std::vector<int>> deps;
+  std::vector<std::vector<int>> next;
+};
+}  // namespace paddle_mobile
+
+#endif
diff --git a/src/common/threadpool.h b/src/common/threadpool.h
new file mode 100644
index 0000000000000000000000000000000000000000..bf7894dd94a20f4f51df23c6355d26d6da3af01d
--- /dev/null
+++ b/src/common/threadpool.h
@@ -0,0 +1,126 @@
+/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+
+namespace paddle_mobile {
+class ThreadPool {
+ public:
+  static ThreadPool& getThreadPool();
+  static int getThreadPoolThreadId();
+  explicit ThreadPool(size_t);
+  template <class F, class... Args>
+  auto enqueue(F&& f, Args&&... args)
+      -> std::future<typename std::result_of<F(Args...)>::type>;
+  ~ThreadPool();
+  int getTid(const std::thread::id& id) {
+    for (int i = 0; i < workers.size(); i++) {
+      if (workers[i].get_id() == id) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+ private:
+  // need to keep track of threads so we can join them
+  std::vector<std::thread> workers;
+  // the task queue
+  std::queue<std::function<void()>> tasks;
+
+  // synchronization
+  std::mutex queue_mutex;
+  std::condition_variable condition;
+  bool stop;
+};
+
+// the constructor just launches some amount of workers
+inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
+  for (size_t i = 0; i < threads; ++i)
+    workers.emplace_back([this] {
+      for (;;) {
+        std::function<void()> task;
+        {
+          std::unique_lock<std::mutex> lock(this->queue_mutex);
+          this->condition.wait(
+              lock, [this] { return this->stop || !this->tasks.empty(); });
+          // for (;;) {
+          //   if (this->stop || !this->tasks.empty()) {
+          //     break;
+          //   }
+          //   lock.unlock();
+          //   lock.lock();
+          // }
+          if (this->stop && this->tasks.empty()) return;
+          task = std::move(this->tasks.front());
+          this->tasks.pop();
+        }
+
+        task();
+      }
+    });
+}
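+
+// Illustrative use of enqueue (not taken from the executor code below):
+// it returns a std::future carrying the callable's result, e.g.
+//   auto fut = ThreadPool::getThreadPool().enqueue(
+//       [](int a, int b) { return a + b; }, 1, 2);
+//   int sum = fut.get();  // blocks until a worker ran the task; sum == 3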
+
+// add new work item to the pool
+template <class F, class... Args>
+auto ThreadPool::enqueue(F&& f, Args&&... args)
+    -> std::future<typename std::result_of<F(Args...)>::type> {
+  using return_type = typename std::result_of<F(Args...)>::type;
+
+  auto task = std::make_shared<std::packaged_task<return_type()>>(
+      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+
+  std::future<return_type> res = task->get_future();
+  {
+    std::unique_lock<std::mutex> lock(queue_mutex);
+
+    // don't allow enqueueing after stopping the pool
+    // if (stop)
+    //   throw std::runtime_error("enqueue on stopped ThreadPool");
+
+    tasks.emplace([task]() { (*task)(); });
+  }
+  condition.notify_one();
+  return res;
+}
+
+// the destructor joins all threads
+inline ThreadPool::~ThreadPool() {
+  {
+    std::unique_lock<std::mutex> lock(queue_mutex);
+    stop = true;
+  }
+  condition.notify_all();
+  for (std::thread& worker : workers) worker.join();
+}
+
+// defined in a header, so these must be inline to avoid multiple definitions
+inline ThreadPool& ThreadPool::getThreadPool() {
+  static ThreadPool threadPool(3);
+  return threadPool;
+}
+
+inline int ThreadPool::getThreadPoolThreadId() {
+  return getThreadPool().getTid(std::this_thread::get_id());
+}
+}  // namespace paddle_mobile
diff --git a/src/io/io.cpp b/src/io/io.cpp
index 6317afc8c5368daeb6b219b64ddc9f0157ed7539..e5ca89f9f6fd495c6adaba9f982f0576a2a1af30 100644
--- a/src/io/io.cpp
+++ b/src/io/io.cpp
@@ -12,15 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#include "io.h"
+#include "io/io.h"
 #include <fstream>
-#define PADDLE_MOBILE_PROFILE
-#ifdef PADDLE_MOBILE_PROFILE
-#include <ctime>
-#include <unordered_map>
-#include <vector>
-#endif
-
 #include "common/enforce.h"
 #include "common/log.h"
 #include "framework/framework.pb-c.h"
@@ -31,6 +24,12 @@ limitations under the License. */
 #include "framework/program/var_desc.h"
 #include "framework/scope.h"
 #include "framework/tensor.h"
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "common/threadpool.h"
+#endif
 
 namespace paddle_mobile {
 using framework::Variable;
@@ -142,8 +141,6 @@ const framework::Program<Dtype, P> Loader<Dtype, P>::LoadProgram(
     }
   }
 
-  // originProgramDesc->Description("program: ");
-
   if (optimize) {
     framework::ProgramOptimize program_optimize;
     program.optimizeProgram =
@@ -164,7 +161,6 @@
 template class Loader<CPU, Precision::FP32>;
 template class Loader<GPU_MALI, Precision::FP32>;
 
 #pragma mark - executor
-
 template <typename Dtype, Precision P>
 Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
@@ -178,6 +174,9 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
   variable_ptr[0].SetValue<int>(batch_size);
   const std::vector<std::shared_ptr<framework::BlockDesc>> blocks =
       to_predict_program_->Blocks();
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+  depManager.resize(blocks.size());
+#endif
   for (int i = 0; i < blocks.size(); ++i) {
     std::shared_ptr<framework::BlockDesc> block_desc = blocks[i];
     std::vector<std::shared_ptr<framework::OpDesc>> ops = block_desc->Ops();
@@ -188,8 +187,10 @@ Executor<Dtype, P>::Executor(const framework::Program<Dtype> p, int batch_size,
           op->Type(), op->GetInputs(), op->GetOutputs(), op->GetAttrMap(),
           program_.scope);
       op_base->InferShape();
-
       ops_of_block_[*block_desc.get()].push_back(op_base);
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+      depManager[i].analysisDep(ops_of_block_[*block_desc.get()]);
+#endif
     }
   }
   if (program_.is_commbine) {
@@ -350,48 +351,132 @@ std::shared_ptr<framework::Tensor> Executor<Dtype, P>::Predict(
   feed_tensor->ShareDataWith(t);
   std::shared_ptr<framework::BlockDesc> to_predict_block =
       to_predict_program_->Block(0);
+  auto &ops = ops_of_block_[*to_predict_block.get()];
 #ifdef PADDLE_MOBILE_PROFILE
-  std::unordered_map<std::string, clock_t> _profile;
+  std::vector<ProfInfo> profile(ops.size());
 #endif
-  for (int j = 0; j < ops_of_block_[*to_predict_block.get()].size(); ++j) {
-    auto op = ops_of_block_[*to_predict_block.get()][j];
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+  std::mutex m;
+  std::condition_variable cv;
+  std::queue<int> next;
+  next.push(0);
+  int rsize = ops.size();
+  std::vector<int> status(rsize, 0);
+  auto &threadPool = ThreadPool::getThreadPool();
+  auto &dep = depManager[0];
+  auto finishF = [&ops, &m, &cv, &next, &status, &rsize, &dep](int opi) {
+    std::lock_guard<std::mutex> lk(m);
+    rsize--;
+    status[opi] = 2;
+    for (int i : dep.getNext(opi)) {
+      bool ok = true;
+      for (int j : dep.getDeps(i)) {
+        if (status[j] != 2) {
+          ok = false;
+          break;
+        }
+      }
+      if (ok && (status[i] == 0)) {
+        next.push(i);
+      }
+    }
+    cv.notify_one();
+  };
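+  // op status: 0 = waiting, 1 = dispatched, 2 = finished. finishF runs after
+  // an op completes: it queues every successor whose dependencies are all
+  // finished, then wakes the dispatch loop below, which drains `next` into
+  // the thread pool until rsize (ops still unfinished) reaches zero.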
+  for (;;) {
+    std::unique_lock<std::mutex> lk(m);
+    cv.wait(lk, [&next, &rsize] { return rsize == 0 || !next.empty(); });
+    if (rsize == 0) {
+      break;
+    }
+    while (next.size() > 0) {
+      int opi = next.front();
+      next.pop();
+      status[opi] = 1;
+      threadPool.enqueue([opi, &ops, &finishF, &profile] {
+        auto &op = ops[opi];
 #ifdef PADDLE_MOBILE_PROFILE
-    _profile[op->Type()] -= clock();
+        struct timespec ts;
+        clock_gettime(CLOCK_MONOTONIC, &ts);
+        profile[opi].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
+        profile[opi].tid = ThreadPool::getThreadPoolThreadId();
 #endif
-    op->Run();
+        ops[opi]->Run();
 #ifdef PADDLE_MOBILE_PROFILE
-    _profile[op->Type()] += clock();
+        clock_gettime(CLOCK_MONOTONIC, &ts);
+        profile[opi].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
 #endif
+        finishF(opi);
+      });
+    }
   }
+#else
+  for (int i = 0; i < ops.size(); i++) {
 #ifdef PADDLE_MOBILE_PROFILE
-  {
-    std::cout << "====================[ profile ]======================\n";
-    using prof_t = std::pair<std::string, clock_t>;
-    std::vector<prof_t> _tprofile(_profile.begin(), _profile.end());
-    clock_t _ptotal = 0;
-    for (auto const &p : _tprofile) {
-      _ptotal += p.second;
-    }
-    auto compf = [](const prof_t &a, const prof_t &b) {
-      return a.second > b.second;
-    };
-    std::sort(_tprofile.begin(), _tprofile.end(), compf);
-    _tprofile.push_back(std::make_pair("total", _ptotal));
-    for (auto const &p : _tprofile) {
-      printf("%-16s\t%-10.0f\t%-.4f\n", p.first.c_str(), (float)p.second,
-             (float)p.second / _ptotal * 100.0);
-    }
-    std::cout << "====================[---------]======================\n";
+    struct timespec ts;
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    profile[i].runBegin = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
+#endif
+    ops[i]->Run();
+#ifdef PADDLE_MOBILE_PROFILE
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    profile[i].runEnd = (uint64_t)ts.tv_sec * 1e9 + ts.tv_nsec;
+#endif
   }
 #endif
-  auto ops = ops_of_block_[*to_predict_program_->Block(0)];
   auto last_op = ops.rbegin();
+
   auto output_map = (*last_op)->Outputs();
   std::vector<std::string> out_keys = (*last_op)->GetOutKeys();
   PADDLE_MOBILE_ENFORCE(out_keys.size() > 0, "the last op contains no output");
   framework::LoDTensor *output_tensor =
       framework::GetVarValue<framework::LoDTensor>(out_keys[0], output_map,
                                                    *(program_.scope));
+#ifdef PADDLE_MOBILE_PROFILE
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+  // TODO: expose the profile info through an interface so users can fetch it
+  // to analyze the performance of their network.
+  // dump the dependency graph in Graphviz format
+  FILE *df = fopen("net.dot", "w");
+  fprintf(df, "digraph {\n");
+  for (int i = 0; i < ops.size(); i++) {
+    for (int j : dep.getNext(i)) {
+      fprintf(df, "op_%d -> op_%d\n", i, j);
+    }
+  }
+  for (int i = 0; i < ops.size(); i++) {
+    fprintf(df, "op_%d[label=\"%s (%d)\"]\n", i, ops[i]->Type().c_str(), i);
+  }
+  fprintf(df, "}\n");
+  fclose(df);
+#endif
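+  // profile.out layout, one tab-separated row per op:
+  //   opid  optype  tid  runBegin  runEnd  timeCost
+  // (timestamps in ns from CLOCK_MONOTONIC; read by tools/profile_show.sh)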
+  FILE *pf = fopen("profile.out", "w");
+  std::unordered_map<std::string, uint64_t> _tp;
+  for (int i = 0; i < profile.size(); i++) {
+    const auto &pInfo = profile[i];
+    uint64_t timeCost = pInfo.runEnd - pInfo.runBegin;
+    _tp[ops[i]->Type()] += timeCost;
+    fprintf(pf, "%d\t%s\t%d\t%llu\t%llu\t%llu\n", i, ops[i]->Type().c_str(),
+            pInfo.tid, pInfo.runBegin, pInfo.runEnd, timeCost);
+  }
+  fclose(pf);
+  printf("====================[ profile ]======================\n");
+  using prof_t = std::pair<std::string, uint64_t>;
+  std::vector<prof_t> _tv(_tp.begin(), _tp.end());
+  uint64_t _ptotal = 0;
+  for (auto const &p : _tv) {
+    _ptotal += p.second;
+  }
+  auto compf = [](const prof_t &a, const prof_t &b) {
+    return a.second > b.second;
+  };
+  std::sort(_tv.begin(), _tv.end(), compf);
+  _tv.push_back(std::make_pair("total", _ptotal));
+  for (auto const &p : _tv) {
+    printf("%-16s\t%-10.0f\t%-2.4f\n", p.first.c_str(), (float)p.second,
+           (float)p.second / _ptotal * 100.0);
+  }
+  printf("====================[---------]======================\n");
+#endif
+
   return std::shared_ptr<framework::Tensor>(output_tensor);
 }
 
 template <typename Dtype, Precision P>
diff --git a/src/io/io.h b/src/io/io.h
index a1fbf158c2b026336d363db512cb44fe58ee93db..ff520dd628406eae47f76196dbe66a0992dfe735 100644
--- a/src/io/io.h
+++ b/src/io/io.h
@@ -18,12 +18,17 @@ limitations under the License. */
 #include <map>
 #include <string>
 #include <vector>
-
 #include "common/types.h"
 #include "framework/lod_tensor.h"
 #include "framework/operator.h"
 #include "framework/program/program.h"
 #include "framework/tensor.h"
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include "common/depCore.h"
+#endif
 
 namespace paddle_mobile {
@@ -92,6 +97,16 @@ class Executor {
            std::vector<std::shared_ptr<framework::OperatorBase<Dtype>>>>
       ops_of_block_;
   bool use_optimize_ = false;
+#ifdef PADDLE_EXECUTOR_MULTITHREAD
+  std::vector<depCore> depManager;
+#endif
+#ifdef PADDLE_MOBILE_PROFILE
+  struct ProfInfo {
+    int tid = 0;              // pool thread that ran the op
+    uint64_t runBegin = 0UL;  // CLOCK_MONOTONIC, ns
+    uint64_t runEnd = 0UL;    // CLOCK_MONOTONIC, ns
+  };
+#endif
 };
 
 }  // namespace paddle_mobile
diff --git a/tools/profile_show.sh b/tools/profile_show.sh
new file mode 100644
index 0000000000000000000000000000000000000000..9dae20faf343de68a7815988c4d1046d0438094b
--- /dev/null
+++ b/tools/profile_show.sh
@@ -0,0 +1,139 @@
+#!/usr/bin/env sh
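+# Renders profile.out (written by the executor when PADDLE_MOBILE_PROFILE is
+# defined) as an HTML timeline followed by a per-op-type summary, e.g.:
+#   sh tools/profile_show.sh profile.out > profile.html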
+# emit the HTML page skeleton (minimal markup)
+cat <<EOF
+<html>
+<head>
+<title>paddle-mobile profile</title>
+</head>
+<body>
+<div class="timeline">
+EOF
+
+min=$(awk 'NR==1{min=$4} NR>1{if($4 < min) min=$4} END{print min}' $1)
+max=$(awk 'NR==1{max=$5} NR>1{if($5 > max) max=$5} END{print max}' $1)
+sort $1 -k1,1n | awk -v max="$max" -v min="$min" '
+BEGIN {
+    total = max - min
+}
+{
+    opid = $1
+    optype = $2
+    tid = $3
+    cb = $4
+    ce = $5
+    cl = $6
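+    # bar geometry below: offset and width as percentages of total wall time
+    # (cb = runBegin, ce = runEnd, cl = timeCost, all in ns)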
  • " +} +' + +cat < +
+
+EOF
+
+echo "==================[ profile ]==================="
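+# per-op-type totals: type, accumulated time (ns), share of total (%)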
+cat $1 | awk '
+{
+    sum += $5 - $4
+    count[$2] += $6
+}
+END {
+for (t in count) {
+    msg = sprintf("%-16s\t%-10d\t%-.4f", t, count[t], count[t]*100 / sum);
+    print msg
+}
+}' | sort -k2,2nr
+cat $1 | awk '
+{
+    sum += $5 - $4
+}
+END {
+msg = sprintf("%-16s\t%-10d\t%-.4f", "total", sum, 100);
+print msg
+}'
+
+cat <<EOF
+</pre>
+</body>
+</html>
+EOF
+