From 94f0951f0d73dac233c6d250c621fa94dcb3acc2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 16 May 2022 18:23:55 +0800 Subject: [PATCH] Fix query lock logic in query shard (#17034) Previously query shard locks the querylock in collectionReplica before any search/query The lock range is too large and easy to cause dead lock This PR makes following changes: - Rename collectionReplica to metaReplica which is more reasonable - Make release collection operation cancels waiting search/query request - Reduce the queryLock to collection level - Add some unit tests for timeout & released case Signed-off-by: Congqi Xia --- internal/querynode/collection.go | 11 +- internal/querynode/collection_test.go | 5 +- internal/querynode/impl.go | 1 + internal/querynode/impl_test.go | 8 +- ...{collection_replica.go => meta_replica.go} | 404 +++++++++--------- ...n_replica_test.go => meta_replica_test.go} | 34 +- internal/querynode/query_collection.go | 2 +- internal/querynode/query_shard.go | 92 ++-- internal/querynode/query_shard_test.go | 128 +++++- internal/querynode/task.go | 7 +- internal/querynode/tsafe.go | 7 + internal/querynode/tsafe_replica.go | 4 + 12 files changed, 430 insertions(+), 273 deletions(-) rename internal/querynode/{collection_replica.go => meta_replica.go} (60%) rename internal/querynode/{collection_replica_test.go => meta_replica_test.go} (90%) diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index 202b92092..179e2e83c 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -49,6 +49,7 @@ import ( // Collection is a wrapper of the underlying C-structure C.CCollection type Collection struct { + sync.RWMutex // protects colllectionPtr collectionPtr C.CCollection id UniqueID partitionIDs []UniqueID @@ -65,6 +66,7 @@ type Collection struct { releaseMu sync.RWMutex // guards release releasedPartitions map[UniqueID]struct{} releaseTime Timestamp + released bool } // ID returns collection id @@ -268,17 +270,18 @@ func (c *Collection) removeVDeltaChannel(channel Channel) { } // setReleaseTime records when collection is released -func (c *Collection) setReleaseTime(t Timestamp) { +func (c *Collection) setReleaseTime(t Timestamp, released bool) { c.releaseMu.Lock() defer c.releaseMu.Unlock() c.releaseTime = t + c.released = released } // getReleaseTime gets the time when collection is released -func (c *Collection) getReleaseTime() Timestamp { +func (c *Collection) getReleaseTime() (Timestamp, bool) { c.releaseMu.RLock() defer c.releaseMu.RUnlock() - return c.releaseTime + return c.releaseTime, c.released } // setLoadType set the loading type of collection, which is loadTypeCollection or loadTypePartition @@ -325,7 +328,7 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co log.Info("create collection", zap.Int64("collectionID", collectionID)) - newCollection.setReleaseTime(Timestamp(math.MaxUint64)) + newCollection.setReleaseTime(Timestamp(math.MaxUint64), false) return newCollection } diff --git a/internal/querynode/collection_test.go b/internal/querynode/collection_test.go index 6e6e45fe0..557788d8b 100644 --- a/internal/querynode/collection_test.go +++ b/internal/querynode/collection_test.go @@ -126,9 +126,10 @@ func TestCollection_releaseTime(t *testing.T) { collection := newCollection(collectionID, schema) t0 := Timestamp(1000) - collection.setReleaseTime(t0) - t1 := collection.getReleaseTime() + collection.setReleaseTime(t0, true) + t1, released := collection.getReleaseTime() assert.Equal(t, t0, t1) + assert.True(t, released) } func TestCollection_loadType(t *testing.T) { diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 5f70be48d..303b19ca4 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -458,6 +458,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } + // collection lock is not needed since we guarantee not query/search will be dispatch from leader for _, id := range in.SegmentIDs { err := node.historical.replica.removeSegment(id) if err != nil { diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index 334719238..757e66d68 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -385,7 +385,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) { CollectionID: defaultCollectionID, } - node.streaming.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition) + node.streaming.replica.(*metaReplica).partitions = make(map[UniqueID]*Partition) rsp, err := node.GetSegmentInfo(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode) @@ -404,7 +404,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) { CollectionID: defaultCollectionID, } - node.streaming.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment) + node.streaming.replica.(*metaReplica).segments = make(map[UniqueID]*Segment) rsp, err := node.GetSegmentInfo(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode) @@ -423,7 +423,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) { CollectionID: defaultCollectionID, } - node.historical.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition) + node.historical.replica.(*metaReplica).partitions = make(map[UniqueID]*Partition) rsp, err := node.GetSegmentInfo(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode) @@ -442,7 +442,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) { CollectionID: defaultCollectionID, } - node.historical.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment) + node.historical.replica.(*metaReplica).segments = make(map[UniqueID]*Segment) rsp, err := node.GetSegmentInfo(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode) diff --git a/internal/querynode/collection_replica.go b/internal/querynode/meta_replica.go similarity index 60% rename from internal/querynode/collection_replica.go rename to internal/querynode/meta_replica.go index 0f5f2c03f..fb3ab99aa 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/meta_replica.go @@ -132,7 +132,7 @@ type ReplicaInterface interface { // collectionReplica is the data replication of memory data in query node. // It implements `ReplicaInterface` interface. -type collectionReplica struct { +type metaReplica struct { mu sync.RWMutex // guards all collections map[UniqueID]*Collection partitions map[UniqueID]*Partition @@ -145,115 +145,117 @@ type collectionReplica struct { } // queryLock guards query and delete operations -func (colReplica *collectionReplica) queryLock() { - colReplica.queryMu.Lock() +func (replica *metaReplica) queryLock() { + replica.queryMu.Lock() } // queryUnlock guards query and delete segment operations -func (colReplica *collectionReplica) queryUnlock() { - colReplica.queryMu.Unlock() +func (replica *metaReplica) queryUnlock() { + replica.queryMu.Unlock() } // queryRLock guards query and delete segment operations -func (colReplica *collectionReplica) queryRLock() { - colReplica.queryMu.RLock() +func (replica *metaReplica) queryRLock() { + replica.queryMu.RLock() } // queryRUnlock guards query and delete segment operations -func (colReplica *collectionReplica) queryRUnlock() { - colReplica.queryMu.RUnlock() +func (replica *metaReplica) queryRUnlock() { + replica.queryMu.RUnlock() } // getSegmentsMemSize get the memory size in bytes of all the Segments -func (colReplica *collectionReplica) getSegmentsMemSize() int64 { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getSegmentsMemSize() int64 { + replica.mu.RLock() + defer replica.mu.RUnlock() memSize := int64(0) - for _, segment := range colReplica.segments { + for _, segment := range replica.segments { memSize += segment.getMemSize() } return memSize } // printReplica prints the collections, partitions and segments in the collectionReplica -func (colReplica *collectionReplica) printReplica() { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (replica *metaReplica) printReplica() { + replica.mu.Lock() + defer replica.mu.Unlock() - log.Info("collections in collectionReplica", zap.Any("info", colReplica.collections)) - log.Info("partitions in collectionReplica", zap.Any("info", colReplica.partitions)) - log.Info("segments in collectionReplica", zap.Any("info", colReplica.segments)) - log.Info("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments)) + log.Info("collections in collectionReplica", zap.Any("info", replica.collections)) + log.Info("partitions in collectionReplica", zap.Any("info", replica.partitions)) + log.Info("segments in collectionReplica", zap.Any("info", replica.segments)) + log.Info("excludedSegments in collectionReplica", zap.Any("info", replica.excludedSegments)) } //----------------------------------------------------------------------------------------------------- collection // getCollectionIDs gets all the collection ids in the collectionReplica -func (colReplica *collectionReplica) getCollectionIDs() []UniqueID { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getCollectionIDs() []UniqueID { + replica.mu.RLock() + defer replica.mu.RUnlock() collectionIDs := make([]UniqueID, 0) - for id := range colReplica.collections { + for id := range replica.collections { collectionIDs = append(collectionIDs, id) } return collectionIDs } // addCollection creates a new collection and add it to collectionReplica -func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (replica *metaReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { + replica.mu.Lock() + defer replica.mu.Unlock() - if col, ok := colReplica.collections[collectionID]; ok { + if col, ok := replica.collections[collectionID]; ok { return col } var newCollection = newCollection(collectionID, schema) - colReplica.collections[collectionID] = newCollection - metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.collections))) + replica.collections[collectionID] = newCollection + metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.collections))) return newCollection } // removeCollection removes the collection from collectionReplica -func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error { - colReplica.queryMu.Lock() - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - defer colReplica.queryMu.Unlock() - return colReplica.removeCollectionPrivate(collectionID) +func (replica *metaReplica) removeCollection(collectionID UniqueID) error { + replica.mu.Lock() + defer replica.mu.Unlock() + return replica.removeCollectionPrivate(collectionID) } // removeCollectionPrivate is the private function in collectionReplica, to remove collection from collectionReplica -func (colReplica *collectionReplica) removeCollectionPrivate(collectionID UniqueID) error { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) +func (replica *metaReplica) removeCollectionPrivate(collectionID UniqueID) error { + collection, err := replica.getCollectionByIDPrivate(collectionID) if err != nil { return err } + // block incoming search&query + collection.Lock() + defer collection.Unlock() + // delete partitions for _, partitionID := range collection.partitionIDs { // ignore error, try to delete - _ = colReplica.removePartitionPrivate(partitionID) + _ = replica.removePartitionPrivate(partitionID, true) } deleteCollection(collection) - delete(colReplica.collections, collectionID) + delete(replica.collections, collectionID) - metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.collections))) + metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.collections))) metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(len(collection.partitionIDs))) return nil } // getCollectionByID gets the collection which id is collectionID -func (colReplica *collectionReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.getCollectionByIDPrivate(collectionID) +func (replica *metaReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.getCollectionByIDPrivate(collectionID) } // getCollectionByIDPrivate is the private function in collectionReplica, to get collection from collectionReplica -func (colReplica *collectionReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) { - collection, ok := colReplica.collections[collectionID] +func (replica *metaReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) { + collection, ok := replica.collections[collectionID] if !ok { return nil, fmt.Errorf("collection hasn't been loaded or has been released, collection id = %d", collectionID) } @@ -262,31 +264,31 @@ func (colReplica *collectionReplica) getCollectionByIDPrivate(collectionID Uniqu } // hasCollection checks if collectionReplica has the collection which id is collectionID -func (colReplica *collectionReplica) hasCollection(collectionID UniqueID) bool { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.hasCollectionPrivate(collectionID) +func (replica *metaReplica) hasCollection(collectionID UniqueID) bool { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.hasCollectionPrivate(collectionID) } // hasCollectionPrivate is the private function in collectionReplica, to check collection in collectionReplica -func (colReplica *collectionReplica) hasCollectionPrivate(collectionID UniqueID) bool { - _, ok := colReplica.collections[collectionID] +func (replica *metaReplica) hasCollectionPrivate(collectionID UniqueID) bool { + _, ok := replica.collections[collectionID] return ok } // getCollectionNum returns num of collections in collectionReplica -func (colReplica *collectionReplica) getCollectionNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return len(colReplica.collections) +func (replica *metaReplica) getCollectionNum() int { + replica.mu.RLock() + defer replica.mu.RUnlock() + return len(replica.collections) } // getPartitionIDs returns partition ids of collection -func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() - collection, err := colReplica.getCollectionByIDPrivate(collectionID) + collection, err := replica.getCollectionByIDPrivate(collectionID) if err != nil { return nil, err } @@ -294,8 +296,8 @@ func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]U return collection.partitionIDs, nil } -func (colReplica *collectionReplica) getIndexedFieldIDByCollectionIDPrivate(collectionID UniqueID, segment *Segment) ([]FieldID, error) { - fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID) +func (replica *metaReplica) getIndexedFieldIDByCollectionIDPrivate(collectionID UniqueID, segment *Segment) ([]FieldID, error) { + fields, err := replica.getFieldsByCollectionIDPrivate(collectionID) if err != nil { return nil, err } @@ -309,8 +311,8 @@ func (colReplica *collectionReplica) getIndexedFieldIDByCollectionIDPrivate(coll return fieldIDS, nil } -func (colReplica *collectionReplica) getVecFieldIDsByCollectionIDPrivate(collectionID UniqueID) ([]FieldID, error) { - fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID) +func (replica *metaReplica) getVecFieldIDsByCollectionIDPrivate(collectionID UniqueID) ([]FieldID, error) { + fields, err := replica.getFieldsByCollectionIDPrivate(collectionID) if err != nil { return nil, err } @@ -325,19 +327,19 @@ func (colReplica *collectionReplica) getVecFieldIDsByCollectionIDPrivate(collect } // getVecFieldIDsByCollectionID returns vector field ids of collection -func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() - return colReplica.getVecFieldIDsByCollectionIDPrivate(collectionID) + return replica.getVecFieldIDsByCollectionIDPrivate(collectionID) } // getPKFieldIDsByCollectionID returns vector field ids of collection -func (colReplica *collectionReplica) getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() - fields, err := colReplica.getFieldsByCollectionIDPrivate(collectionID) + fields, err := replica.getFieldsByCollectionIDPrivate(collectionID) if err != nil { return common.InvalidFieldID, err } @@ -351,8 +353,8 @@ func (colReplica *collectionReplica) getPKFieldIDByCollectionID(collectionID Uni } // getFieldsByCollectionIDPrivate is the private function in collectionReplica, to return vector field ids of collection -func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) +func (replica *metaReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) { + collection, err := replica.getCollectionByIDPrivate(collectionID) if err != nil { return nil, err } @@ -365,28 +367,28 @@ func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID } // getSegmentInfosByColID return segments info by collectionID -func (colReplica *collectionReplica) getSegmentInfosByColID(collectionID UniqueID) ([]*querypb.SegmentInfo, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getSegmentInfosByColID(collectionID UniqueID) ([]*querypb.SegmentInfo, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() segmentInfos := make([]*querypb.SegmentInfo, 0) - collection, ok := colReplica.collections[collectionID] + collection, ok := replica.collections[collectionID] if !ok { // collection not exist, so result segmentInfos is empty return segmentInfos, nil } for _, partitionID := range collection.partitionIDs { - partition, ok := colReplica.partitions[partitionID] + partition, ok := replica.partitions[partitionID] if !ok { return nil, fmt.Errorf("the meta of collection %d and partition %d are inconsistent in QueryNode", collectionID, partitionID) } for _, segmentID := range partition.segmentIDs { - segment, ok := colReplica.segments[segmentID] + segment, ok := replica.segments[segmentID] if !ok { return nil, fmt.Errorf("the meta of partition %d and segment %d are inconsistent in QueryNode", partitionID, segmentID) } - segmentInfo := colReplica.getSegmentInfo(segment) + segmentInfo := replica.getSegmentInfo(segment) segmentInfos = append(segmentInfos, segmentInfo) } } @@ -396,73 +398,77 @@ func (colReplica *collectionReplica) getSegmentInfosByColID(collectionID UniqueI //----------------------------------------------------------------------------------------------------- partition // addPartition adds a new partition to collection -func (colReplica *collectionReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - return colReplica.addPartitionPrivate(collectionID, partitionID) +func (replica *metaReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error { + replica.mu.Lock() + defer replica.mu.Unlock() + return replica.addPartitionPrivate(collectionID, partitionID) } // addPartitionPrivate is the private function in collectionReplica, to add a new partition to collection -func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) +func (replica *metaReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error { + collection, err := replica.getCollectionByIDPrivate(collectionID) if err != nil { return err } - if !colReplica.hasPartitionPrivate(partitionID) { + if !replica.hasPartitionPrivate(partitionID) { collection.addPartitionID(partitionID) var newPartition = newPartition(collectionID, partitionID) - colReplica.partitions[partitionID] = newPartition + replica.partitions[partitionID] = newPartition } - metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.partitions))) + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.partitions))) return nil } // removePartition removes the partition from collectionReplica -func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error { - colReplica.queryMu.Lock() - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - defer colReplica.queryMu.Unlock() - return colReplica.removePartitionPrivate(partitionID) +func (replica *metaReplica) removePartition(partitionID UniqueID) error { + replica.mu.Lock() + defer replica.mu.Unlock() + return replica.removePartitionPrivate(partitionID, false) } // removePartitionPrivate is the private function in collectionReplica, to remove the partition from collectionReplica -func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID) error { - partition, err := colReplica.getPartitionByIDPrivate(partitionID) +// `locked` flag indicates whether corresponding collection lock is accquired before calling this method +func (replica *metaReplica) removePartitionPrivate(partitionID UniqueID, locked bool) error { + partition, err := replica.getPartitionByIDPrivate(partitionID) if err != nil { return err } - collection, err := colReplica.getCollectionByIDPrivate(partition.collectionID) + collection, err := replica.getCollectionByIDPrivate(partition.collectionID) if err != nil { return err } + if !locked { + collection.Lock() + defer collection.Unlock() + } + // delete segments for _, segmentID := range partition.segmentIDs { // try to delete, ignore error - _ = colReplica.removeSegmentPrivate(segmentID) + _ = replica.removeSegmentPrivate(segmentID) } collection.removePartitionID(partitionID) - delete(colReplica.partitions, partitionID) + delete(replica.partitions, partitionID) - metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.partitions))) + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(replica.partitions))) return nil } // getPartitionByID returns the partition which id is partitionID -func (colReplica *collectionReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.getPartitionByIDPrivate(partitionID) +func (replica *metaReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.getPartitionByIDPrivate(partitionID) } // getPartitionByIDPrivate is the private function in collectionReplica, to get the partition -func (colReplica *collectionReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) { - partition, ok := colReplica.partitions[partitionID] +func (replica *metaReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) { + partition, ok := replica.partitions[partitionID] if !ok { return nil, fmt.Errorf("partition %d hasn't been loaded or has been released", partitionID) } @@ -471,43 +477,43 @@ func (colReplica *collectionReplica) getPartitionByIDPrivate(partitionID UniqueI } // hasPartition returns true if collectionReplica has the partition, false otherwise -func (colReplica *collectionReplica) hasPartition(partitionID UniqueID) bool { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.hasPartitionPrivate(partitionID) +func (replica *metaReplica) hasPartition(partitionID UniqueID) bool { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.hasPartitionPrivate(partitionID) } // hasPartitionPrivate is the private function in collectionReplica, to check if collectionReplica has the partition -func (colReplica *collectionReplica) hasPartitionPrivate(partitionID UniqueID) bool { - _, ok := colReplica.partitions[partitionID] +func (replica *metaReplica) hasPartitionPrivate(partitionID UniqueID) bool { + _, ok := replica.partitions[partitionID] return ok } // getPartitionNum returns num of partitions -func (colReplica *collectionReplica) getPartitionNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return len(colReplica.partitions) +func (replica *metaReplica) getPartitionNum() int { + replica.mu.RLock() + defer replica.mu.RUnlock() + return len(replica.partitions) } // getSegmentIDs returns segment ids -func (colReplica *collectionReplica) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.getSegmentIDsPrivate(partitionID) +func (replica *metaReplica) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.getSegmentIDsPrivate(partitionID) } // getSegmentIDsByVChannel returns segment ids which virtual channel is vChannel -func (colReplica *collectionReplica) getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - segmentIDs, err := colReplica.getSegmentIDsPrivate(partitionID) +func (replica *metaReplica) getSegmentIDsByVChannel(partitionID UniqueID, vChannel Channel) ([]UniqueID, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + segmentIDs, err := replica.getSegmentIDsPrivate(partitionID) if err != nil { return nil, err } segmentIDsTmp := make([]UniqueID, 0) for _, segmentID := range segmentIDs { - segment, err := colReplica.getSegmentByIDPrivate(segmentID) + segment, err := replica.getSegmentByIDPrivate(segmentID) if err != nil { return nil, err } @@ -520,8 +526,8 @@ func (colReplica *collectionReplica) getSegmentIDsByVChannel(partitionID UniqueI } // getSegmentIDsPrivate is private function in collectionReplica, it returns segment ids -func (colReplica *collectionReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) { - partition, err2 := colReplica.getPartitionByIDPrivate(partitionID) +func (replica *metaReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) { + partition, err2 := replica.getPartitionByIDPrivate(partitionID) if err2 != nil { return nil, err2 } @@ -530,10 +536,10 @@ func (colReplica *collectionReplica) getSegmentIDsPrivate(partitionID UniqueID) //----------------------------------------------------------------------------------------------------- segment // addSegment add a new segment to collectionReplica -func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - collection, err := colReplica.getCollectionByIDPrivate(collectionID) +func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) error { + replica.mu.Lock() + defer replica.mu.Unlock() + collection, err := replica.getCollectionByIDPrivate(collectionID) if err != nil { return err } @@ -541,58 +547,58 @@ func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID if err != nil { return err } - return colReplica.addSegmentPrivate(segmentID, partitionID, seg) + return replica.addSegmentPrivate(segmentID, partitionID, seg) } // addSegmentPrivate is private function in collectionReplica, to add a new segment to collectionReplica -func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, segment *Segment) error { - partition, err := colReplica.getPartitionByIDPrivate(partitionID) +func (replica *metaReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, segment *Segment) error { + partition, err := replica.getPartitionByIDPrivate(partitionID) if err != nil { return err } - if colReplica.hasSegmentPrivate(segmentID) { + if replica.hasSegmentPrivate(segmentID) { return nil } partition.addSegmentID(segmentID) - colReplica.segments[segmentID] = segment + replica.segments[segmentID] = segment metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc() return nil } // setSegment adds a segment to collectionReplica -func (colReplica *collectionReplica) setSegment(segment *Segment) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - _, err := colReplica.getCollectionByIDPrivate(segment.collectionID) +func (replica *metaReplica) setSegment(segment *Segment) error { + replica.mu.Lock() + defer replica.mu.Unlock() + _, err := replica.getCollectionByIDPrivate(segment.collectionID) if err != nil { return err } - return colReplica.addSegmentPrivate(segment.segmentID, segment.partitionID, segment) + return replica.addSegmentPrivate(segment.segmentID, segment.partitionID, segment) } // removeSegment removes a segment from collectionReplica -func (colReplica *collectionReplica) removeSegment(segmentID UniqueID) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - return colReplica.removeSegmentPrivate(segmentID) +func (replica *metaReplica) removeSegment(segmentID UniqueID) error { + replica.mu.Lock() + defer replica.mu.Unlock() + return replica.removeSegmentPrivate(segmentID) } // removeSegmentPrivate is private function in collectionReplica, to remove a segment from collectionReplica -func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) error { - segment, err := colReplica.getSegmentByIDPrivate(segmentID) +func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID) error { + segment, err := replica.getSegmentByIDPrivate(segmentID) if err != nil { return err } - partition, err2 := colReplica.getPartitionByIDPrivate(segment.partitionID) + partition, err2 := replica.getPartitionByIDPrivate(segment.partitionID) if err2 != nil { return err } partition.removeSegmentID(segmentID) - delete(colReplica.segments, segmentID) + delete(replica.segments, segmentID) deleteSegment(segment) metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec() @@ -600,15 +606,15 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er } // getSegmentByID returns the segment which id is segmentID -func (colReplica *collectionReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.getSegmentByIDPrivate(segmentID) +func (replica *metaReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.getSegmentByIDPrivate(segmentID) } // getSegmentByIDPrivate is private function in collectionReplica, it returns the segment which id is segmentID -func (colReplica *collectionReplica) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) { - segment, ok := colReplica.segments[segmentID] +func (replica *metaReplica) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) { + segment, ok := replica.segments[segmentID] if !ok { return nil, fmt.Errorf("cannot find segment %d in QueryNode", segmentID) } @@ -617,33 +623,33 @@ func (colReplica *collectionReplica) getSegmentByIDPrivate(segmentID UniqueID) ( } // hasSegment returns true if collectionReplica has the segment, false otherwise -func (colReplica *collectionReplica) hasSegment(segmentID UniqueID) bool { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return colReplica.hasSegmentPrivate(segmentID) +func (replica *metaReplica) hasSegment(segmentID UniqueID) bool { + replica.mu.RLock() + defer replica.mu.RUnlock() + return replica.hasSegmentPrivate(segmentID) } // hasSegmentPrivate is private function in collectionReplica, to check if collectionReplica has the segment -func (colReplica *collectionReplica) hasSegmentPrivate(segmentID UniqueID) bool { - _, ok := colReplica.segments[segmentID] +func (replica *metaReplica) hasSegmentPrivate(segmentID UniqueID) bool { + _, ok := replica.segments[segmentID] return ok } // getSegmentNum returns num of segments in collectionReplica -func (colReplica *collectionReplica) getSegmentNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - return len(colReplica.segments) +func (replica *metaReplica) getSegmentNum() int { + replica.mu.RLock() + defer replica.mu.RUnlock() + return len(replica.segments) } // getSegmentStatistics returns the statistics of segments in collectionReplica -func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb.SegmentStats { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getSegmentStatistics() []*internalpb.SegmentStats { + replica.mu.RLock() + defer replica.mu.RUnlock() var statisticData = make([]*internalpb.SegmentStats, 0) - for segmentID, segment := range colReplica.segments { + for segmentID, segment := range replica.segments { currentMemSize := segment.getMemSize() segment.lastMemSize = currentMemSize segmentNumOfRows := segment.getRowCount() @@ -663,51 +669,49 @@ func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb.Segmen } // removeExcludedSegments will remove excludedSegments from collectionReplica -func (colReplica *collectionReplica) removeExcludedSegments(collectionID UniqueID) { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (replica *metaReplica) removeExcludedSegments(collectionID UniqueID) { + replica.mu.Lock() + defer replica.mu.Unlock() - delete(colReplica.excludedSegments, collectionID) + delete(replica.excludedSegments, collectionID) } // addExcludedSegments will add excludedSegments to collectionReplica -func (colReplica *collectionReplica) addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() +func (replica *metaReplica) addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) { + replica.mu.Lock() + defer replica.mu.Unlock() - if _, ok := colReplica.excludedSegments[collectionID]; !ok { - colReplica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0) + if _, ok := replica.excludedSegments[collectionID]; !ok { + replica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0) } - colReplica.excludedSegments[collectionID] = append(colReplica.excludedSegments[collectionID], segmentInfos...) + replica.excludedSegments[collectionID] = append(replica.excludedSegments[collectionID], segmentInfos...) } // getExcludedSegments returns excludedSegments of collectionReplica -func (colReplica *collectionReplica) getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() +func (replica *metaReplica) getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() - if _, ok := colReplica.excludedSegments[collectionID]; !ok { + if _, ok := replica.excludedSegments[collectionID]; !ok { return nil, errors.New("getExcludedSegments failed, cannot found collection, id =" + fmt.Sprintln(collectionID)) } - return colReplica.excludedSegments[collectionID], nil + return replica.excludedSegments[collectionID], nil } // freeAll will free all meta info from collectionReplica -func (colReplica *collectionReplica) freeAll() { - colReplica.queryMu.Lock() // wait for current search/query finish - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - defer colReplica.queryMu.Unlock() +func (replica *metaReplica) freeAll() { + replica.mu.Lock() + defer replica.mu.Unlock() - for id := range colReplica.collections { - _ = colReplica.removeCollectionPrivate(id) + for id := range replica.collections { + _ = replica.removeCollectionPrivate(id) } - colReplica.collections = make(map[UniqueID]*Collection) - colReplica.partitions = make(map[UniqueID]*Partition) - colReplica.segments = make(map[UniqueID]*Segment) + replica.collections = make(map[UniqueID]*Collection) + replica.partitions = make(map[UniqueID]*Partition) + replica.segments = make(map[UniqueID]*Segment) } // newCollectionReplica returns a new ReplicaInterface @@ -717,7 +721,7 @@ func newCollectionReplica(etcdKv *etcdkv.EtcdKV) ReplicaInterface { segments := make(map[UniqueID]*Segment) excludedSegments := make(map[UniqueID][]*datapb.SegmentInfo) - var replica ReplicaInterface = &collectionReplica{ + var replica ReplicaInterface = &metaReplica{ collections: collections, partitions: partitions, segments: segments, @@ -730,12 +734,12 @@ func newCollectionReplica(etcdKv *etcdkv.EtcdKV) ReplicaInterface { } // trans segment to queryPb.segmentInfo -func (colReplica *collectionReplica) getSegmentInfo(segment *Segment) *querypb.SegmentInfo { +func (replica *metaReplica) getSegmentInfo(segment *Segment) *querypb.SegmentInfo { var indexName string var indexID int64 var indexInfos []*querypb.FieldIndexInfo // TODO:: segment has multi vec column - indexedFieldIDs, _ := colReplica.getIndexedFieldIDByCollectionIDPrivate(segment.collectionID, segment) + indexedFieldIDs, _ := replica.getIndexedFieldIDByCollectionIDPrivate(segment.collectionID, segment) for _, fieldID := range indexedFieldIDs { fieldInfo, err := segment.getIndexedFieldInfo(fieldID) if err == nil { diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/meta_replica_test.go similarity index 90% rename from internal/querynode/collection_replica_test.go rename to internal/querynode/meta_replica_test.go index 7ee73d479..1ae174b44 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/meta_replica_test.go @@ -26,7 +26,7 @@ import ( ) //----------------------------------------------------------------------------------------------------- collection -func TestCollectionReplica_getCollectionNum(t *testing.T) { +func TestMetaReplica_getCollectionNum(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) assert.Equal(t, node.historical.replica.getCollectionNum(), 1) @@ -34,14 +34,14 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_addCollection(t *testing.T) { +func TestMetaReplica_addCollection(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) err := node.Stop() assert.NoError(t, err) } -func TestCollectionReplica_removeCollection(t *testing.T) { +func TestMetaReplica_removeCollection(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) assert.Equal(t, node.historical.replica.getCollectionNum(), 1) @@ -53,7 +53,7 @@ func TestCollectionReplica_removeCollection(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_getCollectionByID(t *testing.T) { +func TestMetaReplica_getCollectionByID(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -65,7 +65,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_hasCollection(t *testing.T) { +func TestMetaReplica_hasCollection(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -80,7 +80,7 @@ func TestCollectionReplica_hasCollection(t *testing.T) { } //----------------------------------------------------------------------------------------------------- partition -func TestCollectionReplica_getPartitionNum(t *testing.T) { +func TestMetaReplica_getPartitionNum(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -100,7 +100,7 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_addPartition(t *testing.T) { +func TestMetaReplica_addPartition(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -117,7 +117,7 @@ func TestCollectionReplica_addPartition(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_removePartition(t *testing.T) { +func TestMetaReplica_removePartition(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -137,7 +137,7 @@ func TestCollectionReplica_removePartition(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_getPartitionByTag(t *testing.T) { +func TestMetaReplica_getPartitionByTag(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -157,7 +157,7 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_hasPartition(t *testing.T) { +func TestMetaReplica_hasPartition(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -175,7 +175,7 @@ func TestCollectionReplica_hasPartition(t *testing.T) { } //----------------------------------------------------------------------------------------------------- segment -func TestCollectionReplica_addSegment(t *testing.T) { +func TestMetaReplica_addSegment(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -193,7 +193,7 @@ func TestCollectionReplica_addSegment(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_removeSegment(t *testing.T) { +func TestMetaReplica_removeSegment(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -214,7 +214,7 @@ func TestCollectionReplica_removeSegment(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_getSegmentByID(t *testing.T) { +func TestMetaReplica_getSegmentByID(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -233,7 +233,7 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_getSegmentInfosByColID(t *testing.T) { +func TestMetaReplica_getSegmentInfosByColID(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) pkType := schemapb.DataType_Int64 @@ -282,7 +282,7 @@ func TestCollectionReplica_getSegmentInfosByColID(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_hasSegment(t *testing.T) { +func TestMetaReplica_hasSegment(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -305,7 +305,7 @@ func TestCollectionReplica_hasSegment(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_freeAll(t *testing.T) { +func TestMetaReplica_freeAll(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) @@ -314,7 +314,7 @@ func TestCollectionReplica_freeAll(t *testing.T) { assert.NoError(t, err) } -func TestCollectionReplica_statistic(t *testing.T) { +func TestMetaReplica_statistic(t *testing.T) { t.Run("test getCollectionIDs", func(t *testing.T) { replica, err := genSimpleReplica() assert.NoError(t, err) diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 76b9eae71..300f8cb6a 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -495,7 +495,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { return err } guaranteeTs := msg.GuaranteeTs() - if guaranteeTs >= collection.getReleaseTime() { + if releaseTs, _ := collection.getReleaseTime(); guaranteeTs >= releaseTs { err = fmt.Errorf("retrieve failed, collection has been released, msgID = %d, collectionID = %d", msg.ID(), collectionID) publishErr := q.publishFailedQueryResult(msg, err.Error()) if publishErr != nil { diff --git a/internal/querynode/query_shard.go b/internal/querynode/query_shard.go index ca6cec128..8a07a1ea8 100644 --- a/internal/querynode/query_shard.go +++ b/internal/querynode/query_shard.go @@ -46,6 +46,7 @@ type queryShard struct { cancel context.CancelFunc collectionID UniqueID + collection *Collection // quick reference from meta channel Channel deltaChannel Channel replicaID int64 @@ -104,6 +105,7 @@ func newQueryShard( ctx: ctx, cancel: cancel, collectionID: collectionID, + collection: collection, channel: channel, replicaID: replicaID, clusterService: clusterService, @@ -237,7 +239,7 @@ func (q *queryShard) getNewTSafe(tp tsType) (Timestamp, error) { return t, nil } -func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) { +func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) error { st := q.getServiceableTime(tp) log.Debug("serviceable check start", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel)) serviceable := func() bool { @@ -252,12 +254,16 @@ func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Times q.watcherCond.Wait() if err := ctx.Err(); err != nil { log.Warn("waitUntilServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel)) - // TODO: implement timeout logic - return + return ctx.Err() + } + + if _, released := q.collection.getReleaseTime(); released { + return fmt.Errorf("collection %d is released before timestamp serviceable", q.collectionID) } st = q.getServiceableTime(tp) } log.Debug("wait serviceable ts done", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel)) + return nil } func (q *queryShard) getServiceableTime(tp tsType) Timestamp { @@ -313,22 +319,15 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i return nil, errors.New("search context timeout") } - // lock historic meta-replica - q.historical.replica.queryRLock() - defer q.historical.replica.queryRUnlock() - - // lock streaming meta-replica for shard leader - if req.IsShardLeader { - q.streaming.replica.queryRLock() - defer q.streaming.replica.queryRUnlock() - } - - // check if collection has been released - collection, err := q.historical.replica.getCollectionByID(collectionID) + // check if collection has been released, check streaming since it's released first + _, err := q.streaming.replica.getCollectionByID(collectionID) if err != nil { return nil, err } - if req.GetReq().GetGuaranteeTimestamp() >= collection.getReleaseTime() { + + q.collection.RLock() // locks the collectionPtr + defer q.collection.RUnlock() + if _, released := q.collection.getReleaseTime(); released { log.Warn("collection release before search", zap.Int64("collectionID", collectionID)) return nil, fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", collectionID) } @@ -337,20 +336,20 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i var plan *SearchPlan if req.Req.GetDslType() == commonpb.DslType_BoolExprV1 { expr := req.Req.SerializedExprPlan - plan, err = createSearchPlanByExpr(collection, expr) + plan, err = createSearchPlanByExpr(q.collection, expr) if err != nil { return nil, err } } else { dsl := req.Req.Dsl - plan, err = createSearchPlan(collection, dsl) + plan, err = createSearchPlan(q.collection, dsl) if err != nil { return nil, err } } defer plan.delete() - schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema) + schemaHelper, err := typeutil.CreateSchemaHelper(q.collection.schema) if err != nil { return nil, err } @@ -412,8 +411,16 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques go func() { defer wg.Done() + guaranteeTs := req.GetReq().GetGuaranteeTimestamp() - q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp + tsErr := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp + if tsErr != nil { + err = tsErr + log.Warn("failed to wait serviceable ts", zap.Error(err)) + cancel() + return + } + // shard leader queries its own streaming data // TODO add context sResults, _, _, sErr := q.streaming.search(searchRequests, collectionID, partitionIDs, req.DmlChannel, plan, timestamp) @@ -520,10 +527,14 @@ func (q *queryShard) searchFollower(ctx context.Context, req *querypb.SearchRequ segmentIDs := req.GetSegmentIDs() // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() - q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta) + err := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta) + if err != nil { + log.Warn("failed to wati serviceable ts", zap.Error(err)) + return nil, err + } // validate segmentIDs in request - err := q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs) + err = q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs) if err != nil { log.Warn("segmentIDs in search request fails validation", zap.Int64s("segmentIDs", segmentIDs)) return nil, err @@ -710,23 +721,16 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int return nil, errors.New("search context timeout") } - // lock historic meta-replica - q.historical.replica.queryRLock() - defer q.historical.replica.queryRUnlock() - - // lock streaming meta-replica for shard leader - if req.IsShardLeader { - q.streaming.replica.queryRLock() - defer q.streaming.replica.queryRUnlock() - } - - // check if collection has been released + // check if collection has been released, check streaming since it's released first collection, err := q.streaming.replica.getCollectionByID(collectionID) if err != nil { return nil, err } - if req.GetReq().GetGuaranteeTimestamp() >= collection.getReleaseTime() { + q.collection.RLock() + defer q.collection.RUnlock() + + if _, released := q.collection.getReleaseTime(); released { log.Warn("collection release before query", zap.Int64("collectionID", collectionID)) return nil, fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", collectionID) } @@ -774,7 +778,16 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int defer wg.Done() // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() - q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) + tsErr := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) + if tsErr != nil { + err = tsErr + log.Warn("failed to wait serviceable ts", zap.Error(err)) + cancel() + return + } + + q.collection.RLock() + defer q.collection.RUnlock() // shard leader queries its own streaming data // TODO add context sResults, _, _, sErr := q.streaming.retrieve(collectionID, partitionIDs, plan, func(segment *Segment) bool { return segment.vChannelID == q.channel }) @@ -816,7 +829,14 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int // hold request until guarantee timestamp >= service timestamp guaranteeTs := req.GetReq().GetGuaranteeTimestamp() - q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta) + err = q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta) + if err != nil { + log.Warn("failed to wait servicable ts", zap.Error(err)) + return nil, err + } + + q.collection.RLock() + defer q.collection.RUnlock() // validate segmentIDs in request err = q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs) diff --git a/internal/querynode/query_shard_test.go b/internal/querynode/query_shard_test.go index b49e81851..bdc00d9cf 100644 --- a/internal/querynode/query_shard_test.go +++ b/internal/querynode/query_shard_test.go @@ -20,7 +20,9 @@ import ( "context" "sync" "testing" + "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -160,6 +162,56 @@ func TestQueryShard_Search(t *testing.T) { _, err = qs.search(context.Background(), request) assert.NoError(t, err) }) + + t.Run("search timeout", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + r := proto.Clone(req).(*internalpb.SearchRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.SearchRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + + _, err = qs.search(ctx, request) + assert.Error(t, err) + }) + + t.Run("search wait timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + r := proto.Clone(req).(*internalpb.SearchRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.SearchRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + + _, err = qs.search(ctx, request) + assert.Error(t, err) + }) + + t.Run("search collection released", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + r := proto.Clone(req).(*internalpb.SearchRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.SearchRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + + qs.collection.setReleaseTime(100, true) + + _, err = qs.search(ctx, request) + assert.Error(t, err) + }) } func TestQueryShard_Query(t *testing.T) { @@ -207,6 +259,56 @@ func TestQueryShard_Query(t *testing.T) { _, err := qs.query(context.Background(), request) assert.NoError(t, err) }) + + t.Run("query timeout", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + r := proto.Clone(req).(*internalpb.RetrieveRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.QueryRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + + _, err = qs.query(ctx, request) + assert.Error(t, err) + }) + + t.Run("query wait timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + r := proto.Clone(req).(*internalpb.RetrieveRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.QueryRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + + _, err = qs.query(ctx, request) + assert.Error(t, err) + }) + + t.Run("query collection released", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + r := proto.Clone(req).(*internalpb.RetrieveRequest) + r.GuaranteeTimestamp = Timestamp(100) + request := &querypb.QueryRequest{ + Req: r, + IsShardLeader: true, + DmlChannel: defaultDMLChannel, + SegmentIDs: []int64{}, + } + qs.collection.setReleaseTime(100, true) + + _, err = qs.query(ctx, request) + assert.Error(t, err) + }) + } func TestQueryShard_waitNewTSafe(t *testing.T) { @@ -230,9 +332,29 @@ func TestQueryShard_WaitUntilServiceable(t *testing.T) { qs, err := genSimpleQueryShard(context.Background()) assert.NoError(t, err) - err = updateQueryShardTSafe(qs, 1000) - assert.NoError(t, err) - qs.waitUntilServiceable(context.Background(), 1000, tsTypeDML) + t.Run("normal success", func(t *testing.T) { + err = updateQueryShardTSafe(qs, 1000) + require.NoError(t, err) + + err = qs.waitUntilServiceable(context.Background(), 1000, tsTypeDML) + assert.NoError(t, err) + }) + + t.Run("context timeout", func(t *testing.T) { + err = updateQueryShardTSafe(qs, 1000) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err = qs.waitUntilServiceable(ctx, 1001, tsTypeDML) + assert.Error(t, err) + }) + + t.Run("collection released", func(t *testing.T) { + qs.collection.setReleaseTime(1000, true) + err = qs.waitUntilServiceable(context.Background(), 1001, tsTypeDML) + assert.Error(t, err) + }) } func genSearchResultData(nq int64, topk int64, ids []int64, scores []float32, topks []int64) *schemapb.SearchResultData { diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 82715d9e6..babd8e361 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -723,18 +723,13 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { } func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replicaType ReplicaType) error { - // block search/query operation - replica.queryLock() - collection, err := replica.getCollectionByID(r.req.CollectionID) if err != nil { - replica.queryUnlock() return err } // set release time log.Info("set release time", zap.Any("collectionID", r.req.CollectionID)) - collection.setReleaseTime(r.req.Base.Timestamp) - replica.queryUnlock() + collection.setReleaseTime(r.req.Base.Timestamp, true) // remove all flow graphs of the target collection var channels []Channel diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 1f98e4fe8..726a15dac 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -93,3 +93,10 @@ func (ts *tSafe) set(t Timestamp) { // zap.Any("channel", ts.channel), // zap.Any("t", m.t)) } + +// close calls the close method of internal watcher if any +func (ts *tSafe) close() { + if ts.watcher != nil { + ts.watcher.close() + } +} diff --git a/internal/querynode/tsafe_replica.go b/internal/querynode/tsafe_replica.go index cccb6c3c1..bc0bc3314 100644 --- a/internal/querynode/tsafe_replica.go +++ b/internal/querynode/tsafe_replica.go @@ -94,6 +94,10 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) { log.Info("remove tSafe replica", zap.String("vChannel", vChannel), ) + tsafe, ok := t.tSafes[vChannel] + if ok { + tsafe.close() + } delete(t.tSafes, vChannel) } -- GitLab