diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index edcc7fb9c9ff78e62777393e5c341ccb8e7f3ba1..d8769bd3dcee3c5f3acb70edde3e2c2509d8167a 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "log/Log.h" namespace milvus::message_client { @@ -161,6 +162,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, const std::function &segment_id) { + using stdclock = std::chrono::high_resolution_clock; + auto start = stdclock::now(); // may have retry policy? auto row_count = request.rows_data_size(); auto topic_num = config.pulsar.topicnum(); @@ -184,7 +187,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i)); mut_msg.mutable_extra_params()->CopyFrom(request.extra_params()); - auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) { msg_sended += 1; if (result != pulsar::ResultOk) { stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); @@ -197,9 +200,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, stats[this_thread] = Status(DB_ERROR, e.what()); } } - while (msg_sended < row_count){ + while (msg_sended < row_count) { } + auto end = stdclock::now(); + auto data_size = request.ByteSize(); + LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " + << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 + << "M/s"; + for (auto &stat : stats) { if (!stat.ok()) { return stat; @@ -213,6 +222,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, const std::function &segment_id) { + using stdclock = std::chrono::high_resolution_clock; + auto start = stdclock::now(); + auto row_count = request.id_array_size(); auto topicnum = config.pulsar.topicnum(); auto stats = std::vector(topicnum); @@ -226,12 +238,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, mut_msg.set_uid(request.id_array(i)); mut_msg.set_collection_name(request.collection_name()); mut_msg.set_timestamp(timestamp); - uint64_t uid = request.id_array(i); - auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topicnum; - mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); int this_thread = omp_get_thread_num(); - auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){ + auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) { msg_sended += 1; if (result != pulsar::ResultOk) { stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result)); @@ -239,9 +248,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, }; paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); } - while (msg_sended < row_count){ + while (msg_sended < row_count) { } + auto end = stdclock::now(); + auto data_size = request.ByteSize(); + LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, " + << "throughput: " << data_size / std::chrono::duration_cast(end - start).count() * 1000 / 1024.0 / 1024 + << "M/s"; + for (auto &stat : stats) { if (!stat.ok()) { return stat; @@ -294,7 +309,7 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u } MsgClientV2::~MsgClientV2() { - // insert_delete_producer_->close(); +// insert_delete_producer_->close(); for (auto &producer: paralle_mut_producers_) { producer->close(); } diff --git a/proxy/src/server/MessageWrapper.cpp b/proxy/src/server/MessageWrapper.cpp index 8fc09f8f79326eb3ba52b72dd4dad8be84d54d67..511903dda3cfda9f01ed4d8dce990ac2d0243b7d 100644 --- a/proxy/src/server/MessageWrapper.cpp +++ b/proxy/src/server/MessageWrapper.cpp @@ -1,3 +1,4 @@ +#include "server/delivery/ReqScheduler.h" #include "MessageWrapper.h" #include "config/ServerConfig.h" @@ -15,11 +16,25 @@ Status MessageWrapper::Init() { int client_id = config.proxy_id(); msg_client_ = std::make_shared(client_id, pulsar_server_addr, config.pulsar.topicnum()); auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult"); + if (!status.ok()){ + return status; + } + + // timeSync + time_sync_ = std::make_shared(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); return status; } const std::shared_ptr &MessageWrapper::MessageClient() { return msg_client_; } +void MessageWrapper::Stop() { + if (time_sync_ != nullptr){ + time_sync_->Stop(); + time_sync_= nullptr; + } + msg_client_ = nullptr; +} + } } \ No newline at end of file diff --git a/proxy/src/server/MessageWrapper.h b/proxy/src/server/MessageWrapper.h index b9c008c3a0e3b7c9a4d53cf3c5345d6a7187ea7c..94ece631d74e14e7fd5b3943b393685b08c3674f 100644 --- a/proxy/src/server/MessageWrapper.h +++ b/proxy/src/server/MessageWrapper.h @@ -1,5 +1,6 @@ #pragma once #include "message_client/ClientV2.h" +#include "timesync/TimeSync.h" namespace milvus { namespace server { @@ -14,11 +15,14 @@ class MessageWrapper { const std::shared_ptr& MessageClient(); + void Stop(); + private: MessageWrapper() = default; private: std::shared_ptr msg_client_; + std::shared_ptr time_sync_; }; } diff --git a/proxy/src/server/MetaWrapper.cpp b/proxy/src/server/MetaWrapper.cpp index af8d6d785ce2a00fdac04171ec278b41fe6af966..ad7c07fec9bf6af9efac50ab631bb5f0ed2f6078 100644 --- a/proxy/src/server/MetaWrapper.cpp +++ b/proxy/src/server/MetaWrapper.cpp @@ -70,7 +70,7 @@ Status MetaWrapper::Init() { return SyncMeta(); } catch (const std::exception &e) { - return Status(DB_ERROR, "Init meta error"); + return Status(DB_ERROR, "Can not connect to meta server"); } } @@ -173,5 +173,9 @@ int64_t MetaWrapper::CountCollection(const std::string &collection_name) { return count; } +void MetaWrapper::Stop() { + watcher_->Cancel(); +} + } } \ No newline at end of file diff --git a/proxy/src/server/MetaWrapper.h b/proxy/src/server/MetaWrapper.h index 168820b0ffea335dc17c0194b74af15cc7533f83..3284b737a0394ca831d2d6cc25d732349c2f7e92 100644 --- a/proxy/src/server/MetaWrapper.h +++ b/proxy/src/server/MetaWrapper.h @@ -34,6 +34,9 @@ class MetaWrapper { int64_t CountCollection(const std::string& collection_name); + void + Stop(); + private: bool IsCollectionMetaKey(const std::string &key); diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index ce63232da7f12bcc1de6402afd699bf44d27259d..27eddba91d72a138cb13fd5093c5694126fecda5 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -236,13 +236,6 @@ Status Server::StartService() { Status stat; - // timeSync - // client id should same to MessageWrapper - int client_id = 0; - std::string pulsar_server_addr - (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); - static timesync::TimeSync syc(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync"); - // stat = engine::KnowhereResource::Initialize(); if (!stat.ok()) { LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message(); @@ -274,6 +267,8 @@ void Server::StopService() { // storage::S3ClientWrapper::GetInstance().StopService(); grpc::GrpcServer::GetInstance().Stop(); + MetaWrapper::GetInstance().Stop(); + MessageWrapper::GetInstance().Stop(); } void