未验证 提交 6513be5c 编写于 作者: C congqixia 提交者: GitHub

Extend watch/release channel logic in DataNode (#15925)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 9ad5c14a
......@@ -340,6 +340,7 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
TimeoutTs: time.Now().Add(maxWatchDuration).UnixNano(),
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
......
......@@ -455,7 +455,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
}
// if a channel is not watched after maxWatchDuration,
// then we reallocate it to another node
if watchInfo.State == datapb.ChannelWatchState_Complete {
if watchInfo.State == datapb.ChannelWatchState_Complete || watchInfo.State == datapb.ChannelWatchState_WatchSuccess {
continue
}
startTime := time.Unix(watchInfo.StartTs, 0)
......
......@@ -293,11 +293,13 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
e = &event{
eventType: putEventType,
version: evt.Kv.Version,
}
case clientv3.EventTypeDelete:
e = &event{
eventType: deleteEventType,
version: evt.Kv.Version,
}
}
node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
......@@ -314,6 +316,11 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
return
}
if isEndWatchState(watchInfo.State) {
log.Warn("DataNode received a PUT event with a end State", zap.String("state", watchInfo.State.String()))
return
}
e.info = watchInfo
e.vChanName = watchInfo.GetVchan().GetChannelName()
......@@ -322,12 +329,9 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
e.vChanName = parseDeleteEventKey(key)
}
actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, &channelEventManager{
eventChan: make(chan event, 10),
closeChan: make(chan struct{}),
handlePutEvent: node.handlePutEvent,
handleDeleteEvent: node.handleDeleteEvent,
})
actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, newChannelEventManager(
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
))
if !loaded {
actualManager.(*channelEventManager).Run()
......@@ -350,10 +354,6 @@ func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
}
if watchInfo.State == datapb.ChannelWatchState_Complete {
return nil, fmt.Errorf("invalid event: event state is already ChannelWatchState_Compele")
}
if watchInfo.Vchan == nil {
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
}
......@@ -367,28 +367,51 @@ func parseDeleteEventKey(key string) string {
return vChanName
}
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error {
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
vChanName := watchInfo.GetVchan().GetChannelName()
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil {
return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
}
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
defer func() {
if err != nil {
node.releaseFlowgraph(vChanName)
}
}()
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
case datapb.ChannelWatchState_ToRelease:
success := true
func() {
defer func() {
if x := recover(); x != nil {
log.Error("release flowgraph panic", zap.Any("recovered", x))
success = false
}
}()
node.releaseFlowgraph(vChanName)
}()
if !success {
watchInfo.State = datapb.ChannelWatchState_ReleaseFailure
} else {
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
}
}
watchInfo.State = datapb.ChannelWatchState_Complete
v, err := proto.Marshal(watchInfo)
if err != nil {
return fmt.Errorf("fail to marshal watchInfo with complete state, vChanName: %s, err: %v", vChanName, err)
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
}
k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName)
log.Info("handle put event: try to save completed state", zap.String("key", k))
log.Info("handle put event: try to save result state", zap.String("key", k), zap.String("state", watchInfo.State.String()))
err = node.watchKv.Save(k, string(v))
// TODO DataNode unable to save into etcd, may need to panic
err = node.watchKv.CompareVersionAndSwap(k, version, string(v))
if err != nil {
node.releaseFlowgraph(vChanName)
return fmt.Errorf("fail to update completed state to etcd, vChanName: %s, err: %v", vChanName, err)
return fmt.Errorf("fail to update watch state to etcd, vChanName: %s, state: %s, err: %w", vChanName, watchInfo.State.String(), err)
}
return nil
}
......
......@@ -396,8 +396,9 @@ func TestWatchChannel(t *testing.T) {
UnflushedSegments: []*datapb.SegmentInfo{},
}
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_Uncomplete,
State: datapb.ChannelWatchState_ToWatch,
Vchan: vchan,
TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
}
val, err := proto.Marshal(info)
assert.Nil(t, err)
......@@ -418,6 +419,66 @@ func TestWatchChannel(t *testing.T) {
assert.False(t, exist)
})
// Subtest: writes a ToRelease ChannelWatchInfo for a fresh channel under this
// node's watch prefix and asserts that no flowgraph exists for that channel,
// both after the watch events were observed and after the prefix is removed.
t.Run("Test release channel", func(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
// Seed the prefix with a one-byte value under a separate key
// (presumably not a decodable ChannelWatchInfo — TODO confirm intent).
oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
path := fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, oldInvalidCh)
err = kv.Save(path, string([]byte{23}))
assert.NoError(t, err)
// Random suffix keeps this run's channel name distinct from the other subtests.
ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
path = fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, ch)
// c is unbuffered: the first send signals "watch started", the second "two events seen".
c := make(chan struct{})
go func() {
ec := kv.WatchWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID))
c <- struct{}{}
// Count only events whose key contains this subtest's channel name.
cnt := 0
for {
evt := <-ec
for _, event := range evt.Events {
if strings.Contains(string(event.Kv.Key), ch) {
cnt++
}
}
// NOTE(review): two events are presumably the Save below plus the DataNode's
// state write-back — confirm against handlePutEvent.
if cnt >= 2 {
break
}
}
c <- struct{}{}
}()
// wait for the checking goroutine to start its watch
<-c
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: ch,
UnflushedSegments: []*datapb.SegmentInfo{},
}
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_ToRelease,
Vchan: vchan,
TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
}
val, err := proto.Marshal(info)
assert.Nil(t, err)
err = kv.Save(path, string(val))
assert.Nil(t, err)
// wait until the checking goroutine has received 2 events for this channel
<-c
exist := node.flowgraphManager.exist(ch)
assert.False(t, exist)
// Remove every key under this node's watch prefix.
err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID))
assert.Nil(t, err)
// TODO: there is no way to synchronize on the release being done; use a sleep for now.
time.Sleep(100 * time.Millisecond)
exist = node.flowgraphManager.exist(ch)
assert.False(t, exist)
})
t.Run("handle watch info failed", func(t *testing.T) {
e := &event{
eventType: putEventType,
......@@ -439,20 +500,81 @@ func TestWatchChannel(t *testing.T) {
exist = node.flowgraphManager.exist("test2")
assert.False(t, exist)
chPut := make(chan struct{}, 1)
chDel := make(chan struct{}, 1)
ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
m := newChannelEventManager(
func(info *datapb.ChannelWatchInfo, version int64) error {
r := node.handlePutEvent(info, version)
chPut <- struct{}{}
return r
},
func(vChan string) {
node.handleDeleteEvent(vChan)
chDel <- struct{}{}
}, time.Millisecond*100,
)
node.eventManagerMap.Store(ch, m)
m.Run()
info = datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
Vchan: &datapb.VchannelInfo{ChannelName: ch},
State: datapb.ChannelWatchState_Uncomplete,
TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
}
bs, err = proto.Marshal(&info)
assert.NoError(t, err)
msFactory := node.msFactory
defer func() { node.msFactory = msFactory }()
node.msFactory = &FailMessageStreamFactory{
node.msFactory,
}
node.handleWatchInfo(e, "test3", bs)
exist = node.flowgraphManager.exist("test3")
node.handleWatchInfo(e, ch, bs)
<-chPut
exist = node.flowgraphManager.exist(ch)
assert.False(t, exist)
})
// Subtest: sends a put event carrying an arbitrary version (10000, intended to
// be out of date) through an injected event manager and asserts that no
// flowgraph is left for the channel once the put handler has returned.
t.Run("handle watchinfo out of date", func(t *testing.T) {
	chPut := make(chan struct{}, 1)
	chDel := make(chan struct{}, 1)
	// inject eventManager so the test can observe when the put handler returns
	ch := fmt.Sprintf("datanode-etcd-test-by-dev-rootcoord-dml-channel_%d", rand.Int31())
	m := newChannelEventManager(
		func(info *datapb.ChannelWatchInfo, version int64) error {
			r := node.handlePutEvent(info, version)
			chPut <- struct{}{}
			return r
		},
		func(vChan string) {
			node.handleDeleteEvent(vChan)
			chDel <- struct{}{}
		}, time.Millisecond*100,
	)
	node.eventManagerMap.Store(ch, m)
	m.Run()
	e := &event{
		eventType: putEventType,
		version:   10000,
	}

	info := datapb.ChannelWatchInfo{
		Vchan:     &datapb.VchannelInfo{ChannelName: ch},
		State:     datapb.ChannelWatchState_Uncomplete,
		TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
	}
	bs, err := proto.Marshal(&info)
	assert.NoError(t, err)

	node.handleWatchInfo(e, ch, bs)
	// chPut is signalled only after handlePutEvent has returned.
	<-chPut
	// Check the channel that was actually handled. The previous assertion
	// looked up the unrelated name "test3" and therefore could never fail.
	exist := node.flowgraphManager.exist(ch)
	assert.False(t, exist)
})
}
func TestDataNode_GetComponentStates(t *testing.T) {
......
......@@ -17,6 +17,7 @@
package datanode
import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
......@@ -30,14 +31,17 @@ const retryWatchInterval = 20 * time.Second
// event is one channel-watch notification handed to a channelEventManager.
type event struct {
// eventType is putEventType or deleteEventType.
eventType int
// vChanName is the name of the virtual channel the event refers to.
vChanName string
// version is the KV version taken from the etcd event; it is forwarded to the put handler.
version int64
// info is the parsed ChannelWatchInfo payload consumed by the put handler.
info *datapb.ChannelWatchInfo
}
type channelEventManager struct {
sync.Once
eventChan chan event
closeChan chan struct{}
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo) error // node.handlePutEvent
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo, version int64) error // node.handlePutEvent
handleDeleteEvent func(vChanName string) // node.handleDeleteEvent
retryInterval time.Duration
}
const (
......@@ -45,6 +49,17 @@ const (
deleteEventType = 2
)
// newChannelEventManager builds a channelEventManager wired to the given
// put/delete handlers.
//
// handlePut is invoked for put events with the watch info and event version,
// handleDel is invoked for delete events with the channel name, and
// retryInterval is stored as the manager's retry interval. The event queue is
// buffered with capacity 10; the close channel is unbuffered.
func newChannelEventManager(
	handlePut func(*datapb.ChannelWatchInfo, int64) error,
	handleDel func(string),
	retryInterval time.Duration,
) *channelEventManager {
	mgr := new(channelEventManager)
	mgr.eventChan = make(chan event, 10)
	mgr.closeChan = make(chan struct{})
	mgr.handlePutEvent = handlePut
	mgr.handleDeleteEvent = handleDel
	mgr.retryInterval = retryInterval
	return mgr
}
func (e *channelEventManager) Run() {
go func() {
for {
......@@ -52,29 +67,69 @@ func (e *channelEventManager) Run() {
case event := <-e.eventChan:
switch event.eventType {
case putEventType:
e.retryHandlePutEvent(event)
case deleteEventType:
e.handleDeleteEvent(event.vChanName)
}
case <-e.closeChan:
return
}
}
}()
}
func (e *channelEventManager) retryHandlePutEvent(event event) {
countdown := time.Until(time.Unix(0, event.info.TimeoutTs))
if countdown < 0 {
log.Warn("event already timed out", zap.String("vChanName", event.vChanName))
return
}
// Trigger retry for-loop when fail to handle put event for the first time
if err := e.handlePutEvent(event.info); err != nil {
if err := e.handlePutEvent(event.info, event.version); err != nil {
timer := time.NewTimer(countdown)
defer timer.Stop()
ticker := time.NewTicker(e.retryInterval)
defer ticker.Stop()
for {
log.Warn("handle put event fail, starting retry",
zap.String("vChanName", event.vChanName),
zap.String("retry interval", retryWatchInterval.String()),
zap.String("retry interval", e.retryInterval.String()),
zap.Error(err))
<-time.NewTimer(retryWatchInterval).C
// reset the ticker
ticker.Reset(e.retryInterval)
select {
case e, ok := <-e.eventChan:
case <-ticker.C:
// ticker notify, do another retry
case <-timer.C:
// timeout
log.Warn("event process timed out", zap.String("vChanName", event.vChanName))
return
case evt, ok := <-e.eventChan:
if !ok {
log.Warn("event channel closed", zap.String("vChanName", event.vChanName))
return
}
// When got another put event, overwrite current event
if evt.eventType == putEventType {
// handles only Uncomplete, ToWatch and ToRelease
if isEndWatchState(evt.info.State) {
return
}
event = evt
}
// When getting a delete event at next retry, exit retry loop
// When getting a put event, just continue the retry
if ok && e.eventType == deleteEventType {
if evt.eventType == deleteEventType {
log.Warn("delete event triggerred, terminating retry.",
zap.String("vChanName", event.vChanName))
e.handleDeleteEvent(evt.vChanName)
return
}
default:
}
err = e.handlePutEvent(event.info)
err = e.handlePutEvent(event.info, event.version)
if err == nil {
log.Info("retry to handle put event successfully",
zap.String("vChanName", event.vChanName))
......@@ -82,14 +137,6 @@ func (e *channelEventManager) Run() {
}
}
}
case deleteEventType:
e.handleDeleteEvent(event.vChanName)
}
case <-e.closeChan:
return
}
}
}()
}
func (e *channelEventManager) handleEvent(event event) {
......@@ -97,5 +144,13 @@ func (e *channelEventManager) handleEvent(event event) {
}
// Close closes the manager's closeChan. The embedded sync.Once guards the
// close, so calling Close repeatedly does not close the channel twice.
func (e *channelEventManager) Close() {
	shutdown := func() { close(e.closeChan) }
	e.Once.Do(shutdown)
}
// isEndWatchState reports whether state is anything other than one of the
// three states that start work: ToWatch, ToRelease, or the legacy Uncomplete
// (treated the same as ToWatch).
func isEndWatchState(state datapb.ChannelWatchState) bool {
	switch state {
	case datapb.ChannelWatchState_ToWatch, // start watch
		datapb.ChannelWatchState_ToRelease,  // start release
		datapb.ChannelWatchState_Uncomplete: // legacy state, equal to ToWatch
		return false
	default:
		return true
	}
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"errors"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
// TestChannelEventManager exercises channelEventManager: direct handling of put
// events, the timeout check, the retry loop, and the ways a retry can be ended
// (timeout, closed event channel, delete event, overwriting put event, or a put
// event that already carries an end state).
//
// Handler callbacks run on goroutines other than the test goroutine, so they
// report failures with t.Error rather than t.FailNow, which the testing package
// only allows on the goroutine running the test. Managers started with Run are
// closed when their subtest ends so the loop goroutine does not outlive it.
func TestChannelEventManager(t *testing.T) {
	t.Run("normal case", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		ran := false
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			ran = true
			ch <- struct{}{}
			return nil
		}, func(name string) {}, time.Millisecond*10)

		em.Run()
		defer em.Close()
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		// The receive orders the handler's write to ran before the read below.
		<-ch
		assert.True(t, ran)
	})

	t.Run("event already timeout", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		ran := false
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			ran = true
			ch <- struct{}{}
			return nil
		}, func(name string) {}, time.Millisecond*10)

		em.Run()
		defer em.Close()
		// TimeoutTs lies in the past, so the put handler must never be invoked.
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				TimeoutTs: time.Now().Add(-time.Minute).UnixNano(),
			},
		})
		select {
		case <-ch:
			t.FailNow()
		case <-time.NewTimer(time.Millisecond * 100).C:
		}
		assert.False(t, ran)
	})

	t.Run("retry success", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		ran := false
		counter := atomic.Int32{}
		counter.Store(0)
		// Fail on the first call and succeed on the second one.
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			current := counter.Add(1)
			if current == 2 {
				ran = true
				ch <- struct{}{}
				return nil
			}

			return errors.New("mocked error")
		}, func(name string) {}, time.Millisecond*10)

		em.Run()
		defer em.Close()
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		<-ch
		assert.True(t, ran)
	})

	t.Run("retry until timeout", func(t *testing.T) {
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			return errors.New("mocked error")
		}, func(name string) {}, time.Millisecond*100)

		ch := make(chan struct{}, 1)

		go func() {
			// The deadline (50ms) is shorter than the retry interval (100ms).
			ddl := time.Now().Add(time.Millisecond * 50)
			evt := event{
				eventType: putEventType,
				vChanName: "",
				version:   0,
				info: &datapb.ChannelWatchInfo{
					TimeoutTs: ddl.UnixNano(),
				},
			}
			em.retryHandlePutEvent(evt)
			ch <- struct{}{}
		}()

		select {
		case <-ch:
		case <-time.NewTimer(time.Second).C:
			t.FailNow()
		}
	})

	t.Run("close behavior", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			return errors.New("mocked error")
		}, func(name string) {}, time.Millisecond*10)

		go func() {
			ddl := time.Now().Add(time.Minute)
			evt := event{
				eventType: putEventType,
				vChanName: "",
				version:   0,
				info: &datapb.ChannelWatchInfo{
					TimeoutTs: ddl.UnixNano(),
				},
			}
			em.retryHandlePutEvent(evt)
			ch <- struct{}{}
		}()

		// Closing the event channel must make the retry loop return.
		close(em.eventChan)
		select {
		case <-ch:
		case <-time.NewTimer(time.Second).C:
			t.FailNow()
		}

		// Close must be safe to call more than once.
		assert.NotPanics(t, func() {
			em.Close()
			em.Close()
		})
	})

	t.Run("cancel by delete event", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		ran := false
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			return errors.New("mocked error")
		}, func(name string) {
			ran = true
			ch <- struct{}{}
		}, time.Millisecond*10)
		em.Run()
		defer em.Close()
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		em.handleEvent(event{
			eventType: deleteEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		<-ch
		assert.True(t, ran)
	})

	t.Run("overwrite put event", func(t *testing.T) {
		ch := make(chan struct{}, 1)
		ran := false
		// Only the second event (version 1) is accepted by the put handler.
		em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
			if version > 0 {
				ran = true
				ch <- struct{}{}
				return nil
			}
			return errors.New("mocked error")
		}, func(name string) {
			// Runs on the manager goroutine: t.FailNow is not allowed here.
			t.Error("delete handler must not be called for put events")
		}, time.Millisecond*10)
		em.Run()
		defer em.Close()
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   0,
			info: &datapb.ChannelWatchInfo{
				State:     datapb.ChannelWatchState_ToWatch,
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		em.handleEvent(event{
			eventType: putEventType,
			vChanName: "",
			version:   1,
			info: &datapb.ChannelWatchInfo{
				State:     datapb.ChannelWatchState_ToWatch,
				TimeoutTs: time.Now().Add(time.Minute).UnixNano(),
			},
		})
		<-ch
		assert.True(t, ran)
	})

	t.Run("canceled by EndStates", func(t *testing.T) {
		endStates := []datapb.ChannelWatchState{
			datapb.ChannelWatchState_Complete,
			datapb.ChannelWatchState_WatchSuccess,
			datapb.ChannelWatchState_WatchFailure,
			datapb.ChannelWatchState_ReleaseSuccess,
			datapb.ChannelWatchState_ReleaseFailure,
		}

		for _, es := range endStates {
			em := newChannelEventManager(func(info *datapb.ChannelWatchInfo, version int64) error {
				return errors.New("mocked error")
			}, func(name string) {
				// Runs on the retry goroutine: t.FailNow is not allowed here.
				t.Error("delete handler must not be called for end-state put events")
			}, time.Millisecond*100)

			ch := make(chan struct{}, 1)
			ddl := time.Now().Add(time.Minute)

			go func() {
				evt := event{
					eventType: putEventType,
					vChanName: "",
					version:   0,
					info: &datapb.ChannelWatchInfo{
						TimeoutTs: ddl.UnixNano(),
					},
				}
				em.retryHandlePutEvent(evt)
				ch <- struct{}{}
			}()

			// A put event that already carries an end state must stop the retry loop.
			em.eventChan <- event{
				eventType: putEventType,
				vChanName: "",
				version:   0,
				info: &datapb.ChannelWatchInfo{
					State:     es,
					TimeoutTs: ddl.UnixNano(),
				},
			}
			select {
			case <-ch:
			case <-time.NewTimer(time.Minute).C:
				t.FailNow()
			}
		}
	})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册