未验证 提交 e40d95e0 编写于 作者: W wei liu 提交者: GitHub

add search/query request cost metrics on qn (#24413)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
上级 7faf934f
......@@ -117,6 +117,15 @@ message SearchResults {
bytes sliced_blob = 10;
int64 sliced_num_count = 11;
int64 sliced_offset = 12;
// search request cost
CostAggregation costAggregation = 13;
}
// CostAggregation carries the cost statistics of a search/query request,
// reported by the query node so callers can estimate node workload.
message CostAggregation {
// end-to-end time of the request in milliseconds (set from the request latency on the serving node)
int64 responseTime = 1;
// time in milliseconds actually spent executing the request on the node
int64 serviceTime = 2;
// total NQ waiting on the node when the response was produced — NOTE(review):
// appears to reflect scheduler backlog rather than this request's own NQ; confirm
int64 totalNQ = 3;
}
message RetrieveRequest {
......@@ -136,6 +145,8 @@ message RetrieveRequest {
int64 iteration_extension_reduce_rate = 14;
}
message RetrieveResults {
common.MsgBase base = 1;
common.Status status = 2;
......@@ -145,6 +156,9 @@ message RetrieveResults {
repeated int64 sealed_segmentIDs_retrieved = 6;
repeated string channelIDs_retrieved = 7;
repeated int64 global_sealed_segmentIDs = 8;
// query request cost
CostAggregation costAggregation = 13;
}
message LoadIndex {
......
......@@ -264,7 +264,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
}
if !funcutil.SliceContain(req.GetDmlChannels(), sd.vchannelName) {
log.Warn("deletgator received query request not belongs to it",
log.Warn("delegator received query request not belongs to it",
zap.Strings("reqChannels", req.GetDmlChannels()),
)
return nil, fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
......
......@@ -22,6 +22,7 @@ import (
"math"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
......@@ -66,6 +67,14 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
return nil, err
}
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
if result.CostAggregation == nil {
return nil, false
}
return result.CostAggregation, true
})
searchResults.CostAggregation = mergeRequestCost(requestCosts)
return searchResults, nil
}
......@@ -274,6 +283,14 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
log.Debug("skip duplicated query result while reducing internal.RetrieveResults", zap.Int64("dupCount", skipDupCnt))
}
requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) {
if result.CostAggregation == nil {
return nil, false
}
return result.CostAggregation, true
})
ret.CostAggregation = mergeRequestCost(requestCosts)
return ret, nil
}
......
......@@ -22,6 +22,7 @@ import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
......@@ -574,6 +575,39 @@ func (suite *ResultSuite) TestSort() {
suite.Equal([]byte{2, 3, 4, 5, 6, 7, 8, 9}, result.FieldsData[7].GetVectors().GetBinaryVector())
}
func TestResult_MergeRequestCost(t *testing.T) {
costs := []*internalpb.CostAggregation{
{
ResponseTime: 11,
ServiceTime: 12,
TotalNQ: 13,
},
{
ResponseTime: 21,
ServiceTime: 22,
TotalNQ: 23,
},
{
ResponseTime: 31,
ServiceTime: 32,
TotalNQ: 33,
},
{
ResponseTime: 41,
ServiceTime: 42,
TotalNQ: 43,
},
}
channelCost := mergeRequestCost(costs)
assert.Equal(t, int64(41), channelCost.ResponseTime)
assert.Equal(t, int64(42), channelCost.ServiceTime)
assert.Equal(t, int64(43), channelCost.TotalNQ)
}
// TestResult runs the ResultSuite suite under the standard testing harness.
func TestResult(t *testing.T) {
	suite.Run(t, new(ResultSuite))
}
......@@ -12,6 +12,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
......@@ -282,3 +283,16 @@ func fillFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath strin
return fmt.Errorf("invalid data type: %s", fieldData.Type.String())
}
}
// mergeRequestCost merges the costs collected for one request. The costs may
// come from different workers in the same channel, or from different channels
// in the same collection; for now we simply keep the entry with the highest
// response time. Returns nil when requestCosts is empty.
func mergeRequestCost(requestCosts []*internalpb.CostAggregation) *internalpb.CostAggregation {
	var slowest *internalpb.CostAggregation
	for _, c := range requestCosts {
		// Keep the first entry seen on ties, matching a strict "greater than" pick.
		if slowest != nil && c.ResponseTime <= slowest.ResponseTime {
			continue
		}
		slowest = c
	}
	return slowest
}
......@@ -701,29 +701,29 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
searchCtx, cancel := context.WithCancel(ctx)
defer cancel()
tr := timerecord.NewTimeRecorder("searchChannel")
log.Debug("search channel...")
tr := timerecord.NewTimeRecorder("searchSegments")
log.Debug("search segments...")
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
if collection == nil {
log.Warn("failed to search channel", zap.Error(segments.ErrCollectionNotFound))
log.Warn("failed to search segments", zap.Error(segments.ErrCollectionNotFound))
return nil, segments.WrapCollectionNotFound(req.GetReq().GetCollectionID())
}
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
if !node.scheduler.Add(task) {
err := merr.WrapErrTaskQueueFull()
log.Warn("failed to search channel", zap.Error(err))
log.Warn("failed to search segments", zap.Error(err))
return nil, err
}
err := task.Wait()
if err != nil {
log.Warn("failed to search channel", zap.Error(err))
log.Warn("failed to search segments", zap.Error(err))
return nil, err
}
tr.CtxElapse(ctx, fmt.Sprintf("search channel done, channel = %s, segmentIDs = %v",
tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
channel,
req.GetSegmentIDs(),
))
......@@ -732,7 +732,13 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
return task.Result(), nil
result := task.Result()
if result.CostAggregation != nil {
// update channel's response time
result.CostAggregation.ResponseTime = int64(latency)
}
return result, nil
}
// Search performs replica search tasks.
......@@ -813,7 +819,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
}
runningGp.Go(func() error {
ret, err := node.searchChannel(runningCtx, req, ch)
mu.Lock()
defer mu.Unlock()
......@@ -835,24 +840,27 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
}
tr := timerecord.NewTimeRecorderWithTrace(ctx, "searchRequestReduce")
ret, err := segments.ReduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
result, err := segments.ReduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
if err != nil {
log.Warn("failed to reduce search results", zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
failRet.Status.Reason = err.Error()
return failRet, nil
}
metrics.QueryNodeReduceLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel).
metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).
Observe(float64(tr.ElapseSpan().Milliseconds()))
if !req.FromShardLeader {
collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
collector.Rate.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(proto.Size(req)))
collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq()))
collector.Rate.Add(metricsinfo.SearchThroughput, float64(proto.Size(req)))
metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).
Add(float64(proto.Size(req)))
if result.CostAggregation != nil {
// update channel's response time
currentTotalNQ := node.scheduler.GetWaitingTaskTotalNQ()
result.CostAggregation.TotalNQ = currentTotalNQ
}
return ret, nil
return result, nil
}
// only used for delegator query segments from worker
......@@ -882,7 +890,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
}
defer node.lifetime.Done()
log.Debug("start do query with channel",
log.Debug("start do query segments",
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
)
......@@ -890,7 +898,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
tr := timerecord.NewTimeRecorder("queryChannel")
tr := timerecord.NewTimeRecorder("querySegments")
results, err := node.querySegments(queryCtx, req)
if err != nil {
log.Warn("failed to query channel", zap.Error(err))
......@@ -910,6 +918,11 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
latency := tr.ElapseSpan()
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
results.CostAggregation = &internalpb.CostAggregation{
ServiceTime: latency.Milliseconds(),
ResponseTime: latency.Milliseconds(),
TotalNQ: 0,
}
return results, nil
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"go.uber.org/atomic"
)
const (
......@@ -20,10 +21,8 @@ type Scheduler struct {
mergingSearchTasks []*SearchTask
mergedSearchTasks chan *SearchTask
queryProcessQueue chan *QueryTask
queryWaitQueue chan *QueryTask
pool *conc.Pool[any]
pool *conc.Pool[any]
waitingTaskTotalNQ atomic.Int64
}
func NewScheduler() *Scheduler {
......@@ -33,9 +32,9 @@ func NewScheduler() *Scheduler {
searchWaitQueue: make(chan *SearchTask, maxWaitTaskNum),
mergingSearchTasks: make([]*SearchTask, 0),
mergedSearchTasks: make(chan *SearchTask),
// queryProcessQueue: make(chan),
pool: conc.NewPool[any](maxReadConcurrency, conc.WithPreAlloc(true)),
pool: conc.NewPool[any](maxReadConcurrency, conc.WithPreAlloc(true)),
waitingTaskTotalNQ: *atomic.NewInt64(0),
}
}
......@@ -46,6 +45,7 @@ func (s *Scheduler) Add(task Task) bool {
select {
case s.searchWaitQueue <- t:
collector.Counter.Inc(metricsinfo.ReadyQueueType, 1)
s.waitingTaskTotalNQ.Add(t.nq)
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
default:
return false
......@@ -165,6 +165,11 @@ func (s *Scheduler) process(t Task) {
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
return nil, err
})
switch t := t.(type) {
case *SearchTask:
s.waitingTaskTotalNQ.Sub(t.nq)
}
}
// mergeTasks merge the given task with one of merged tasks,
......@@ -174,6 +179,7 @@ func (s *Scheduler) mergeTasks(t Task) {
merged := false
for _, task := range s.mergingSearchTasks {
if task.Merge(t) {
s.waitingTaskTotalNQ.Sub(t.nq)
merged = true
break
}
......@@ -183,3 +189,7 @@ func (s *Scheduler) mergeTasks(t Task) {
}
}
}
// GetWaitingTaskTotalNQ returns the total NQ of tasks currently pending in
// the scheduler. The counter is increased when a search task is enqueued in
// Add, and decreased when the task is processed or merged into another task.
func (s *Scheduler) GetWaitingTaskTotalNQ() int64 {
	return s.waitingTaskTotalNQ.Load()
}
......@@ -63,8 +63,7 @@ func NewSearchTask(ctx context.Context,
originTopks: []int64{req.GetReq().GetTopk()},
originNqs: []int64{req.GetReq().GetNq()},
notifier: make(chan error, 1),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "searchTask"),
tr: timerecord.NewTimeRecorderWithTrace(ctx, "searchTask"),
}
}
......@@ -86,6 +85,8 @@ func (t *SearchTask) Execute() error {
zap.String("shard", t.req.GetDmlChannels()[0]),
)
executeRecord := timerecord.NewTimeRecorderWithTrace(t.ctx, "searchTaskExecute")
req := t.req
t.combinePlaceHolderGroups()
searchReq, err := segments.NewSearchRequest(t.collection, req, t.placeholderGroup)
......@@ -135,12 +136,15 @@ func (t *SearchTask) Execute() error {
TopK: t.originTopks[i],
SlicedOffset: 1,
SlicedNumCount: 1,
CostAggregation: &internalpb.CostAggregation{
ServiceTime: executeRecord.ElapseSpan().Milliseconds(),
},
}
}
return nil
}
tr := timerecord.NewTimeRecorderWithTrace(t.ctx, "searchTaskReduce")
reduceRecord := timerecord.NewTimeRecorderWithTrace(t.ctx, "searchTaskReduce")
blobs, err := segments.ReduceSearchResultsAndFillData(
searchReq.Plan(),
results,
......@@ -174,7 +178,7 @@ func (t *SearchTask) Execute() error {
metrics.QueryNodeReduceLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
metrics.SearchLabel).
Observe(float64(tr.ElapseSpan().Milliseconds()))
Observe(float64(reduceRecord.ElapseSpan().Milliseconds()))
task.result = &internalpb.SearchResults{
Status: util.WrapStatus(commonpb.ErrorCode_Success, ""),
......@@ -184,6 +188,9 @@ func (t *SearchTask) Execute() error {
SlicedBlob: bs,
SlicedOffset: 1,
SlicedNumCount: 1,
CostAggregation: &internalpb.CostAggregation{
ServiceTime: executeRecord.ElapseSpan().Milliseconds(),
},
}
}
return nil
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册