Unverified commit a66b77e4 authored by congqixia, committed by GitHub

Fix watcher loop quit and channel shouldDrop logic (#23394)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent 51f5a128
......@@ -91,3 +91,8 @@ const (
func IsSystemField(fieldID int64) bool {
return fieldID < StartOfUserFieldID
}
const (
// LatestRevision is the magic number for watching from the latest revision
LatestRevision = int64(-1)
)
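The new `LatestRevision` sentinel lets callers keep the old "watch from now" behavior while the rest of the code threads a concrete revision through. A minimal, self-contained sketch of how such a sentinel can select the watch mode; the watcher interface below is hypothetical, not the Milvus one:

```go
package main

import "fmt"

// LatestRevision mirrors the sentinel added in this commit.
const LatestRevision = int64(-1)

type watcher interface {
	Watch(prefix string) string
	WatchWithRevision(prefix string, revision int64) string
}

type fakeWatcher struct{}

func (fakeWatcher) Watch(prefix string) string {
	return "watching " + prefix + " from the current revision"
}

func (fakeWatcher) WatchWithRevision(prefix string, rev int64) string {
	return fmt.Sprintf("watching %s from revision %d", prefix, rev)
}

// startWatch picks the watch mode based on the sentinel value.
func startWatch(w watcher, prefix string, revision int64) string {
	if revision == LatestRevision {
		return w.Watch(prefix)
	}
	return w.WatchWithRevision(prefix, revision)
}

func main() {
	w := fakeWatcher{}
	fmt.Println(startWatch(w, "channelwatch/6", LatestRevision))
	fmt.Println(startWatch(w, "channelwatch/6", 42))
}
```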
......@@ -59,6 +59,11 @@ func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan
return c.etcdWatcher, c.timeoutWatcher
}
func (c *channelStateTimer) getWatchersWithRevision(prefix string, revision int64) (clientv3.WatchChan, chan *ackEvent) {
c.etcdWatcher = c.watchkv.WatchWithRevision(prefix, revision)
return c.etcdWatcher, c.timeoutWatcher
}
func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10))
......@@ -113,7 +118,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
case <-ticker.C:
// check tickle at path as: tickle/[prefix]/{channel_name}
c.removeTimers([]string{channelName})
log.Info("timeout and stop timer: wait for channel ACK timeout",
log.Warn("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
......
......@@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
......@@ -164,7 +165,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
checkerContext, cancel := context.WithCancel(ctx)
c.stopChecker = cancel
if c.stateChecker != nil {
go c.stateChecker(checkerContext)
// TODO get revision from reload logic
go c.stateChecker(checkerContext, common.LatestRevision)
log.Info("starting etcd states checker")
}
......@@ -651,15 +653,20 @@ func (c *ChannelManager) processAck(e *ackEvent) {
}
}
type channelStateChecker func(context.Context)
type channelStateChecker func(context.Context, int64)
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) {
defer logutil.LogPanic()
// REF MEP#7 watchInfo paths are organized as: [prefix]/channel/{node_id}/{channel_name}
watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath
// TODO, this is risky, we'd better watch etcd with a revision rather than simply a path
etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
var etcdWatcher clientv3.WatchChan
var timeoutWatcher chan *ackEvent
if revision == common.LatestRevision {
etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix)
} else {
etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision)
}
for {
select {
......@@ -674,14 +681,17 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
case event, ok := <-etcdWatcher:
if !ok {
log.Warn("datacoord failed to watch channel, return")
// rewatch for transient network errors; the session handles process quitting if the connection is not recoverable
go c.watchChannelStatesLoop(ctx, revision)
return
}
if err := event.Err(); err != nil {
log.Warn("datacoord watch channel hit error", zap.Error(event.Err()))
// https://github.com/etcd-io/etcd/issues/8980
// TODO add list and watch with revision
if event.Err() == v3rpc.ErrCompacted {
go c.watchChannelStatesLoop(ctx)
go c.watchChannelStatesLoop(ctx, event.CompactRevision)
return
}
// if the watch loop returns because the event was canceled, the datacoord is no longer functional
......@@ -689,6 +699,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
return
}
revision = event.Header.GetRevision() + 1
for _, evt := range event.Events {
if evt.Type == clientv3.EventTypeDelete {
continue
......
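For reference, a hedged, self-contained sketch of the pattern this loop now follows: remember the revision of the last handled response, restart from it when the channel closes, and fall back to the compact revision on `ErrCompacted`. The endpoint and prefix below are placeholders, not the DataCoord configuration.

```go
package main

import (
	"context"
	"log"

	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

func watchLoop(ctx context.Context, cli *clientv3.Client, prefix string, revision int64) {
	opts := []clientv3.OpOption{clientv3.WithPrefix()}
	if revision > 0 {
		opts = append(opts, clientv3.WithRev(revision))
	}
	ch := cli.Watch(ctx, prefix, opts...)
	for {
		select {
		case <-ctx.Done():
			return
		case resp, ok := <-ch:
			if !ok {
				// transient failure: restart from the last seen revision
				go watchLoop(ctx, cli, prefix, revision)
				return
			}
			if err := resp.Err(); err != nil {
				if err == v3rpc.ErrCompacted {
					// requested revision was compacted: resume from the compact revision
					go watchLoop(ctx, cli, prefix, resp.CompactRevision)
					return
				}
				log.Printf("unrecoverable watch error: %v", err)
				return
			}
			// remember the next revision so a future rewatch does not lose events
			revision = resp.Header.Revision + 1
			for _, ev := range resp.Events {
				log.Printf("event %s on %s", ev.Type, ev.Kv.Key)
			}
		}
	}
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	watchLoop(context.Background(), cli, "channelwatch/", -1)
}
```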
......@@ -26,6 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/dependency"
......@@ -114,7 +115,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -144,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -175,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -213,7 +214,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -256,7 +257,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -302,7 +303,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -348,7 +349,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
wg.Done()
}()
......@@ -927,7 +928,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
chManager.stopChecker = cancel
defer cancel()
go chManager.stateChecker(ctx)
go chManager.stateChecker(ctx, common.LatestRevision)
chManager.store = &ChannelStore{
store: metakv,
......
......@@ -331,10 +331,10 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
for id, info := range c.channelsInfo {
if id == nodeID {
delete(c.channelsInfo, id)
if err := c.remove(nodeID); err != nil {
return nil, err
}
delete(c.channelsInfo, id)
return info.Channels, nil
}
}
......
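The reorder above makes the persistent removal happen before the in-memory delete. A toy sketch (hypothetical store type, not the real ChannelStore) of why that ordering keeps the cache consistent with the metadata when the removal fails:

```go
package main

import (
	"errors"
	"fmt"
)

type store struct {
	kv   map[int64]string // stands in for the persistent catalog
	mem  map[int64]string // in-memory cache
	fail bool             // simulate a kv failure
}

func (s *store) removeFromKV(id int64) error {
	if s.fail {
		return errors.New("kv removal failed")
	}
	delete(s.kv, id)
	return nil
}

// Delete removes the node entry; the cache is only touched after the
// persistent removal succeeds.
func (s *store) Delete(id int64) error {
	if err := s.removeFromKV(id); err != nil {
		return err // cache untouched, still consistent with kv
	}
	delete(s.mem, id)
	return nil
}

func main() {
	s := &store{kv: map[int64]string{1: "ch"}, mem: map[int64]string{1: "ch"}, fail: true}
	fmt.Println(s.Delete(1)) // kv removal failed; both maps still hold the entry
	s.fail = false
	fmt.Println(s.Delete(1), s.kv, s.mem) // <nil> map[] map[]
}
```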
......@@ -334,22 +334,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
// CheckShouldDropChannel returns whether specified channel is marked to be removed
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
/*
segments := h.s.meta.GetSegmentsByChannel(channel)
for _, segment := range segments {
if segment.GetStartPosition() != nil && // filter empty segment
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
// FIXME: cancel this limitation for #12265
// need to change a unified DropAndFlush to solve the root problem
//len(segment.CompactionFrom) == 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
}
}
return false*/
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) ||
!h.s.meta.catalog.ChannelExists(h.s.ctx, channel)
}
// FinishDropChannel cleans up the remove flag for channels
......
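The predicate now also treats channels missing from the catalog as droppable. A minimal sketch with a hypothetical in-memory catalog standing in for `h.s.meta.catalog`:

```go
package main

import "fmt"

type catalog struct {
	exists     map[string]bool
	removeFlag map[string]bool
}

func (c *catalog) ShouldDropChannel(ch string) bool { return c.removeFlag[ch] }
func (c *catalog) ChannelExists(ch string) bool     { return c.exists[ch] }

// checkShouldDropChannel mirrors the new predicate: drop when flagged
// for removal, or when the channel is unknown to the catalog.
func checkShouldDropChannel(c *catalog, ch string) bool {
	return c.ShouldDropChannel(ch) || !c.ChannelExists(ch)
}

func main() {
	c := &catalog{
		exists:     map[string]bool{"ch1": true, "ch2": true},
		removeFlag: map[string]bool{"ch1": true},
	}
	fmt.Println(checkShouldDropChannel(c, "ch1"))  // true: remove flag set
	fmt.Println(checkShouldDropChannel(c, "ch2"))  // false: exists, no flag
	fmt.Println(checkShouldDropChannel(c, "ch99")) // true: unknown channel
}
```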
......@@ -2596,7 +2596,7 @@ func TestShouldDropChannel(t *testing.T) {
})
*/
t.Run("channel name not in kv", func(t *testing.T) {
assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
assert.True(t, svr.handler.CheckShouldDropChannel("ch99"))
})
t.Run("channel in remove flag", func(t *testing.T) {
......@@ -2605,10 +2605,6 @@ func TestShouldDropChannel(t *testing.T) {
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
})
t.Run("channel name not matched", func(t *testing.T) {
assert.False(t, svr.handler.CheckShouldDropChannel("ch2"))
})
}
func TestGetRecoveryInfo(t *testing.T) {
......
......@@ -279,6 +279,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
case event, ok := <-evtChan:
if !ok {
log.Warn("datanode failed to watch channel, return")
go node.StartWatchChannels(ctx)
return
}
......
......@@ -122,7 +122,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Debug("ddNode in dropMode",
log.Info("ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collection ID", ddn.collectionID))
return []Msg{}
......
......@@ -1238,7 +1238,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
log.Info("IndexCoord start watching flushed segments...")
defer i.loopWg.Done()
watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision)
for {
select {
case <-i.loopCtx.Done():
......@@ -1246,7 +1246,8 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
return
case resp, ok := <-watchChan:
if !ok {
log.Warn("IndexCoord watch flush segments loop failed because watch channel closed")
log.Warn("IndexCoord watch flush segments loop failed because watch channel closed, retry...")
go i.watchFlushedSegmentLoop()
return
}
if err := resp.Err(); err != nil {
......@@ -1266,6 +1267,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
panic("failed to handle etcd request, exit..")
}
i.flushedSegmentWatcher.etcdRevision = resp.Header.GetRevision() + 1
events := resp.Events
for _, event := range events {
switch event.Type {
......
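The two changes in this hunk move the `+1` into the loop: the loop itself now records `header revision + 1` as the resume point, so keeping the extra `+1` at watch start would skip one revision. A toy illustration with plain integers, no etcd involved:

```go
package main

import "fmt"

func main() {
	// the loop handled a response whose header revision was 11,
	// so the next revision to consume is 12
	next := int64(11) + 1

	// old behavior: open the watch at next+1 (13), skipping revision 12
	fmt.Println("old watch start:", next+1) // 13 -> event at 12 lost
	// fixed behavior: open the watch at next (12): no gap, no replay
	fmt.Println("new watch start:", next) // 12
}
```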
......@@ -43,6 +43,7 @@ func TestShardClusterService_SyncReplicaSegments(t *testing.T) {
defer client.Close()
session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
clusterService := newShardClusterService(client, session, qn)
defer clusterService.close()
t.Run("sync non-exist shard cluster", func(t *testing.T) {
err := clusterService.SyncReplicaSegments(defaultDMLChannel, nil)
......
......@@ -24,6 +24,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -110,10 +111,11 @@ func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64,
ctx, cancel := context.WithCancel(context.Background())
go nd.cancelClose(cancel)
watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(resp.Header.Revision+1), clientv3.WithPrefix(), clientv3.WithPrevKV())
revision := resp.Header.GetRevision() + 1
watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV())
nd.wg.Add(1)
go nd.watch(watchCh, collectionID, replicaID)
go nd.watch(watchCh, collectionID, replicaID, revision)
return nodes, nd.evtCh
}
......@@ -123,7 +125,7 @@ func (nd *etcdShardNodeDetector) cancelClose(cancel func()) {
cancel()
}
func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID int64) {
func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID, revision int64) {
defer nd.wg.Done()
for {
select {
......@@ -132,31 +134,89 @@ func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, repl
return
case evt, ok := <-ch:
if !ok {
log.Warn("event ch closed")
log.Warn("watch channel closed, retry...")
var watchCh clientv3.WatchChan
var ok bool
watchCh, ok, revision = nd.rewatch(collectionID, replicaID, revision)
if !ok {
// detector closed
return
}
nd.wg.Add(1)
go nd.watch(watchCh, collectionID, replicaID, revision)
return
}
if err := evt.Err(); err != nil {
if err == v3rpc.ErrCompacted {
ctx, cancel := context.WithCancel(context.Background())
watchCh := nd.client.Watch(ctx, nd.path, clientv3.WithPrefix())
go nd.cancelClose(cancel)
watchCh, ok, revision := nd.rewatch(collectionID, replicaID, evt.CompactRevision)
if !ok {
// detector closed
return
}
nd.wg.Add(1)
go nd.watch(watchCh, collectionID, replicaID)
go nd.watch(watchCh, collectionID, replicaID, revision)
return
}
log.Error("failed to handle watch node error", zap.Error(err))
panic(err)
}
for _, e := range evt.Events {
switch e.Type {
case mvccpb.PUT:
nd.handlePutEvent(e, collectionID, replicaID)
case mvccpb.DELETE:
nd.handleDelEvent(e, collectionID, replicaID)
revision = evt.Header.GetRevision() + 1
nd.handleEvt(evt, collectionID, replicaID)
}
}
}
func (nd *etcdShardNodeDetector) handleEvt(evt clientv3.WatchResponse, collectionID, replicaID int64) {
for _, e := range evt.Events {
switch e.Type {
case mvccpb.PUT:
nd.handlePutEvent(e, collectionID, replicaID)
case mvccpb.DELETE:
nd.handleDelEvent(e, collectionID, replicaID)
}
}
}
func (nd *etcdShardNodeDetector) rewatch(collectionID, replicaID, rev int64) (ch clientv3.WatchChan, ok bool, revision int64) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
revision = rev
err := retry.Do(ctx, func() error {
ch = nd.client.Watch(ctx, nd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
select {
case <-nd.closeCh:
return retry.Unrecoverable(errors.New("detector closed"))
case evt, ok := <-ch:
if !ok {
return errors.New("rewatch got closed ch")
}
if err := evt.Err(); err != nil {
if err == v3rpc.ErrCompacted {
revision = evt.CompactRevision
return err
}
log.Error("failed to handle watch node error", zap.Error(err))
panic(err)
}
revision = evt.Header.GetRevision() + 1
nd.handleEvt(evt, collectionID, replicaID)
default:
// blocked, fine
}
return nil
})
// check detector closed
if err != nil {
select {
case <-nd.closeCh:
return nil, false, revision
default:
panic(err)
}
}
return ch, true, revision
}
func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID, replicaID int64) {
......
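The detector rewatch above relies on Milvus's internal `retry` helper; the sketch below captures the same control flow with a plain loop instead (placeholder prefix, simplified backoff): keep retrying to open the watch, give up once the detector is closed, and jump to the compact revision when the first response reports `ErrCompacted`.

```go
package main

import (
	"context"
	"log"
	"time"

	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// rewatch keeps trying to open a watch at revision, peeking at the
// first response to validate it; ok is false once closeCh is closed.
func rewatch(cli *clientv3.Client, path string, revision int64, closeCh <-chan struct{}) (ch clientv3.WatchChan, ok bool, rev int64) {
	for {
		select {
		case <-closeCh:
			return nil, false, revision
		default:
		}
		ch = cli.Watch(context.Background(), path, clientv3.WithPrefix(), clientv3.WithRev(revision))
		select {
		case <-closeCh:
			return nil, false, revision
		case resp, chOK := <-ch:
			if !chOK {
				time.Sleep(100 * time.Millisecond) // transient failure, retry
				continue
			}
			if err := resp.Err(); err != nil {
				if err == v3rpc.ErrCompacted {
					revision = resp.CompactRevision // resume from what etcd still has
					continue
				}
				log.Printf("unrecoverable watch error: %v", err)
				return nil, false, revision
			}
			// a real implementation would dispatch resp.Events here
			revision = resp.Header.Revision + 1
		default:
			// no response pending; treat the watch as established
		}
		return ch, true, revision
	}
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	closeCh := make(chan struct{})
	_, ok, next := rewatch(cli, "shard-nodes/", 1, closeCh)
	log.Println("rewatch ok:", ok, "next revision:", next)
}
```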
......@@ -18,11 +18,13 @@ package querynode
import (
"context"
"errors"
"sync"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -103,14 +105,16 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID
}
}
revision := resp.Header.GetRevision() + 1
sd.wg.Add(1)
watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(resp.Header.GetRevision()+1), clientv3.WithPrefix(), clientv3.WithPrevKV())
go sd.watch(watchCh, collectionID, replicaID, vchannelName)
watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithRev(revision), clientv3.WithPrefix(), clientv3.WithPrevKV())
go sd.watch(watchCh, collectionID, replicaID, vchannelName, revision)
return events, sd.evtCh
}
func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string) {
func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string, revision int64) {
defer sd.wg.Done()
for {
select {
......@@ -119,29 +123,84 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
return
case evt, ok := <-ch:
if !ok {
log.Warn("SegmentDetector event channel closed")
log.Warn("SegmentDetector event channel closed, retry...")
watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, revision)
if !ok {
return
}
sd.wg.Add(1)
go sd.watch(watchCh, collectionID, replicaID, vchannel, revision)
return
}
if err := evt.Err(); err != nil {
if err == v3rpc.ErrCompacted {
watchCh, ok := sd.rewatch(collectionID, replicaID, vchannel, evt.CompactRevision)
if !ok {
return
}
sd.wg.Add(1)
watchCh := sd.client.Watch(sd.getCtx(), sd.path, clientv3.WithPrefix())
go sd.watch(watchCh, collectionID, replicaID, vchannel)
go sd.watch(watchCh, collectionID, replicaID, vchannel, revision)
return
}
log.Error("failed to handle watch segment error, panic", zap.Error(err))
panic(err)
}
for _, e := range evt.Events {
switch e.Type {
case mvccpb.PUT:
sd.handlePutEvent(e, collectionID, replicaID, vchannel)
case mvccpb.DELETE:
sd.handleDelEvent(e, collectionID, replicaID, vchannel)
revision = evt.Header.GetRevision() + 1
sd.handleEvt(evt, collectionID, replicaID, vchannel)
}
}
}
func (sd *etcdShardSegmentDetector) rewatch(collectionID int64, replicaID int64, vchannel string, revision int64) (ch clientv3.WatchChan, ok bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := retry.Do(ctx, func() error {
ch = sd.client.Watch(ctx, sd.path, clientv3.WithPrefix(), clientv3.WithRev(revision))
select {
case <-sd.closeCh:
return retry.Unrecoverable(errors.New("detector closed"))
case evt, ok := <-ch:
if !ok {
return errors.New("rewatch got closed ch")
}
if err := evt.Err(); err != nil {
if err == v3rpc.ErrCompacted {
revision = evt.CompactRevision
return err
}
log.Error("failed to handle watch segment error", zap.Error(err))
panic(err)
}
revision = evt.Header.GetRevision() + 1
sd.handleEvt(evt, collectionID, replicaID, vchannel)
default:
// blocked, fine
}
return nil
})
// check detector closed
if err != nil {
select {
case <-sd.closeCh:
return nil, false
default:
panic(err)
}
}
return ch, true
}
func (sd *etcdShardSegmentDetector) handleEvt(evt clientv3.WatchResponse, collectionID int64, replicaID int64, vchannel string) {
for _, e := range evt.Events {
switch e.Type {
case mvccpb.PUT:
sd.handlePutEvent(e, collectionID, replicaID, vchannel)
case mvccpb.DELETE:
sd.handleDelEvent(e, collectionID, replicaID, vchannel)
}
}
}
func (sd *etcdShardSegmentDetector) handlePutEvent(e *clientv3.Event, collectionID int64, replicaID int64, vchannel string) {
......