From a66b77e408abff0e115592caa3f32ac817a88dab Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 14 Apr 2023 07:30:30 +0800 Subject: [PATCH] Fix watcher loop quit and channel shouldDrop logic (#23394) Signed-off-by: Congqi Xia --- internal/common/common.go | 5 ++ internal/datacoord/channel_checker.go | 7 +- internal/datacoord/channel_manager.go | 23 +++-- internal/datacoord/channel_manager_test.go | 17 ++-- internal/datacoord/channel_store.go | 2 +- internal/datacoord/handler.go | 18 +--- internal/datacoord/server_test.go | 6 +- internal/datanode/data_node.go | 1 + internal/datanode/flow_graph_dd_node.go | 2 +- internal/indexcoord/index_coord.go | 6 +- .../querynode/shard_cluster_service_test.go | 1 + internal/querynode/shard_node_detector.go | 88 ++++++++++++++++--- internal/querynode/shard_segment_detector.go | 83 ++++++++++++++--- 13 files changed, 193 insertions(+), 66 deletions(-) diff --git a/internal/common/common.go b/internal/common/common.go index f8dc428c3..37475cb30 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -91,3 +91,8 @@ const ( func IsSystemField(fieldID int64) bool { return fieldID < StartOfUserFieldID } + +const ( + // LatestVerision is the magic number for watch latest revision + LatestRevision = int64(-1) +) diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go index 2ca87b976..4c868536d 100644 --- a/internal/datacoord/channel_checker.go +++ b/internal/datacoord/channel_checker.go @@ -59,6 +59,11 @@ func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan return c.etcdWatcher, c.timeoutWatcher } +func (c *channelStateTimer) getWatchersWithRevision(prefix string, revision int64) (clientv3.WatchChan, chan *ackEvent) { + c.etcdWatcher = c.watchkv.WatchWithRevision(prefix, revision) + return c.etcdWatcher, c.timeoutWatcher +} + func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) { prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10)) @@ -113,7 +118,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe case <-ticker.C: // check tickle at path as :tickle/[prefix]/{channel_name} c.removeTimers([]string{channelName}) - log.Info("timeout and stop timer: wait for channel ACK timeout", + log.Warn("timeout and stop timer: wait for channel ACK timeout", zap.String("watch state", watchState.String()), zap.Int64("nodeID", nodeID), zap.String("channel name", channelName), diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index b98a852ed..5b9f848dc 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -164,7 +165,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { checkerContext, cancel := context.WithCancel(ctx) c.stopChecker = cancel if c.stateChecker != nil { - go c.stateChecker(checkerContext) + // TODO get revision from reload logic + go c.stateChecker(checkerContext, common.LatestRevision) log.Info("starting etcd states checker") } @@ -651,15 +653,20 @@ func (c *ChannelManager) processAck(e *ackEvent) { } } -type channelStateChecker func(context.Context) +type channelStateChecker func(context.Context, int64) -func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { +func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) { defer logutil.LogPanic() // REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name} watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath - // TODO, this is risky, we'd better watch etcd with revision rather simply a path - etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix) + var etcdWatcher clientv3.WatchChan + var timeoutWatcher chan *ackEvent + if revision == common.LatestRevision { + etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix) + } else { + etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision) + } for { select { @@ -674,14 +681,17 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { case event, ok := <-etcdWatcher: if !ok { log.Warn("datacoord failed to watch channel, return") + // rewatch for transient network error, session handles process quiting if connect is not recoverable + go c.watchChannelStatesLoop(ctx, revision) return } if err := event.Err(); err != nil { log.Warn("datacoord watch channel hit error", zap.Error(event.Err())) // https://github.com/etcd-io/etcd/issues/8980 + // TODO add list and wathc with revision if event.Err() == v3rpc.ErrCompacted { - go c.watchChannelStatesLoop(ctx) + go c.watchChannelStatesLoop(ctx, event.CompactRevision) return } // if watch loop return due to event canceled, the datacoord is not functional anymore @@ -689,6 +699,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) { return } + revision = event.Header.GetRevision() + 1 for _, evt := range event.Events { if evt.Type == clientv3.EventTypeDelete { continue diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index c2edb9f79..347993c44 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -26,6 +26,7 @@ import ( "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/dependency" @@ -114,7 +115,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -144,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -175,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -213,7 +214,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -256,7 +257,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -302,7 +303,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -348,7 +349,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go func() { - chManager.watchChannelStatesLoop(ctx) + chManager.watchChannelStatesLoop(ctx, common.LatestRevision) wg.Done() }() @@ -927,7 +928,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) chManager.stopChecker = cancel defer cancel() - go chManager.stateChecker(ctx) + go chManager.stateChecker(ctx, common.LatestRevision) chManager.store = &ChannelStore{ store: metakv, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 9de3a0cee..b665b22a3 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -331,10 +331,10 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int { func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) { for id, info := range c.channelsInfo { if id == nodeID { - delete(c.channelsInfo, id) if err := c.remove(nodeID); err != nil { return nil, err } + delete(c.channelsInfo, id) return info.Channels, nil } } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 073b8201a..b17114985 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -334,22 +334,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID // CheckShouldDropChannel returns whether specified channel is marked to be removed func (h *ServerHandler) CheckShouldDropChannel(channel string) bool { - /* - segments := h.s.meta.GetSegmentsByChannel(channel) - for _, segment := range segments { - if segment.GetStartPosition() != nil && // filter empty segment - // FIXME: we filter compaction generated segments - // because datanode may not know the segment due to the network lag or - // datacoord crash when handling CompleteCompaction. - // FIXME: cancel this limitation for #12265 - // need to change a unified DropAndFlush to solve the root problem - //len(segment.CompactionFrom) == 0 && - segment.GetState() != commonpb.SegmentState_Dropped { - return false - } - } - return false*/ - return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) + return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) || + !h.s.meta.catalog.ChannelExists(h.s.ctx, channel) } // FinishDropChannel cleans up the remove flag for channels diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index f9b21003f..6b3b6cd62 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2596,7 +2596,7 @@ func TestShouldDropChannel(t *testing.T) { }) */ t.Run("channel name not in kv", func(t *testing.T) { - assert.False(t, svr.handler.CheckShouldDropChannel("ch99")) + assert.True(t, svr.handler.CheckShouldDropChannel("ch99")) }) t.Run("channel in remove flag", func(t *testing.T) { @@ -2605,10 +2605,6 @@ func TestShouldDropChannel(t *testing.T) { assert.True(t, svr.handler.CheckShouldDropChannel("ch1")) }) - - t.Run("channel name not matched", func(t *testing.T) { - assert.False(t, svr.handler.CheckShouldDropChannel("ch2")) - }) } func TestGetRecoveryInfo(t *testing.T) { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 4dcb6218b..9b34acb99 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -279,6 +279,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) { case event, ok := <-evtChan: if !ok { log.Warn("datanode failed to watch channel, return") + go node.StartWatchChannels(ctx) return } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index b57244a13..ecb0ff656 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -122,7 +122,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { } if load := ddn.dropMode.Load(); load != nil && load.(bool) { - log.Debug("ddNode in dropMode", + log.Info("ddNode in dropMode", zap.String("vChannelName", ddn.vChannelName), zap.Int64("collection ID", ddn.collectionID)) return []Msg{} diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index a005180b0..7b6a8588e 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -1238,7 +1238,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() { log.Info("IndexCoord start watching flushed segments...") defer i.loopWg.Done() - watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1) + watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision) for { select { case <-i.loopCtx.Done(): @@ -1246,7 +1246,8 @@ func (i *IndexCoord) watchFlushedSegmentLoop() { return case resp, ok := <-watchChan: if !ok { - log.Warn("IndexCoord watch flush segments loop failed because watch channel closed") + log.Warn("IndexCoord watch flush segments loop failed because watch channel closed, retry...") + go i.watchFlushedSegmentLoop() return } if err := resp.Err(); err != nil { @@ -1266,6 +1267,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() { zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err)) panic("failed to handle etcd request, exit..") } + i.flushedSegmentWatcher.etcdRevision = resp.Header.GetRevision() + 1 events := resp.Events for _, event := range events { switch event.Type { diff --git a/internal/querynode/shard_cluster_service_test.go b/internal/querynode/shard_cluster_service_test.go index 222d94d21..881deee70 100644 --- a/internal/querynode/shard_cluster_service_test.go +++ b/internal/querynode/shard_cluster_service_test.go @@ -43,6 +43,7 @@ func TestShardClusterService_SyncReplicaSegments(t *testing.T) { defer client.Close() session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client) clusterService := newShardClusterService(client, session, qn) + defer clusterService.close() t.Run("sync non-exist shard cluster", func(t *testing.T) { err := clusterService.SyncReplicaSegments(defaultDMLChannel, nil) diff --git a/internal/querynode/shard_node_detector.go b/internal/querynode/shard_node_detector.go index f4c68a037..800bfbd33 100644 --- a/internal/querynode/shard_node_detector.go +++ b/internal/querynode/shard_node_detector.go @@ -24,6 +24,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/retry" "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" @@ -110,10 +111,11 @@ func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64, ctx, cancel := context.WithCancel(context.Background()) go nd.cancelClose(cancel) - watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(resp.Header.Revision+1), clientv3.WithPrefix(), clientv3.WithPrevKV()) + revision := resp.Header.GetRevision() + 1 + watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV()) nd.wg.Add(1) - go nd.watch(watchCh, collectionID, replicaID) + go nd.watch(watchCh, collectionID, replicaID, revision) return nodes, nd.evtCh } @@ -123,7 +125,7 @@ func (nd *etcdShardNodeDetector) cancelClose(cancel func()) { cancel() } -func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID int64) { +func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID, revision int64) { defer nd.wg.Done() for { select { @@ -132,31 +134,89 @@ func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, repl return case evt, ok := <-ch: if !ok { - log.Warn("event ch closed") + log.Warn("watch channel closed, retry...") + var watchCh clientv3.WatchChan + var ok bool + watchCh, ok, revision = nd.rewatch(collectionID, replicaID, revision) + if !ok { + // detector closed + return + } + nd.wg.Add(1) + go nd.watch(watchCh, collectionID, replicaID, revision) return } if err := evt.Err(); err != nil { if err == v3rpc.ErrCompacted { - ctx, cancel := context.WithCancel(context.Background()) - watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithPrefix()) - go nd.cancelClose(cancel) + watchCh, ok, revision := nd.rewatch(collectionID, replicaID, evt.CompactRevision) + if !ok { + // detector closed + return + } nd.wg.Add(1) - go nd.watch(watchCh, collectionID, replicaID) + go nd.watch(watchCh, collectionID, replicaID, revision) return } log.Error("failed to handle watch node error", zap.Error(err)) panic(err) } - for _, e := range evt.Events { - switch e.Type { - case mvccpb.PUT: - nd.handlePutEvent(e, collectionID, replicaID) - case mvccpb.DELETE: - nd.handleDelEvent(e, collectionID, replicaID) + revision = evt.Header.GetRevision() + 1 + nd.handleEvt(evt, collectionID, replicaID) + } + } +} + +func (nd *etcdShardNodeDetector) handleEvt(evt clientv3.WatchResponse, collectionID, replicaID int64) { + for _, e := range evt.Events { + switch e.Type { + case mvccpb.PUT: + nd.handlePutEvent(e, collectionID, replicaID) + case mvccpb.DELETE: + nd.handleDelEvent(e, collectionID, replicaID) + } + } +} + +func (nd *etcdShardNodeDetector) rewatch(collectionID, replicaID, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + revision = rev + err := retry.Do(ctx, func() error { + ch = nd.client.Watch(ctx, nd.path, clientv3.WithPrefix(), clientv3.WithRev(revision)) + select { + case <-nd.closeCh: + return retry.Unrecoverable(errors.New("detector closed")) + + case evt, ok := <-ch: + if !ok { + return errors.New("rewatch got closed ch") + } + if err := evt.Err(); err != nil { + if err == v3rpc.ErrCompacted { + revision = evt.CompactRevision + return err } + log.Error("failed to handle watch node error", zap.Error(err)) + panic(err) } + revision = evt.Header.GetRevision() + 1 + nd.handleEvt(evt, collectionID, replicaID) + default: + // blocked, fine + } + return nil + }) + // check detector closed + if err != nil { + select { + case <-nd.closeCh: + return nil, false, revision + default: + panic(err) } } + + return ch, true, revision } func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID, replicaID int64) { diff --git a/internal/querynode/shard_segment_detector.go b/internal/querynode/shard_segment_detector.go index 94d2b8554..b2365c03f 100644 --- a/internal/querynode/shard_segment_detector.go +++ b/internal/querynode/shard_segment_detector.go @@ -18,11 +18,13 @@ package querynode import ( "context" + "errors" "sync" "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/retry" "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" @@ -103,14 +105,16 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID } } + revision := resp.Header.GetRevision() + 1 sd.wg.Add(1) - watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(resp.Header.GetRevision()+1), clientv3.WithPrefix(), clientv3.WithPrevKV()) - go sd.watch(watchCh, collectionID, replicaID, vchannelName) + watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV()) + + go sd.watch(watchCh, collectionID, replicaID, vchannelName, revision) return events, sd.evtCh } -func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string) { +func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string, revision int64) { defer sd.wg.Done() for { select { @@ -119,29 +123,84 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in return case evt, ok := <-ch: if !ok { - log.Warn("SegmentDetector event channel closed") + log.Warn("SegmentDetector event channel closed, retry...") + watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, revision) + if !ok { + return + } + sd.wg.Add(1) + go sd.watch(watchCh, collectionID, replicaID, vchannel, revision) return } if err := evt.Err(); err != nil { if err == v3rpc.ErrCompacted { + watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision) + if !ok { + return + } sd.wg.Add(1) - watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithPrefix()) - go sd.watch(watchCh, collectionID, replicaID, vchannel) + go sd.watch(watchCh, collectionID, replicaID, vchannel, revision) return } log.Error("failed to handle watch segment error, panic", zap.Error(err)) panic(err) } - for _, e := range evt.Events { - switch e.Type { - case mvccpb.PUT: - sd.handlePutEvent(e, collectionID, replicaID, vchannel) - case mvccpb.DELETE: - sd.handleDelEvent(e, collectionID, replicaID, vchannel) + revision = evt.Header.GetRevision() + 1 + sd.handleEvt(evt, collectionID, replicaID, vchannel) + } + } +} + +func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, revision int64) (ch clientv3.WatchChan, ok bool) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := retry.Do(ctx, func() error { + ch = sd.client.Watch(ctx, sd.path, clientv3.WithPrefix(), clientv3.WithRev(revision)) + select { + case <-sd.closeCh: + return retry.Unrecoverable(errors.New("detector closed")) + case evt, ok := <-ch: + if !ok { + return errors.New("rewatch got closed ch") + } + if err := evt.Err(); err != nil { + if err == v3rpc.ErrCompacted { + revision = evt.CompactRevision + return err } + log.Error("failed to handle watch segment error", zap.Error(err)) + panic(err) } + revision = evt.Header.GetRevision() + 1 + sd.handleEvt(evt, collectionID, replicaID, vchannel) + default: + // blocked, fine + } + return nil + }) + // check detector closed + if err != nil { + select { + case <-sd.closeCh: + return nil, false + default: + panic(err) } } + + return ch, true +} + +func (sd *etcdShardSegmentDetector) handleEvt(evt clientv3.WatchResponse, collectionID int64, replicaID int64, vchannel string) { + for _, e := range evt.Events { + switch e.Type { + case mvccpb.PUT: + sd.handlePutEvent(e, collectionID, replicaID, vchannel) + case mvccpb.DELETE: + sd.handleDelEvent(e, collectionID, replicaID, vchannel) + } + } + } func (sd *etcdShardSegmentDetector) handlePutEvent(e *clientv3.Event, collectionID int64, replicaID int64, vchannel string) { -- GitLab