未验证 提交 0e914949 编写于 作者: G groot 提交者: GitHub

#1734 Opentracing for combined search request (#1737)

Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 13c8d55f
...@@ -1101,7 +1101,7 @@ Status ...@@ -1101,7 +1101,7 @@ Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id, DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params, const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) { const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
auto query_ctx = context->Child("Query"); milvus::server::ContextChild tracer(context, "Query");
if (!initialized_.load(std::memory_order_acquire)) { if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR; return SHUTDOWN_ERROR;
...@@ -1142,11 +1142,9 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string ...@@ -1142,11 +1142,9 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
} }
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(query_ctx, files_array, k, extra_params, vectors, result_ids, result_distances); status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
query_ctx->GetTraceContext()->GetSpan()->Finish();
return status; return status;
} }
...@@ -1154,7 +1152,7 @@ Status ...@@ -1154,7 +1152,7 @@ Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids, DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
ResultDistances& result_distances) { ResultDistances& result_distances) {
auto query_ctx = context->Child("Query by file id"); milvus::server::ContextChild tracer(context, "Query by file id");
if (!initialized_.load(std::memory_order_acquire)) { if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR; return SHUTDOWN_ERROR;
...@@ -1179,11 +1177,9 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std ...@@ -1179,11 +1177,9 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std
} }
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(query_ctx, search_files, k, extra_params, vectors, result_ids, result_distances); status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
query_ctx->GetTraceContext()->GetSpan()->Finish();
return status; return status;
} }
...@@ -1203,8 +1199,7 @@ Status ...@@ -1203,8 +1199,7 @@ Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::TableFilesSchema& files, uint64_t k, DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::TableFilesSchema& files, uint64_t k,
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
ResultDistances& result_distances) { ResultDistances& result_distances) {
auto query_async_ctx = context->Child("Query Async"); milvus::server::ContextChild tracer(context, "Query Async");
server::CollectQueryMetrics metrics(vectors.vector_count_); server::CollectQueryMetrics metrics(vectors.vector_count_);
TimeRecorder rc(""); TimeRecorder rc("");
...@@ -1213,7 +1208,7 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta:: ...@@ -1213,7 +1208,7 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files); auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(query_async_ctx, k, extra_params, vectors); scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
for (auto& file : files) { for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file); scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr); job->AddIndexFile(file_ptr);
...@@ -1233,8 +1228,6 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta:: ...@@ -1233,8 +1228,6 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
result_distances = job->GetResultDistances(); result_distances = job->GetResultDistances();
rc.ElapseFromBegin("Engine query totally cost"); rc.ElapseFromBegin("Engine query totally cost");
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
return Status::OK(); return Status::OK();
} }
......
...@@ -204,7 +204,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { ...@@ -204,7 +204,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
void void
XSearchTask::Execute() { XSearchTask::Execute() {
auto execute_ctx = context_->Follower("XSearchTask::Execute " + std::to_string(index_id_)); milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(index_id_));
if (index_engine_ == nullptr) { if (index_engine_ == nullptr) {
return; return;
...@@ -300,8 +300,6 @@ XSearchTask::Execute() { ...@@ -300,8 +300,6 @@ XSearchTask::Execute() {
// release index in resource // release index in resource
index_engine_ = nullptr; index_engine_ = nullptr;
execute_ctx->GetTraceContext()->GetSpan()->Finish();
} }
void void
......
...@@ -40,5 +40,31 @@ Context::Follower(const std::string& operation_name) const { ...@@ -40,5 +40,31 @@ Context::Follower(const std::string& operation_name) const {
return new_context; return new_context;
} }
/////////////////////////////////////////////////////////////////////////////////////////////////
ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) {
if (context) {
context_ = context->Child(operation_name);
}
}
ContextChild::~ContextChild() {
if (context_) {
context_->GetTraceContext()->GetSpan()->Finish();
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////
ContextFollower::ContextFollower(const ContextPtr& context, const std::string& operation_name) {
if (context) {
context_ = context->Child(operation_name);
}
}
ContextFollower::~ContextFollower() {
if (context_) {
context_->GetTraceContext()->GetSpan()->Finish();
}
}
} // namespace server } // namespace server
} // namespace milvus } // namespace milvus
...@@ -41,5 +41,35 @@ class Context { ...@@ -41,5 +41,35 @@ class Context {
std::shared_ptr<tracing::TraceContext> trace_context_; std::shared_ptr<tracing::TraceContext> trace_context_;
}; };
using ContextPtr = std::shared_ptr<milvus::server::Context>;
class ContextChild {
public:
explicit ContextChild(const ContextPtr& context, const std::string& operation_name);
~ContextChild();
ContextPtr
Context() {
return context_;
}
private:
ContextPtr context_;
};
class ContextFollower {
public:
explicit ContextFollower(const ContextPtr& context, const std::string& operation_name);
~ContextFollower();
ContextPtr
Context() {
return context_;
}
private:
ContextPtr context_;
};
} // namespace server } // namespace server
} // namespace milvus } // namespace milvus
...@@ -153,7 +153,8 @@ class BaseRequest { ...@@ -153,7 +153,8 @@ class BaseRequest {
}; };
protected: protected:
BaseRequest(const std::shared_ptr<Context>& context, BaseRequest::RequestType type, bool async = false); BaseRequest(const std::shared_ptr<milvus::server::Context>& context, BaseRequest::RequestType type,
bool async = false);
virtual ~BaseRequest(); virtual ~BaseRequest();
......
...@@ -57,6 +57,38 @@ FreeRequest(SearchRequestPtr& request, const Status& status) { ...@@ -57,6 +57,38 @@ FreeRequest(SearchRequestPtr& request, const Status& status) {
request->set_status(status); request->set_status(status);
request->Done(); request->Done();
} }
class TracingContextList {
public:
TracingContextList() = default;
~TracingContextList() {
Finish();
}
void
CreateChild(std::vector<SearchRequestPtr>& requests, const std::string& operation_name) {
Finish();
for (auto& request : requests) {
auto parent_context = request->Context();
if (parent_context) {
auto child_context = request->Context()->Child(operation_name);
context_list_.emplace_back(child_context);
}
}
}
void
Finish() {
for (auto& context : context_list_) {
context->GetTraceContext()->GetSpan()->Finish();
}
context_list_.clear();
}
private:
std::vector<milvus::server::ContextPtr> context_list_;
};
} // namespace } // namespace
SearchCombineRequest::SearchCombineRequest() : BaseRequest(nullptr, BaseRequest::kSearchCombine) { SearchCombineRequest::SearchCombineRequest() : BaseRequest(nullptr, BaseRequest::kSearchCombine) {
...@@ -306,17 +338,21 @@ SearchCombineRequest::OnExecute() { ...@@ -306,17 +338,21 @@ SearchCombineRequest::OnExecute() {
// step 6: search vectors // step 6: search vectors
const std::vector<std::string>& partition_list = first_request->PartitionList(); const std::vector<std::string>& partition_list = first_request->PartitionList();
const std::vector<std::string>& file_id_list = first_request->FileIDList(); const std::vector<std::string>& file_id_list = first_request->FileIDList();
auto context = first_request->Context();
engine::ResultIds result_ids; engine::ResultIds result_ids;
engine::ResultDistances result_distances; engine::ResultDistances result_distances;
if (file_id_list_.empty()) { {
status = DBWrapper::DB()->Query(context, table_name_, partition_list, (size_t)search_topk_, extra_params_, TracingContextList context_list;
vectors_data_, result_ids, result_distances); context_list.CreateChild(request_list_, "Combine Query");
} else {
status = DBWrapper::DB()->QueryByFileID(context, file_id_list, (size_t)search_topk_, extra_params_, if (file_id_list_.empty()) {
vectors_data_, result_ids, result_distances); status = DBWrapper::DB()->Query(nullptr, table_name_, partition_list, (size_t)search_topk_,
extra_params_, vectors_data_, result_ids, result_distances);
} else {
status = DBWrapper::DB()->QueryByFileID(nullptr, file_id_list, (size_t)search_topk_, extra_params_,
vectors_data_, result_ids, result_distances);
}
} }
rc.RecordSection("search combined vectors from engine"); rc.RecordSection("search combined vectors from engine");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册