From 251e4a8ee5cb560f9c35c90126b57e832f6d660b Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Mon, 23 Apr 2018 15:21:27 +0800 Subject: [PATCH] unify fluid blocking queue --- paddle/fluid/framework/blocking_queue.h | 74 +++++++++++++++++++ .../details/threaded_ssa_graph_executor.h | 41 +--------- paddle/fluid/operators/detail/grpc_client.h | 2 +- paddle/fluid/operators/detail/grpc_server.cc | 4 +- paddle/fluid/operators/detail/grpc_server.h | 6 +- .../operators/detail/simple_block_queue.h | 52 ------------- 6 files changed, 81 insertions(+), 98 deletions(-) create mode 100644 paddle/fluid/framework/blocking_queue.h delete mode 100644 paddle/fluid/operators/detail/simple_block_queue.h diff --git a/paddle/fluid/framework/blocking_queue.h b/paddle/fluid/framework/blocking_queue.h new file mode 100644 index 00000000000..a19558c0ae5 --- /dev/null +++ b/paddle/fluid/framework/blocking_queue.h @@ -0,0 +1,74 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include // NOLINT +#include +#include // NOLINT +#include + +namespace paddle { +namespace framework { + +template +class BlockingQueue { + public: + void Push(const T &item) { + { + std::lock_guard g(mutex_); + q_.emplace_back(item); + } + cv_.notify_one(); + } + + template + void Extend(const U &items) { + { + std::lock_guard g(mutex_); + for (auto &item : items) { + q_.emplace_back(item); + } + } + cv_.notify_all(); + } + + std::deque PopAll(size_t ms, bool *timeout) { + auto time = + std::chrono::system_clock::now() + std::chrono::milliseconds(ms); + std::unique_lock lock(mutex_); + *timeout = !cv_.wait_until(lock, time, [this] { return !q_.empty(); }); + std::deque ret; + if (!*timeout) { + std::swap(ret, q_); + } + return ret; + } + + T Pop() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [=] { return !q_.empty(); }); + T rc(std::move(q_.front())); + q_.pop_front(); + return rc; + } + + private: + std::mutex mutex_; + std::condition_variable cv_; + std::deque q_; +}; + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index d70bbd4ef0e..d089b79d913 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -22,6 +22,7 @@ #include #include "ThreadPool.h" // ThreadPool in thrird party +#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h" namespace paddle { @@ -30,46 +31,6 @@ class Scope; namespace details { -template -class BlockingQueue { - public: - void Push(const T &item) { - { - std::lock_guard g(mutex_); - q_.emplace_back(item); - } - cv_.notify_one(); - } - - template - void Extend(const U &items) { - { - std::lock_guard g(mutex_); - for (auto &item : items) { - q_.emplace_back(item); - } - } - cv_.notify_all(); - } - - std::deque PopAll(size_t ms, bool *timeout) { - auto time = - std::chrono::system_clock::now() + std::chrono::milliseconds(ms); - std::unique_lock lock(mutex_); - *timeout = !cv_.wait_until(lock, time, [this] { return !q_.empty(); }); - std::deque ret; - if (!*timeout) { - std::swap(ret, q_); - } - return ret; - } - - private: - std::mutex mutex_; - std::condition_variable cv_; - std::deque q_; -}; - class ThreadedSSAGraphExecutor : public SSAGraphExecutor { public: ThreadedSSAGraphExecutor(size_t num_threads, bool use_event, diff --git a/paddle/fluid/operators/detail/grpc_client.h b/paddle/fluid/operators/detail/grpc_client.h index 4425b19328f..f6229b71bc0 100644 --- a/paddle/fluid/operators/detail/grpc_client.h +++ b/paddle/fluid/operators/detail/grpc_client.h @@ -29,12 +29,12 @@ limitations under the License. */ #include "grpc++/support/byte_buffer.h" #include "grpc++/support/slice.h" #include "grpc/support/log.h" +#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" -#include "paddle/fluid/operators/detail/simple_block_queue.h" namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 119e146e078..8cee46cbb2d 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -90,7 +90,7 @@ class RequestGet final : public RequestBase { ::grpc::ServerCompletionQueue* cq, framework::Scope* scope, const platform::DeviceContext* dev_ctx, - SimpleBlockQueue* queue) + framework::BlockingQueue* queue) : RequestBase(service, cq, dev_ctx), responder_(&ctx_), scope_(scope), @@ -128,7 +128,7 @@ class RequestGet final : public RequestBase { sendrecv::VariableMessage request_; ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_; framework::Scope* scope_; - SimpleBlockQueue* queue_; + framework::BlockingQueue* queue_; }; class RequestPrefetch final : public RequestBase { diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 452ff5e967c..a15c93b7830 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -19,6 +19,7 @@ limitations under the License. */ #include #include "grpc++/grpc++.h" +#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/program_desc.h" @@ -29,7 +30,6 @@ limitations under the License. */ #include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" -#include "paddle/fluid/operators/detail/simple_block_queue.h" namespace paddle { namespace operators { @@ -37,7 +37,7 @@ namespace detail { typedef std::pair> ReceivedMessage; -typedef SimpleBlockQueue ReceivedQueue; +typedef framework::BlockingQueue ReceivedQueue; typedef std::pair MessageWithName; class RequestBase; @@ -99,7 +99,7 @@ class AsyncGRPCServer final { const platform::DeviceContext *dev_ctx_; // received variable from RPC, operators fetch variable from this queue. - SimpleBlockQueue var_get_queue_; + framework::BlockingQueue var_get_queue_; // client send variable to this queue. ReceivedQueue var_recv_queue_; diff --git a/paddle/fluid/operators/detail/simple_block_queue.h b/paddle/fluid/operators/detail/simple_block_queue.h deleted file mode 100644 index 69773e05df7..00000000000 --- a/paddle/fluid/operators/detail/simple_block_queue.h +++ /dev/null @@ -1,52 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include // NOLINT -#include -#include // NOLINT - -namespace paddle { -namespace operators { -namespace detail { - -template -class SimpleBlockQueue { - private: - std::mutex mutex_; - std::condition_variable condition_; - std::deque queue_; - - public: - void Push(T const& value) { - { - std::unique_lock lock(this->mutex_); - queue_.push_front(value); - } - this->condition_.notify_one(); - } - - T Pop() { - std::unique_lock lock(this->mutex_); - this->condition_.wait(lock, [=] { return !this->queue_.empty(); }); - T rc(std::move(this->queue_.back())); - this->queue_.pop_back(); - return rc; - } -}; - -} // namespace detail -} // namespace operators -} // namespace paddle -- GitLab