From 018b32bca2762af5f2d48d97491a5ad56a45ff6b Mon Sep 17 00:00:00 2001 From: groot Date: Mon, 15 Apr 2019 15:58:28 +0800 Subject: [PATCH] add block queue class Former-commit-id: b5fabddb317de5e99462124757e8c1e4466bb184 --- cpp/conf/server_config.yaml | 9 ++- cpp/src/utils/BlockingQueue.h | 54 ++++++++++++++++++ cpp/src/utils/BlockingQueue.inl | 99 +++++++++++++++++++++++++++++++++ cpp/src/utils/Error.h | 23 ++++++++ 4 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 cpp/src/utils/BlockingQueue.h create mode 100644 cpp/src/utils/BlockingQueue.inl diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index d2b78b58..9b56ac7e 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -1,6 +1,8 @@ server_config: address: 127.0.0.1 - port: 21001 + port: 33001 + transfer_protocol: json #optional: binary, compact, json, simple_json, debug + server_mode: thread_pool #optional: simple, non_blocking, hsha, thread_pool, thread_selector log_config: global: @@ -28,4 +30,7 @@ log_config: filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-error.log" fatal: enabled: false - filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-fatal.log" \ No newline at end of file + filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-fatal.log" + +cache_config: + cache_capacity: 16 # unit: GB \ No newline at end of file diff --git a/cpp/src/utils/BlockingQueue.h b/cpp/src/utils/BlockingQueue.h new file mode 100644 index 00000000..cd94997f --- /dev/null +++ b/cpp/src/utils/BlockingQueue.h @@ -0,0 +1,54 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include +#include +#include + +namespace zilliz { +namespace vecwise { +namespace server { + +template +class BlockingQueue { +public: + BlockingQueue() : mtx(), full_(), empty_() {} + + BlockingQueue(const BlockingQueue &rhs) = delete; + + BlockingQueue &operator=(const BlockingQueue &rhs) = delete; + + void Put(const T &task); + + T Take(); + + T Front(); + + T Back(); + + size_t Size(); + + bool Empty(); + + void SetCapacity(const size_t capacity); + +private: + mutable std::mutex mtx; + std::condition_variable full_; + std::condition_variable empty_; + std::queue queue_; + size_t capacity_ = 32; +}; + +} +} +} + + +#include "./BlockingQueue.inl" diff --git a/cpp/src/utils/BlockingQueue.inl b/cpp/src/utils/BlockingQueue.inl new file mode 100644 index 00000000..f7677ae6 --- /dev/null +++ b/cpp/src/utils/BlockingQueue.inl @@ -0,0 +1,99 @@ +#pragma once + +#include "Log.h" +#include "Error.h" + +namespace zilliz { +namespace vecwise { +namespace server { + +template +void +BlockingQueue::Put(const T &task) { + std::unique_lock lock(mtx); + full_.wait(lock, [this] { return (queue_.size() < capacity_); }); + + if (queue_.size() >= capacity_) { + std::string error_msg = + "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + + std::to_string(queue_.size()); + SERVER_LOG_ERROR << error_msg; + throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + queue_.push(task); + empty_.notify_all(); +} + +template +T +BlockingQueue::Take() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + T front(queue_.front()); + queue_.pop(); + full_.notify_all(); + return front; +} + +template +size_t +BlockingQueue::Size() { + std::lock_guard lock(mtx); + return queue_.size(); +} + +template +T +BlockingQueue::Front() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + T front(queue_.front()); + return front; +} + +template +T +BlockingQueue::Back() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + T back(queue_.back()); + return back; +} + +template +bool +BlockingQueue::Empty() { + std::unique_lock lock(mtx); + return queue_.empty(); +} + +template +void +BlockingQueue::SetCapacity(const size_t capacity) { + capacity_ = (capacity > 0 ? capacity : capacity_); +} + +} +} +} + diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index 6f3de0ad..83da0cea 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -6,6 +6,8 @@ #pragma once #include +#include +#include namespace zilliz { namespace vecwise { @@ -28,6 +30,27 @@ constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(0x003); constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004); constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005); constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006); +constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007); + +class ServerException : public std::exception { +public: + ServerException(ServerError error_code, + const std::string &message = std::string()) + : error_code_(error_code), message_(message) {} + +public: + ServerError error_code() const { + return error_code_; + } + + virtual const char *what() const noexcept { + return message_.c_str(); + } + +private: + ServerError error_code_; + std::string message_; +}; } // namespace server } // namespace vecwise -- GitLab