提交 7da1ea07 编写于 作者: Y Yu Yang

Use PopAll

上级 f2d29be7
...@@ -124,16 +124,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -124,16 +124,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
run_all_ready_ops(); run_all_ready_ops();
// 2. Find ready variable // 2. Find ready variable
VarHandleBase *ready_var = ready_vars.Pop(); bool timeout;
auto cur_ready_vars = ready_vars.PopAll(100, &timeout);
if (timeout) {
if (exception_) {
throw * exception_;
} else {
continue;
}
}
// 3. Remove the dependency of ready_var. // 3. Remove the dependency of ready_var.
// Find the ready_ops after the ready_var. // Find the ready_ops after the ready_var.
pending_vars.erase(ready_var); for (auto ready_var : cur_ready_vars) {
for (auto *op : ready_var->pending_ops_) { pending_vars.erase(ready_var);
auto &deps = pending_ops[op]; for (auto *op : ready_var->pending_ops_) {
--deps; auto &deps = pending_ops[op];
if (deps == 0) { --deps;
ready_ops.insert(op); if (deps == 0) {
ready_ops.insert(op);
}
} }
} }
// Keep loop until all vars are ready. // Keep loop until all vars are ready.
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#pragma once #pragma once
#include <chrono>
#include <functional> #include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party #include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/details/ssa_graph_executor.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h"
...@@ -27,10 +28,10 @@ namespace details { ...@@ -27,10 +28,10 @@ namespace details {
template <typename T> template <typename T>
class BlockingQueue { class BlockingQueue {
public: public:
void Push(const T &v) { void Push(const T &item) {
{ {
std::lock_guard<std::mutex> g(mutex_); std::lock_guard<std::mutex> g(mutex_);
q_.emplace_back(v); q_.emplace_back(item);
} }
cv_.notify_one(); cv_.notify_one();
} }
...@@ -56,6 +57,18 @@ class BlockingQueue { ...@@ -56,6 +57,18 @@ class BlockingQueue {
return v; return v;
} }
std::deque<T> PopAll(size_t ms, bool *timeout) {
auto time =
std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
std::unique_lock<std::mutex> lock(mutex_);
*timeout = !cv_.wait_until(lock, time, [this] { return !q_.empty(); });
std::deque<T> ret;
if (!*timeout) {
std::swap(ret, q_);
}
return ret;
}
private: private:
std::mutex mutex_; std::mutex mutex_;
std::condition_variable cv_; std::condition_variable cv_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册