提交 135282c4 编写于 作者: G groot 提交者: xj.lin

add block queue class


Former-commit-id: e5ac4fa08104fac608992ca1f6acc6d17e3c7505
上级 f92df0d0
server_config: server_config:
address: 127.0.0.1 address: 127.0.0.1
port: 21001 port: 33001
transfer_protocol: json #optional: binary, compact, json, simple_json, debug
server_mode: thread_pool #optional: simple, non_blocking, hsha, thread_pool, thread_selector
log_config: log_config:
global: global:
...@@ -28,4 +30,7 @@ log_config: ...@@ -28,4 +30,7 @@ log_config:
filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-error.log" filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-error.log"
fatal: fatal:
enabled: false enabled: false
filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-fatal.log" filename: "/tmp/logs/vecwise/vecwise_engine-%datetime{%h:%m}-fatal.log"
\ No newline at end of file
cache_config:
cache_capacity: 16 # unit: GB
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <assert.h>
#include <condition_variable>
#include <iostream>
#include <queue>
#include <vector>
namespace zilliz {
namespace vecwise {
namespace server {
template<typename T>
class BlockingQueue {
public:
BlockingQueue() : mtx(), full_(), empty_() {}
BlockingQueue(const BlockingQueue &rhs) = delete;
BlockingQueue &operator=(const BlockingQueue &rhs) = delete;
void Put(const T &task);
T Take();
T Front();
T Back();
size_t Size();
bool Empty();
void SetCapacity(const size_t capacity);
private:
mutable std::mutex mtx;
std::condition_variable full_;
std::condition_variable empty_;
std::queue<T> queue_;
size_t capacity_ = 32;
};
}
}
}
#include "./BlockingQueue.inl"
#pragma once
#include "Log.h"
#include "Error.h"
namespace zilliz {
namespace vecwise {
namespace server {
template<typename T>
void
BlockingQueue<T>::Put(const T &task) {
std::unique_lock <std::mutex> lock(mtx);
full_.wait(lock, [this] { return (queue_.size() < capacity_); });
if (queue_.size() >= capacity_) {
std::string error_msg =
"blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " +
std::to_string(queue_.size());
SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
queue_.push(task);
empty_.notify_all();
}
template<typename T>
T
BlockingQueue<T>::Take() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
T front(queue_.front());
queue_.pop();
full_.notify_all();
return front;
}
template<typename T>
size_t
BlockingQueue<T>::Size() {
std::lock_guard <std::mutex> lock(mtx);
return queue_.size();
}
template<typename T>
T
BlockingQueue<T>::Front() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
T front(queue_.front());
return front;
}
template<typename T>
T
BlockingQueue<T>::Back() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
T back(queue_.back());
return back;
}
template<typename T>
bool
BlockingQueue<T>::Empty() {
std::unique_lock <std::mutex> lock(mtx);
return queue_.empty();
}
template<typename T>
void
BlockingQueue<T>::SetCapacity(const size_t capacity) {
capacity_ = (capacity > 0 ? capacity : capacity_);
}
}
}
}
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
#include <exception>
#include <string>
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
...@@ -28,6 +30,27 @@ constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(0x003); ...@@ -28,6 +30,27 @@ constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(0x003);
constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004); constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005); constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006); constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007);
class ServerException : public std::exception {
public:
ServerException(ServerError error_code,
const std::string &message = std::string())
: error_code_(error_code), message_(message) {}
public:
ServerError error_code() const {
return error_code_;
}
virtual const char *what() const noexcept {
return message_.c_str();
}
private:
ServerError error_code_;
std::string message_;
};
} // namespace server } // namespace server
} // namespace vecwise } // namespace vecwise
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册