未验证 提交 8908cba4 编写于 作者: C Cai Yudong 提交者: GitHub

optimize snapshot (#2435)

* optimize ResourceHolder
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* optimize OperationExecutor
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* roll back ResourceHolders
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* roll back BaseHolders
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 098f5a53
......@@ -43,40 +43,32 @@ OperationExecutor::Submit(OperationsPtr operation, bool sync) {
void
OperationExecutor::Start() {
if (executor_)
return;
auto queue = std::make_shared<OperationQueueT>();
auto t = std::make_shared<std::thread>(&OperationExecutor::ThreadMain, this, queue);
executor_ = std::make_shared<Executor>(t, queue);
stopped_ = false;
thread_ = std::thread(&OperationExecutor::ThreadMain, this);
running_ = true;
/* std::cout << "OperationExecutor Started" << std::endl; */
}
void
OperationExecutor::Stop() {
if (stopped_ || !executor_)
if (!running_)
return;
executor_->execute_queue->Put(nullptr);
executor_->execute_thread->join();
stopped_ = true;
executor_ = nullptr;
Enqueue(nullptr);
thread_.join();
running_ = false;
std::cout << "OperationExecutor Stopped" << std::endl;
}
void
OperationExecutor::Enqueue(OperationsPtr operation) {
/* std::cout << std::this_thread::get_id() << " Enqueue Operation " << operation->GetID() << std::endl; */
executor_->execute_queue->Put(operation);
queue_.Put(operation);
}
void
OperationExecutor::ThreadMain(OperationQueuePtr queue) {
if (!queue)
return;
OperationExecutor::ThreadMain() {
while (true) {
OperationsPtr operation = queue->Take();
OperationsPtr operation = queue_.Take();
if (!operation) {
std::cout << "Stopping operation executor thread " << std::this_thread::get_id() << std::endl;
break;
......
......@@ -22,17 +22,8 @@ namespace engine {
namespace snapshot {
using ThreadPtr = std::shared_ptr<std::thread>;
using OperationQueueT = server::BlockingQueue<OperationsPtr>;
using OperationQueuePtr = std::shared_ptr<OperationQueueT>;
struct Executor {
Executor(ThreadPtr t, OperationQueuePtr q) : execute_thread(t), execute_queue(q) {
}
ThreadPtr execute_thread;
OperationQueuePtr execute_queue;
};
using ExecutorPtr = std::shared_ptr<Executor>;
using OperationQueue = server::BlockingQueue<OperationsPtr>;
using OperationQueuePtr = std::shared_ptr<OperationQueue>;
class OperationExecutor {
public:
......@@ -58,14 +49,16 @@ class OperationExecutor {
OperationExecutor();
void
ThreadMain(OperationQueuePtr queue);
ThreadMain();
void
Enqueue(OperationsPtr operation);
protected:
mutable std::mutex mtx_;
bool stopped_ = false;
ExecutorPtr executor_;
bool running_ = false;
std::thread thread_;
OperationQueue queue_;
};
} // namespace snapshot
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册