diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index a5a2ca653fd1275100b71049d7a5e62ebe25a790..7fef4b5407345486e490b199c8e2a5ce49ea8b3d 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -1101,7 +1101,7 @@ Status DBImpl::Query(const std::shared_ptr& context, const std::string& table_id, const std::vector& partition_tags, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) { - auto query_ctx = context->Child("Query"); + milvus::server::ContextChild tracer(context, "Query"); if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; @@ -1142,11 +1142,9 @@ DBImpl::Query(const std::shared_ptr& context, const std::string } cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query - status = QueryAsync(query_ctx, files_array, k, extra_params, vectors, result_ids, result_distances); + status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances); cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query - query_ctx->GetTraceContext()->GetSpan()->Finish(); - return status; } @@ -1154,7 +1152,7 @@ Status DBImpl::QueryByFileID(const std::shared_ptr& context, const std::vector& file_ids, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) { - auto query_ctx = context->Child("Query by file id"); + milvus::server::ContextChild tracer(context, "Query by file id"); if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; @@ -1179,11 +1177,9 @@ DBImpl::QueryByFileID(const std::shared_ptr& context, const std } cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query - status = QueryAsync(query_ctx, search_files, k, extra_params, vectors, result_ids, result_distances); + status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances); cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query - query_ctx->GetTraceContext()->GetSpan()->Finish(); - return status; } @@ -1203,8 +1199,7 @@ Status DBImpl::QueryAsync(const std::shared_ptr& context, const meta::TableFilesSchema& files, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) { - auto query_async_ctx = context->Child("Query Async"); - + milvus::server::ContextChild tracer(context, "Query Async"); server::CollectQueryMetrics metrics(vectors.vector_count_); TimeRecorder rc(""); @@ -1213,7 +1208,7 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files); ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); - scheduler::SearchJobPtr job = std::make_shared(query_async_ctx, k, extra_params, vectors); + scheduler::SearchJobPtr job = std::make_shared(tracer.Context(), k, extra_params, vectors); for (auto& file : files) { scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); job->AddIndexFile(file_ptr); @@ -1233,8 +1228,6 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: result_distances = job->GetResultDistances(); rc.ElapseFromBegin("Engine query totally cost"); - query_async_ctx->GetTraceContext()->GetSpan()->Finish(); - return Status::OK(); } diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 016ea504176abfefc3229cdc78b70b9503683eb7..a24fc909d96ee40093efcc3ed9ace0cf57e79242 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -204,7 +204,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { void XSearchTask::Execute() { - auto execute_ctx = context_->Follower("XSearchTask::Execute " + std::to_string(index_id_)); + milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(index_id_)); if (index_engine_ == nullptr) { return; @@ -300,8 +300,6 @@ XSearchTask::Execute() { // release index in resource index_engine_ = nullptr; - - execute_ctx->GetTraceContext()->GetSpan()->Finish(); } void diff --git a/core/src/server/context/Context.cpp b/core/src/server/context/Context.cpp index 96b7ac965be4baaaabdbb82537a46d50d2311fe2..f09b30e7c37c24bd1188a4efd38556acdc86df3d 100644 --- a/core/src/server/context/Context.cpp +++ b/core/src/server/context/Context.cpp @@ -40,5 +40,31 @@ Context::Follower(const std::string& operation_name) const { return new_context; } +///////////////////////////////////////////////////////////////////////////////////////////////// +ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) { + if (context) { + context_ = context->Child(operation_name); + } +} + +ContextChild::~ContextChild() { + if (context_) { + context_->GetTraceContext()->GetSpan()->Finish(); + } +} + +///////////////////////////////////////////////////////////////////////////////////////////////// +ContextFollower::ContextFollower(const ContextPtr& context, const std::string& operation_name) { + if (context) { + context_ = context->Child(operation_name); + } +} + +ContextFollower::~ContextFollower() { + if (context_) { + context_->GetTraceContext()->GetSpan()->Finish(); + } +} + } // namespace server } // namespace milvus diff --git a/core/src/server/context/Context.h b/core/src/server/context/Context.h index 5dde71f88b9fbca24c9840d9604dcae25bfdb846..abc2076775bd8a21e8646c064c461cc54c69ffd2 100644 --- a/core/src/server/context/Context.h +++ b/core/src/server/context/Context.h @@ -41,5 +41,35 @@ class Context { std::shared_ptr trace_context_; }; +using ContextPtr = std::shared_ptr; + +class ContextChild { + public: + explicit ContextChild(const ContextPtr& context, const std::string& operation_name); + ~ContextChild(); + + ContextPtr + Context() { + return context_; + } + + private: + ContextPtr context_; +}; + +class ContextFollower { + public: + explicit ContextFollower(const ContextPtr& context, const std::string& operation_name); + ~ContextFollower(); + + ContextPtr + Context() { + return context_; + } + + private: + ContextPtr context_; +}; + } // namespace server } // namespace milvus diff --git a/core/src/server/delivery/request/BaseRequest.h b/core/src/server/delivery/request/BaseRequest.h index 3a6c63d66be9cb33c1064dcad2398645e74661fe..d7c01ab6ccfb9aa47f6f0be73c712e1ff4d37dd9 100644 --- a/core/src/server/delivery/request/BaseRequest.h +++ b/core/src/server/delivery/request/BaseRequest.h @@ -153,7 +153,8 @@ class BaseRequest { }; protected: - BaseRequest(const std::shared_ptr& context, BaseRequest::RequestType type, bool async = false); + BaseRequest(const std::shared_ptr& context, BaseRequest::RequestType type, + bool async = false); virtual ~BaseRequest(); diff --git a/core/src/server/delivery/request/SearchCombineRequest.cpp b/core/src/server/delivery/request/SearchCombineRequest.cpp index c8791d6bddcf4411c333329e1a2aed505705a9c8..c9c99fa90296bd4b9474cbc2bfc4527e45e99a81 100644 --- a/core/src/server/delivery/request/SearchCombineRequest.cpp +++ b/core/src/server/delivery/request/SearchCombineRequest.cpp @@ -57,6 +57,38 @@ FreeRequest(SearchRequestPtr& request, const Status& status) { request->set_status(status); request->Done(); } + +class TracingContextList { + public: + TracingContextList() = default; + ~TracingContextList() { + Finish(); + } + + void + CreateChild(std::vector& requests, const std::string& operation_name) { + Finish(); + for (auto& request : requests) { + auto parent_context = request->Context(); + if (parent_context) { + auto child_context = request->Context()->Child(operation_name); + context_list_.emplace_back(child_context); + } + } + } + + void + Finish() { + for (auto& context : context_list_) { + context->GetTraceContext()->GetSpan()->Finish(); + } + context_list_.clear(); + } + + private: + std::vector context_list_; +}; + } // namespace SearchCombineRequest::SearchCombineRequest() : BaseRequest(nullptr, BaseRequest::kSearchCombine) { @@ -306,17 +338,21 @@ SearchCombineRequest::OnExecute() { // step 6: search vectors const std::vector& partition_list = first_request->PartitionList(); const std::vector& file_id_list = first_request->FileIDList(); - auto context = first_request->Context(); engine::ResultIds result_ids; engine::ResultDistances result_distances; - if (file_id_list_.empty()) { - status = DBWrapper::DB()->Query(context, table_name_, partition_list, (size_t)search_topk_, extra_params_, - vectors_data_, result_ids, result_distances); - } else { - status = DBWrapper::DB()->QueryByFileID(context, file_id_list, (size_t)search_topk_, extra_params_, - vectors_data_, result_ids, result_distances); + { + TracingContextList context_list; + context_list.CreateChild(request_list_, "Combine Query"); + + if (file_id_list_.empty()) { + status = DBWrapper::DB()->Query(nullptr, table_name_, partition_list, (size_t)search_topk_, + extra_params_, vectors_data_, result_ids, result_distances); + } else { + status = DBWrapper::DB()->QueryByFileID(nullptr, file_id_list, (size_t)search_topk_, extra_params_, + vectors_data_, result_ids, result_distances); + } } rc.RecordSection("search combined vectors from engine");