提交 09e43b0b 编写于 作者: S shengjh 提交者: yefu.chen

Fix resource dead lock

Signed-off-by: Nshengjh <1572099106@qq.com>
上级 f4d29c97
......@@ -7,6 +7,7 @@
#include <omp.h>
#include <numeric>
#include <algorithm>
#include "log/Log.h"
namespace milvus::message_client {
......@@ -161,6 +162,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
using stdclock = std::chrono::high_resolution_clock;
auto start = stdclock::now();
// may have retry policy?
auto row_count = request.rows_data_size();
auto topic_num = config.pulsar.topicnum();
......@@ -184,7 +187,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){
auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) {
msg_sended += 1;
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
......@@ -197,9 +200,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
stats[this_thread] = Status(DB_ERROR, e.what());
}
}
while (msg_sended < row_count){
while (msg_sended < row_count) {
}
auto end = stdclock::now();
auto data_size = request.ByteSize();
LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
<< "throughput: " << data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0 / 1024
<< "M/s";
for (auto &stat : stats) {
if (!stat.ok()) {
return stat;
......@@ -213,6 +222,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
const std::function<uint64_t(const std::string &collection_name,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
using stdclock = std::chrono::high_resolution_clock;
auto start = stdclock::now();
auto row_count = request.id_array_size();
auto topicnum = config.pulsar.topicnum();
auto stats = std::vector<Status>(topicnum);
......@@ -226,12 +238,9 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
mut_msg.set_uid(request.id_array(i));
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_timestamp(timestamp);
uint64_t uid = request.id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topicnum;
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
int this_thread = omp_get_thread_num();
auto callback = [&stats, &msg_sended,this_thread](Result result, const pulsar::MessageId& messageId){
auto callback = [&stats, &msg_sended, this_thread](Result result, const pulsar::MessageId &messageId) {
msg_sended += 1;
if (result != pulsar::ResultOk) {
stats[this_thread] = Status(DB_ERROR, pulsar::strResult(result));
......@@ -239,9 +248,15 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
};
paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback);
}
while (msg_sended < row_count){
while (msg_sended < row_count) {
}
auto end = stdclock::now();
auto data_size = request.ByteSize();
LOG_SERVER_INFO_ << "InsertReq Batch size:" << data_size / 1024.0 / 1024.0 << "M, "
<< "throughput: " << data_size / std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() * 1000 / 1024.0 / 1024
<< "M/s";
for (auto &stat : stats) {
if (!stat.ok()) {
return stat;
......@@ -294,7 +309,7 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u
}
MsgClientV2::~MsgClientV2() {
// insert_delete_producer_->close();
// insert_delete_producer_->close();
for (auto &producer: paralle_mut_producers_) {
producer->close();
}
......
#include "server/delivery/ReqScheduler.h"
#include "MessageWrapper.h"
#include "config/ServerConfig.h"
......@@ -15,11 +16,25 @@ Status MessageWrapper::Init() {
int client_id = config.proxy_id();
msg_client_ = std::make_shared<message_client::MsgClientV2>(client_id, pulsar_server_addr, config.pulsar.topicnum());
auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult");
if (!status.ok()){
return status;
}
// timeSync
time_sync_ = std::make_shared<timesync::TimeSync>(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync");
return status;
}
const std::shared_ptr<message_client::MsgClientV2> &MessageWrapper::MessageClient() {
return msg_client_;
}
void MessageWrapper::Stop() {
if (time_sync_ != nullptr){
time_sync_->Stop();
time_sync_= nullptr;
}
msg_client_ = nullptr;
}
}
}
\ No newline at end of file
#pragma once
#include "message_client/ClientV2.h"
#include "timesync/TimeSync.h"
namespace milvus {
namespace server {
......@@ -14,11 +15,14 @@ class MessageWrapper {
const std::shared_ptr<message_client::MsgClientV2>&
MessageClient();
void Stop();
private:
MessageWrapper() = default;
private:
std::shared_ptr<message_client::MsgClientV2> msg_client_;
std::shared_ptr<milvus::timesync::TimeSync> time_sync_;
};
}
......
......@@ -70,7 +70,7 @@ Status MetaWrapper::Init() {
return SyncMeta();
}
catch (const std::exception &e) {
return Status(DB_ERROR, "Init meta error");
return Status(DB_ERROR, "Can not connect to meta server");
}
}
......@@ -173,5 +173,9 @@ int64_t MetaWrapper::CountCollection(const std::string &collection_name) {
return count;
}
void MetaWrapper::Stop() {
watcher_->Cancel();
}
}
}
\ No newline at end of file
......@@ -34,6 +34,9 @@ class MetaWrapper {
int64_t
CountCollection(const std::string& collection_name);
void
Stop();
private:
bool IsCollectionMetaKey(const std::string &key);
......
......@@ -236,13 +236,6 @@ Status
Server::StartService() {
Status stat;
// timeSync
// client id should same to MessageWrapper
int client_id = 0;
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
static timesync::TimeSync syc(client_id, GetMessageTimeSyncTime, config.timesync.interval(), pulsar_server_addr, "TimeSync");
// stat = engine::KnowhereResource::Initialize();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "KnowhereResource initialize fail: " << stat.message();
......@@ -274,6 +267,8 @@ void
Server::StopService() {
// storage::S3ClientWrapper::GetInstance().StopService();
grpc::GrpcServer::GetInstance().Stop();
MetaWrapper::GetInstance().Stop();
MessageWrapper::GetInstance().Stop();
}
void
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册