From 09e43b0b6f06cb9de733dbb6b80f6c9458f19124 Mon Sep 17 00:00:00 2001 From: shengjh <1572099106@qq.com> Date: Thu, 24 Sep 2020 14:38:35 +0800 Subject: [PATCH] Fix resource dead lock Signed-off-by: shengjh <1572099106@qq.com> --- proxy/src/message_client/ClientV2.cpp | 31 ++++++++++++++++++++------- proxy/src/server/MessageWrapper.cpp | 15 +++++++++++++ proxy/src/server/MessageWrapper.h | 4 ++++ proxy/src/server/MetaWrapper.cpp | 6 +++++- proxy/src/server/MetaWrapper.h | 3 +++ proxy/src/server/Server.cpp | 9 ++------ 6 files changed, 52 insertions(+), 16 deletions(-) diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index edcc7fb9c..d8769bd3d 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "log/Log.h" namespace milvus::message_client { @@ -161,6 +162,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, const std::function &segment_id) { + using stdclock = std::chrono::high_resolution_clock; + auto start = stdclock::now(); // may have retry policy? auto row_count = request.rows_data_size(); auto topic_num = config.pulsar.topicnum(); @@ -184,7 +187,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); - auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) { msg_sended += 1; if (result != pulsar::ResultOk) { stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); @@ -197,9 +200,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, stats[this_thread] = Status(DB_ERROR, e.what()); } } - while (msg_sended < row_count){ + while (msg_sended < row_count) { } + auto end = stdclock::now(); + auto data_size = request.ByteSize(); + LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " + << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 + << "M/s"; + for (auto &stat : stats) { if (!stat.ok()) { return stat; @@ -213,6 +222,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, const std::function &segment_id) { + using stdclock = std::chrono::high_resolution_clock; + auto start = stdclock::now(); + auto row_count = request.id_array_size(); auto topicnum = config.pulsar.topicnum(); auto stats = std::vector(topicnum); @@ -226,12 +238,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, mut_msg.set_uid(request.id_array(i)); mut_msg.set_collection_name(request.collection_name()); mut_msg.set_timestamp(timestamp); - uint64_t uid = request.id_array(i); - auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topicnum; - mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); int this_thread = omp_get_thread_num(); - auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) { msg_sended += 1; if (result != pulsar::ResultOk) { stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); @@ -239,9 +248,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, }; paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); } - while (msg_sended < row_count){ + while (msg_sended < row_count) { } + auto end = stdclock::now(); + auto data_size = request.ByteSize(); + LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " + << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 + << "M/s"; + for (auto &stat : stats) { if (!stat.ok()) { return stat; @@ -294,7 +309,7 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u } MsgClientV2::~MsgClientV2() { - // insert_delete_producer_->close(); +// insert_delete_producer_->close(); for (auto &producer: paralle_mut_producers_) { producer->close(); } diff --git a/proxy/src/server/MessageWrapper.cpp b/proxy/src/server/MessageWrapper.cpp index 8fc09f8f7..511903dda 100644 --- a/proxy/src/server/MessageWrapper.cpp +++ b/proxy/src/server/MessageWrapper.cpp @@ -1,3 +1,4 @@ +#include "server/delivery/ReqScheduler.h" #include "MessageWrapper.h" #include "config/ServerConfig.h" @@ -15,11 +16,25 @@ Status MessageWrapper::Init() { int client_id = config.proxy_id(); msg_client_ = std::make_shared(client_id, pulsar_server_addr, config.pulsar.topicnum()); auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult"); + if (!status.ok()){ + return status; + } + + // timeSync + time_sync_ = std::make_shared(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); return status; } const std::shared_ptr &MessageWrapper::MessageClient() { return msg_client_; } +void MessageWrapper::Stop() { + if (time_sync_ != nullptr){ + time_sync_->Stop(); + time_sync_= nullptr; + } + msg_client_ = nullptr; +} + } } \ No newline at end of file diff --git a/proxy/src/server/MessageWrapper.h b/proxy/src/server/MessageWrapper.h index b9c008c3a..94ece631d 100644 --- a/proxy/src/server/MessageWrapper.h +++ b/proxy/src/server/MessageWrapper.h @@ -1,5 +1,6 @@ #pragma once #include "message_client/ClientV2.h" +#include "timesync/TimeSync.h" namespace milvus { namespace server { @@ -14,11 +15,14 @@ class MessageWrapper { const std::shared_ptr& MessageClient(); + void Stop(); + private: MessageWrapper() = default; private: std::shared_ptr msg_client_; + std::shared_ptr time_sync_; }; } diff --git a/proxy/src/server/MetaWrapper.cpp b/proxy/src/server/MetaWrapper.cpp index af8d6d785..ad7c07fec 100644 --- a/proxy/src/server/MetaWrapper.cpp +++ b/proxy/src/server/MetaWrapper.cpp @@ -70,7 +70,7 @@ Status MetaWrapper::Init() { return SyncMeta(); } catch (const std::exception &e) { - return Status(DB_ERROR, "Init meta error"); + return Status(DB_ERROR, "Can not connect to meta server"); } } @@ -173,5 +173,9 @@ int64_t MetaWrapper::CountCollection(const std::string &collection_name) { return count; } +void MetaWrapper::Stop() { + watcher_->Cancel(); +} + } } \ No newline at end of file diff --git a/proxy/src/server/MetaWrapper.h b/proxy/src/server/MetaWrapper.h index 168820b0f..3284b737a 100644 --- a/proxy/src/server/MetaWrapper.h +++ b/proxy/src/server/MetaWrapper.h @@ -34,6 +34,9 @@ class MetaWrapper { int64_t CountCollection(const std::string& collection_name); + void + Stop(); + private: bool IsCollectionMetaKey(const std::string &key); diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index ce63232da..27eddba91 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -236,13 +236,6 @@ Status Server::StartService() { Status stat; - // timeSync - // client id should same to MessageWrapper - int client_id = 0; - std::string pulsar_server_addr - (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); - static timesync::TimeSync syc(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); - // stat = engine::KnowhereResource::Initialize(); if (!stat.ok()) { LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message(); @@ -274,6 +267,8 @@ void Server::StopService() { // storage::S3ClientWrapper::GetInstance().StopService(); grpc::GrpcServer::GetInstance().Stop(); + MetaWrapper::GetInstance().Stop(); + MessageWrapper::GetInstance().Stop(); } void -- GitLab