From a38c84e7e71957e62b50d8ca73c64e584d37f408 Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Fri, 29 Apr 2022 13:37:47 +0800 Subject: [PATCH] Lock meta-replica during search/query (#16703) Signed-off-by: Letian Jiang --- internal/querynode/query_shard.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/querynode/query_shard.go b/internal/querynode/query_shard.go index 328118a15..8c4083c37 100644 --- a/internal/querynode/query_shard.go +++ b/internal/querynode/query_shard.go @@ -292,6 +292,16 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i return nil, errors.New("search context timeout") } + // lock historic meta-replica + q.historical.replica.queryRLock() + defer q.historical.replica.queryRUnlock() + + // lock streaming meta-replica for shard leader + if len(req.SegmentIDs) == 0 { + q.streaming.replica.queryRLock() + defer q.streaming.replica.queryRUnlock() + } + // check if collection has been released collection, err := q.historical.replica.getCollectionByID(collectionID) if err != nil { @@ -350,8 +360,6 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchRequest, searchRequests []*searchRequest, collectionID UniqueID, partitionIDs []UniqueID, schemaHelper *typeutil.SchemaHelper, plan *SearchPlan, topK int64, queryNum int64, timestamp Timestamp) (*internalpb.SearchResults, error) { - q.streaming.replica.queryRLock() - defer q.streaming.replica.queryRUnlock() cluster, ok := q.clusterService.getShardCluster(req.GetDmlChannel()) if !ok { return nil, fmt.Errorf("channel %s leader is not here", req.GetDmlChannel()) @@ -493,8 +501,6 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques func (q *queryShard) searchFollower(ctx context.Context, req *querypb.SearchRequest, searchRequests []*searchRequest, collectionID UniqueID, partitionIDs []UniqueID, schemaHelper *typeutil.SchemaHelper, plan *SearchPlan, topK int64, queryNum int64, timestamp Timestamp) (*internalpb.SearchResults, error) { - q.historical.replica.queryRLock() - defer q.historical.replica.queryRUnlock() segmentIDs := req.GetSegmentIDs() // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() @@ -700,6 +706,16 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int return nil, errors.New("search context timeout") } + // lock historic meta-replica + q.historical.replica.queryRLock() + defer q.historical.replica.queryRUnlock() + + // lock streaming meta-replica for shard leader + if len(req.SegmentIDs) == 0 { + q.streaming.replica.queryRLock() + defer q.streaming.replica.queryRUnlock() + } + // check if collection has been released collection, err := q.streaming.replica.getCollectionByID(collectionID) if err != nil { @@ -737,8 +753,6 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int // check if shard leader b.c only leader receives request with no segment specified if len(req.GetSegmentIDs()) == 0 { - q.streaming.replica.queryRLock() - defer q.streaming.replica.queryRUnlock() cluster, ok := q.clusterService.getShardCluster(req.GetDmlChannel()) if !ok { return nil, fmt.Errorf("channel %s leader is not here", req.GetDmlChannel()) @@ -814,8 +828,7 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int log.Debug("leader retrieve result", zap.String("channel", req.DmlChannel), zap.String("ids", mergedResults.Ids.String())) return mergedResults, nil } - q.historical.replica.queryRLock() - defer q.historical.replica.queryRUnlock() + // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta) -- GitLab