Unverified commit 94f0951f, authored by congqixia, committed by GitHub

Fix query lock logic in query shard (#17034)

Previously, the query shard acquired the queryLock in collectionReplica before any search/query. The lock scope was too large and could easily cause deadlocks.

This PR makes the following changes:
- Rename collectionReplica to metaReplica, which is a more accurate name
- Make the release collection operation cancel waiting search/query requests
- Reduce the queryLock to collection level (a minimal sketch of the new release-flag pattern follows the commit header)
- Add unit tests for the timeout & released cases

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent 59bc0a70
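As a reading aid for the diff below, here is a minimal, self-contained Go sketch of the collection-level release flag described in the bullets above. The type alias and method names mirror the QueryNode code, but the snippet is illustrative only, not the actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Timestamp mirrors the QueryNode alias for a hybrid timestamp.
type Timestamp = uint64

// Collection keeps its release state behind a collection-level mutex,
// so releasing no longer requires a replica-wide query lock.
type Collection struct {
	releaseMu   sync.RWMutex
	releaseTime Timestamp
	released    bool
}

func (c *Collection) setReleaseTime(t Timestamp, released bool) {
	c.releaseMu.Lock()
	defer c.releaseMu.Unlock()
	c.releaseTime = t
	c.released = released
}

func (c *Collection) getReleaseTime() (Timestamp, bool) {
	c.releaseMu.RLock()
	defer c.releaseMu.RUnlock()
	return c.releaseTime, c.released
}

func main() {
	c := &Collection{}
	c.setReleaseTime(100, true) // what the release collection task does
	if ts, released := c.getReleaseTime(); released {
		fmt.Printf("collection released at ts %d, reject pending search/query\n", ts)
	}
}
```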
......@@ -49,6 +49,7 @@ import (
// Collection is a wrapper of the underlying C-structure C.CCollection
type Collection struct {
sync.RWMutex // protects collectionPtr
collectionPtr C.CCollection
id UniqueID
partitionIDs []UniqueID
......@@ -65,6 +66,7 @@ type Collection struct {
releaseMu sync.RWMutex // guards release
releasedPartitions map[UniqueID]struct{}
releaseTime Timestamp
released bool
}
// ID returns collection id
......@@ -268,17 +270,18 @@ func (c *Collection) removeVDeltaChannel(channel Channel) {
}
// setReleaseTime records when collection is released
func (c *Collection) setReleaseTime(t Timestamp) {
func (c *Collection) setReleaseTime(t Timestamp, released bool) {
c.releaseMu.Lock()
defer c.releaseMu.Unlock()
c.releaseTime = t
c.released = released
}
// getReleaseTime gets the time when collection is released
func (c *Collection) getReleaseTime() Timestamp {
func (c *Collection) getReleaseTime() (Timestamp, bool) {
c.releaseMu.RLock()
defer c.releaseMu.RUnlock()
return c.releaseTime
return c.releaseTime, c.released
}
// setLoadType set the loading type of collection, which is loadTypeCollection or loadTypePartition
......@@ -325,7 +328,7 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co
log.Info("create collection", zap.Int64("collectionID", collectionID))
newCollection.setReleaseTime(Timestamp(math.MaxUint64))
newCollection.setReleaseTime(Timestamp(math.MaxUint64), false)
return newCollection
}
......
......@@ -126,9 +126,10 @@ func TestCollection_releaseTime(t *testing.T) {
collection := newCollection(collectionID, schema)
t0 := Timestamp(1000)
collection.setReleaseTime(t0)
t1 := collection.getReleaseTime()
collection.setReleaseTime(t0, true)
t1, released := collection.getReleaseTime()
assert.Equal(t, t0, t1)
assert.True(t, released)
}
func TestCollection_loadType(t *testing.T) {
......
......@@ -458,6 +458,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
// collection lock is not needed since we guarantee no query/search will be dispatched from the leader
for _, id := range in.SegmentIDs {
err := node.historical.replica.removeSegment(id)
if err != nil {
......
......@@ -385,7 +385,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
CollectionID: defaultCollectionID,
}
node.streaming.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition)
node.streaming.replica.(*metaReplica).partitions = make(map[UniqueID]*Partition)
rsp, err := node.GetSegmentInfo(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
......@@ -404,7 +404,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
CollectionID: defaultCollectionID,
}
node.streaming.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment)
node.streaming.replica.(*metaReplica).segments = make(map[UniqueID]*Segment)
rsp, err := node.GetSegmentInfo(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
......@@ -423,7 +423,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
CollectionID: defaultCollectionID,
}
node.historical.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition)
node.historical.replica.(*metaReplica).partitions = make(map[UniqueID]*Partition)
rsp, err := node.GetSegmentInfo(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
......@@ -442,7 +442,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
CollectionID: defaultCollectionID,
}
node.historical.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment)
node.historical.replica.(*metaReplica).segments = make(map[UniqueID]*Segment)
rsp, err := node.GetSegmentInfo(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
......
......@@ -26,7 +26,7 @@ import (
)
//----------------------------------------------------------------------------------------------------- collection
func TestCollectionReplica_getCollectionNum(t *testing.T) {
func TestMetaReplica_getCollectionNum(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
assert.Equal(t, node.historical.replica.getCollectionNum(), 1)
......@@ -34,14 +34,14 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_addCollection(t *testing.T) {
func TestMetaReplica_addCollection(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
err := node.Stop()
assert.NoError(t, err)
}
func TestCollectionReplica_removeCollection(t *testing.T) {
func TestMetaReplica_removeCollection(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
assert.Equal(t, node.historical.replica.getCollectionNum(), 1)
......@@ -53,7 +53,7 @@ func TestCollectionReplica_removeCollection(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_getCollectionByID(t *testing.T) {
func TestMetaReplica_getCollectionByID(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -65,7 +65,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_hasCollection(t *testing.T) {
func TestMetaReplica_hasCollection(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -80,7 +80,7 @@ func TestCollectionReplica_hasCollection(t *testing.T) {
}
//----------------------------------------------------------------------------------------------------- partition
func TestCollectionReplica_getPartitionNum(t *testing.T) {
func TestMetaReplica_getPartitionNum(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -100,7 +100,7 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_addPartition(t *testing.T) {
func TestMetaReplica_addPartition(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -117,7 +117,7 @@ func TestCollectionReplica_addPartition(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_removePartition(t *testing.T) {
func TestMetaReplica_removePartition(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -137,7 +137,7 @@ func TestCollectionReplica_removePartition(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_getPartitionByTag(t *testing.T) {
func TestMetaReplica_getPartitionByTag(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -157,7 +157,7 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_hasPartition(t *testing.T) {
func TestMetaReplica_hasPartition(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -175,7 +175,7 @@ func TestCollectionReplica_hasPartition(t *testing.T) {
}
//----------------------------------------------------------------------------------------------------- segment
func TestCollectionReplica_addSegment(t *testing.T) {
func TestMetaReplica_addSegment(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -193,7 +193,7 @@ func TestCollectionReplica_addSegment(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_removeSegment(t *testing.T) {
func TestMetaReplica_removeSegment(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -214,7 +214,7 @@ func TestCollectionReplica_removeSegment(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_getSegmentByID(t *testing.T) {
func TestMetaReplica_getSegmentByID(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -233,7 +233,7 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_getSegmentInfosByColID(t *testing.T) {
func TestMetaReplica_getSegmentInfosByColID(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
pkType := schemapb.DataType_Int64
......@@ -282,7 +282,7 @@ func TestCollectionReplica_getSegmentInfosByColID(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_hasSegment(t *testing.T) {
func TestMetaReplica_hasSegment(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -305,7 +305,7 @@ func TestCollectionReplica_hasSegment(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_freeAll(t *testing.T) {
func TestMetaReplica_freeAll(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
initTestMeta(t, node, collectionID, 0)
......@@ -314,7 +314,7 @@ func TestCollectionReplica_freeAll(t *testing.T) {
assert.NoError(t, err)
}
func TestCollectionReplica_statistic(t *testing.T) {
func TestMetaReplica_statistic(t *testing.T) {
t.Run("test getCollectionIDs", func(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
......
......@@ -495,7 +495,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
return err
}
guaranteeTs := msg.GuaranteeTs()
if guaranteeTs >= collection.getReleaseTime() {
if releaseTs, _ := collection.getReleaseTime(); guaranteeTs >= releaseTs {
err = fmt.Errorf("retrieve failed, collection has been released, msgID = %d, collectionID = %d", msg.ID(), collectionID)
publishErr := q.publishFailedQueryResult(msg, err.Error())
if publishErr != nil {
......
......@@ -46,6 +46,7 @@ type queryShard struct {
cancel context.CancelFunc
collectionID UniqueID
collection *Collection // quick reference from meta
channel Channel
deltaChannel Channel
replicaID int64
......@@ -104,6 +105,7 @@ func newQueryShard(
ctx: ctx,
cancel: cancel,
collectionID: collectionID,
collection: collection,
channel: channel,
replicaID: replicaID,
clusterService: clusterService,
......@@ -237,7 +239,7 @@ func (q *queryShard) getNewTSafe(tp tsType) (Timestamp, error) {
return t, nil
}
func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) {
func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp, tp tsType) error {
st := q.getServiceableTime(tp)
log.Debug("serviceable check start", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel))
serviceable := func() bool {
......@@ -252,12 +254,16 @@ func (q *queryShard) waitUntilServiceable(ctx context.Context, guaranteeTs Times
q.watcherCond.Wait()
if err := ctx.Err(); err != nil {
log.Warn("waitUntilServiceable timeout", zap.Uint64("serviceable ts", st), zap.Uint64("guarantee ts", guaranteeTs), zap.String("channel", q.channel))
// TODO: implement timeout logic
return
return ctx.Err()
}
if _, released := q.collection.getReleaseTime(); released {
return fmt.Errorf("collection %d is released before timestamp serviceable", q.collectionID)
}
st = q.getServiceableTime(tp)
}
log.Debug("wait serviceable ts done", zap.String("tsType", tp.String()), zap.Uint64("guarantee ts", guaranteeTs), zap.Uint64("serviceable ts", st), zap.String("channel", q.channel))
return nil
}
func (q *queryShard) getServiceableTime(tp tsType) Timestamp {
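The diff above shows only the changed tail of waitUntilServiceable. As a hedged sketch (not the Milvus implementation), the overall pattern is roughly the following: block on a condition variable until the serviceable timestamp catches up with the guarantee timestamp, and wake the waiter when either the caller's context ends or the collection is flagged as released. The helper names (waiter, advance, release) are invented for illustration, and context.AfterFunc assumes Go 1.21+.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Timestamp = uint64

// waiter is a stand-in for queryShard's tsafe watcher plus watcherCond pair.
type waiter struct {
	mu            sync.Mutex
	cond          *sync.Cond
	serviceableTs Timestamp
	released      bool
}

func newWaiter() *waiter {
	w := &waiter{}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// advance is called when a new serviceable timestamp arrives.
func (w *waiter) advance(ts Timestamp) {
	w.mu.Lock()
	w.serviceableTs = ts
	w.mu.Unlock()
	w.cond.Broadcast()
}

// release marks the collection released and wakes any pending request.
func (w *waiter) release() {
	w.mu.Lock()
	w.released = true
	w.mu.Unlock()
	w.cond.Broadcast()
}

func (w *waiter) waitUntilServiceable(ctx context.Context, guaranteeTs Timestamp) error {
	// Wake the waiter when ctx ends. Broadcasting while holding the lock
	// avoids a missed wakeup between the ctx check and cond.Wait below.
	stop := context.AfterFunc(ctx, func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.cond.Broadcast()
	})
	defer stop()

	w.mu.Lock()
	defer w.mu.Unlock()
	for w.serviceableTs < guaranteeTs {
		if err := ctx.Err(); err != nil {
			return err // context timeout or cancellation
		}
		if w.released {
			return fmt.Errorf("collection released before ts became serviceable")
		}
		w.cond.Wait()
	}
	return nil
}

func main() {
	w := newWaiter()
	go w.advance(1000) // simulate tsafe catching up
	fmt.Println(w.waitUntilServiceable(context.Background(), 1000))
}
```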
......@@ -313,22 +319,15 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i
return nil, errors.New("search context timeout")
}
// lock historic meta-replica
q.historical.replica.queryRLock()
defer q.historical.replica.queryRUnlock()
// lock streaming meta-replica for shard leader
if req.IsShardLeader {
q.streaming.replica.queryRLock()
defer q.streaming.replica.queryRUnlock()
}
// check if collection has been released
collection, err := q.historical.replica.getCollectionByID(collectionID)
// check if collection has been released, check streaming since it's released first
_, err := q.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return nil, err
}
if req.GetReq().GetGuaranteeTimestamp() >= collection.getReleaseTime() {
q.collection.RLock() // locks the collectionPtr
defer q.collection.RUnlock()
if _, released := q.collection.getReleaseTime(); released {
log.Warn("collection release before search", zap.Int64("collectionID", collectionID))
return nil, fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", collectionID)
}
......@@ -337,20 +336,20 @@ func (q *queryShard) search(ctx context.Context, req *querypb.SearchRequest) (*i
var plan *SearchPlan
if req.Req.GetDslType() == commonpb.DslType_BoolExprV1 {
expr := req.Req.SerializedExprPlan
plan, err = createSearchPlanByExpr(collection, expr)
plan, err = createSearchPlanByExpr(q.collection, expr)
if err != nil {
return nil, err
}
} else {
dsl := req.Req.Dsl
plan, err = createSearchPlan(collection, dsl)
plan, err = createSearchPlan(q.collection, dsl)
if err != nil {
return nil, err
}
}
defer plan.delete()
schemaHelper, err := typeutil.CreateSchemaHelper(collection.schema)
schemaHelper, err := typeutil.CreateSchemaHelper(q.collection.schema)
if err != nil {
return nil, err
}
......@@ -412,8 +411,16 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques
go func() {
defer wg.Done()
guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp
tsErr := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML) // wait until guarantee timestamp >= service timestamp
if tsErr != nil {
err = tsErr
log.Warn("failed to wait serviceable ts", zap.Error(err))
cancel()
return
}
// shard leader queries its own streaming data
// TODO add context
sResults, _, _, sErr := q.streaming.search(searchRequests, collectionID, partitionIDs, req.DmlChannel, plan, timestamp)
......@@ -520,10 +527,14 @@ func (q *queryShard) searchFollower(ctx context.Context, req *querypb.SearchRequ
segmentIDs := req.GetSegmentIDs()
// hold request until guarantee timestamp >= service timestamp
guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta)
err := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta)
if err != nil {
log.Warn("failed to wati serviceable ts", zap.Error(err))
return nil, err
}
// validate segmentIDs in request
err := q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs)
err = q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs)
if err != nil {
log.Warn("segmentIDs in search request fails validation", zap.Int64s("segmentIDs", segmentIDs))
return nil, err
......@@ -710,23 +721,16 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int
return nil, errors.New("search context timeout")
}
// lock historic meta-replica
q.historical.replica.queryRLock()
defer q.historical.replica.queryRUnlock()
// lock streaming meta-replica for shard leader
if req.IsShardLeader {
q.streaming.replica.queryRLock()
defer q.streaming.replica.queryRUnlock()
}
// check if collection has been released
// check if collection has been released, check streaming since it's released first
collection, err := q.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return nil, err
}
if req.GetReq().GetGuaranteeTimestamp() >= collection.getReleaseTime() {
q.collection.RLock()
defer q.collection.RUnlock()
if _, released := q.collection.getReleaseTime(); released {
log.Warn("collection release before query", zap.Int64("collectionID", collectionID))
return nil, fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", collectionID)
}
......@@ -774,7 +778,16 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int
defer wg.Done()
// hold request until guarantee timestamp >= service timestamp
guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML)
tsErr := q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDML)
if tsErr != nil {
err = tsErr
log.Warn("failed to wait serviceable ts", zap.Error(err))
cancel()
return
}
q.collection.RLock()
defer q.collection.RUnlock()
// shard leader queries its own streaming data
// TODO add context
sResults, _, _, sErr := q.streaming.retrieve(collectionID, partitionIDs, plan, func(segment *Segment) bool { return segment.vChannelID == q.channel })
......@@ -816,7 +829,14 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int
// hold request until guarantee timestamp >= service timestamp
guaranteeTs := req.GetReq().GetGuaranteeTimestamp()
q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta)
err = q.waitUntilServiceable(ctx, guaranteeTs, tsTypeDelta)
if err != nil {
log.Warn("failed to wait servicable ts", zap.Error(err))
return nil, err
}
q.collection.RLock()
defer q.collection.RUnlock()
// validate segmentIDs in request
err = q.historical.validateSegmentIDs(segmentIDs, collectionID, partitionIDs)
......
......@@ -20,7 +20,9 @@ import (
"context"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
......@@ -160,6 +162,56 @@ func TestQueryShard_Search(t *testing.T) {
_, err = qs.search(context.Background(), request)
assert.NoError(t, err)
})
t.Run("search timeout", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
r := proto.Clone(req).(*internalpb.SearchRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.SearchRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
_, err = qs.search(ctx, request)
assert.Error(t, err)
})
t.Run("search wait timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r := proto.Clone(req).(*internalpb.SearchRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.SearchRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
_, err = qs.search(ctx, request)
assert.Error(t, err)
})
t.Run("search collection released", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := proto.Clone(req).(*internalpb.SearchRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.SearchRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
qs.collection.setReleaseTime(100, true)
_, err = qs.search(ctx, request)
assert.Error(t, err)
})
}
func TestQueryShard_Query(t *testing.T) {
......@@ -207,6 +259,56 @@ func TestQueryShard_Query(t *testing.T) {
_, err := qs.query(context.Background(), request)
assert.NoError(t, err)
})
t.Run("query timeout", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
r := proto.Clone(req).(*internalpb.RetrieveRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.QueryRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
_, err = qs.query(ctx, request)
assert.Error(t, err)
})
t.Run("query wait timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r := proto.Clone(req).(*internalpb.RetrieveRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.QueryRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
_, err = qs.query(ctx, request)
assert.Error(t, err)
})
t.Run("query collection released", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r := proto.Clone(req).(*internalpb.RetrieveRequest)
r.GuaranteeTimestamp = Timestamp(100)
request := &querypb.QueryRequest{
Req: r,
IsShardLeader: true,
DmlChannel: defaultDMLChannel,
SegmentIDs: []int64{},
}
qs.collection.setReleaseTime(100, true)
_, err = qs.query(ctx, request)
assert.Error(t, err)
})
}
func TestQueryShard_waitNewTSafe(t *testing.T) {
......@@ -230,9 +332,29 @@ func TestQueryShard_WaitUntilServiceable(t *testing.T) {
qs, err := genSimpleQueryShard(context.Background())
assert.NoError(t, err)
t.Run("normal success", func(t *testing.T) {
err = updateQueryShardTSafe(qs, 1000)
require.NoError(t, err)
err = qs.waitUntilServiceable(context.Background(), 1000, tsTypeDML)
assert.NoError(t, err)
qs.waitUntilServiceable(context.Background(), 1000, tsTypeDML)
})
t.Run("context timeout", func(t *testing.T) {
err = updateQueryShardTSafe(qs, 1000)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err = qs.waitUntilServiceable(ctx, 1001, tsTypeDML)
assert.Error(t, err)
})
t.Run("collection released", func(t *testing.T) {
qs.collection.setReleaseTime(1000, true)
err = qs.waitUntilServiceable(context.Background(), 1001, tsTypeDML)
assert.Error(t, err)
})
}
func genSearchResultData(nq int64, topk int64, ids []int64, scores []float32, topks []int64) *schemapb.SearchResultData {
......
......@@ -723,18 +723,13 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
}
func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replicaType ReplicaType) error {
// block search/query operation
replica.queryLock()
collection, err := replica.getCollectionByID(r.req.CollectionID)
if err != nil {
replica.queryUnlock()
return err
}
// set release time
log.Info("set release time", zap.Any("collectionID", r.req.CollectionID))
collection.setReleaseTime(r.req.Base.Timestamp)
replica.queryUnlock()
collection.setReleaseTime(r.req.Base.Timestamp, true)
// remove all flow graphs of the target collection
var channels []Channel
......
......@@ -93,3 +93,10 @@ func (ts *tSafe) set(t Timestamp) {
// zap.Any("channel", ts.channel),
// zap.Any("t", m.t))
}
// close calls the close method of internal watcher if any
func (ts *tSafe) close() {
if ts.watcher != nil {
ts.watcher.close()
}
}
......@@ -94,6 +94,10 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) {
log.Info("remove tSafe replica",
zap.String("vChannel", vChannel),
)
tsafe, ok := t.tSafes[vChannel]
if ok {
tsafe.close()
}
delete(t.tSafes, vChannel)
}
......