未验证 提交 e1b3273d 编写于 作者: C congqixia 提交者: GitHub

Add Datanode watch etcd channel (#6965)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
上级 5c383d1d
......@@ -21,16 +21,22 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -49,6 +55,9 @@ const (
// MetricRequestsSuccess used to count the num of successful requests
MetricRequestsSuccess = "success"
// ConnectEtcdMaxRetryTime used to limit the max retry time for connection etcd
ConnectEtcdMaxRetryTime = 1000
)
// DataNode communicates with outside services and union all
......@@ -82,7 +91,8 @@ type DataNode struct {
rootCoord types.RootCoord
dataCoord types.DataCoord
session *sessionutil.Session
session *sessionutil.Session
kvClient *etcdkv.EtcdKV
closer io.Closer
......@@ -138,6 +148,9 @@ func (node *DataNode) Register() error {
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = node.session.ServerID
node.NodeID = node.session.ServerID
// Start watching the channels assigned to this node
go node.StartWatchChannels(node.ctx)
Params.initMsgChannelSubName()
log.Debug("DataNode Init",
......@@ -156,6 +169,69 @@ func (node *DataNode) Init() error {
return nil
}
// StartWatchChannels runs the loop that watches this node's channel allocation
// keys in the kv store (etcd for now) and hands every received event to
// handleChannelEvt in its own goroutine.
//
// The loop returns when ctx is done, when the watch reports Canceled, or when
// the watch channel is closed. If the watch ends while ctx is still alive, the
// datanode can no longer observe channel assignments, so node.Stop() is called
// and the node is expected to be restarted.
func (node *DataNode) StartWatchChannels(ctx context.Context) {
	defer logutil.LogPanic()
	// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
	watchPrefix := fmt.Sprintf("channel/%d", node.NodeID)
	evtChan := node.kvClient.WatchWithPrefix(watchPrefix)
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch etcd loop quit")
			return
		case event, ok := <-evtChan:
			if !ok {
				// A closed channel delivers zero-value responses forever (Canceled is
				// false, Events is empty); without this check the loop would spin.
				if ctx.Err() != nil {
					// select picked the closed channel over ctx.Done(); plain shutdown.
					log.Debug("watch etcd loop quit")
					return
				}
				log.Warn("Watch channel closed unexpectedly")
				node.Stop()
				return
			}
			if event.Canceled { // failed to watch
				log.Warn("Watch channel failed", zap.Error(event.Err()))
				// if watch loop return due to event canceled, the datanode is not functional anymore
				// stop the datanode and wait for restart
				node.Stop()
				return
			}
			for _, evt := range event.Events {
				go node.handleChannelEvt(evt)
			}
		}
	}
}
// handleChannelEvt handles a single event from the kv watch.
//
// PUT: the value is decoded as a datapb.ChannelWatchInfo. Entries already in
// state Complete are ignored; this also covers the event caused by this node's
// own write-back below, which is saved under the watched prefix. Otherwise a
// data sync service is created for the described vchannel and the entry is
// written back with state Complete. If the write-back cannot be produced or
// saved, the freshly created data sync service is released again.
//
// DELETE: the data sync service named by the last "/"-separated segment of the
// key (the channel name) is released.
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
	switch evt.Type {
	case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
		key := string(evt.Kv.Key)
		watchInfo := datapb.ChannelWatchInfo{}
		if err := proto.Unmarshal(evt.Kv.Value, &watchInfo); err != nil {
			log.Warn("fail to parse ChannelWatchInfo", zap.String("key", key), zap.Error(err))
			return
		}
		if watchInfo.State == datapb.ChannelWatchState_Complete {
			return
		}
		if watchInfo.Vchan == nil {
			log.Warn("found ChannelWatchInfo with nil VChannelInfo", zap.String("key", key))
			return
		}
		if err := node.NewDataSyncService(watchInfo.Vchan); err != nil {
			log.Warn("fail to create DataSyncService", zap.String("key", key), zap.Error(err))
			return
		}
		channelName := watchInfo.Vchan.ChannelName
		watchInfo.State = datapb.ChannelWatchState_Complete
		v, err := proto.Marshal(&watchInfo)
		if err != nil {
			log.Warn("fail to marshal ChannelWatchInfo", zap.String("key", key), zap.Error(err))
			node.ReleaseDataSyncService(channelName)
			return
		}
		err = node.kvClient.Save(fmt.Sprintf("channel/%d/%s", node.NodeID, channelName), string(v))
		if err != nil {
			log.Warn("fail to change WatchState to complete", zap.String("key", key), zap.Error(err))
			// Release by channel name, the same identifier the delete branch uses;
			// the full etcd key does not identify a data sync service.
			node.ReleaseDataSyncService(channelName)
			// maybe retry logic and exit logic
		}
	case clientv3.EventTypeDelete:
		// guaranteed there is no "/" in channel name
		parts := strings.Split(string(evt.Kv.Key), "/")
		node.ReleaseDataSyncService(parts[len(parts)-1])
	}
}
// NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService.
func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
node.chanMut.Lock()
......@@ -238,6 +314,23 @@ func (node *DataNode) Start() error {
},
Count: 1,
})
if err != nil {
log.Warn("fail to alloc timestamp", zap.Error(err))
return err
}
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
if err != nil {
return err
}
node.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
return nil
}
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
if err != nil {
return errors.New("DataNode fail to connect etcd")
}
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
return errors.New("DataNode fail to start")
......
......@@ -13,15 +13,21 @@ package datanode
import (
"context"
"fmt"
"math"
"math/rand"
"os"
"strings"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -30,6 +36,7 @@ import (
)
func TestMain(t *testing.M) {
rand.Seed(time.Now().Unix())
Params.InitAlias("datanode-alias-1")
Params.Init()
refreshChannelNames()
......@@ -315,3 +322,70 @@ func TestDataNode(t *testing.T) {
<-node.ctx.Done()
node.Stop()
}
// TestWatchChannel checks the etcd-driven channel watch of the datanode:
// saving an uncompleted ChannelWatchInfo under the node's channel prefix must
// make the node create a sync service for that vchannel, and removing the
// prefix must make the node release it again.
func TestWatchChannel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	node := newIDLEDataNodeMock(ctx)
	// NOTE(review): the results of Init/Start/Register are not checked here;
	// confirm whether all three are expected to succeed with the mock.
	node.Init()
	node.Start()
	node.Register()

	t.Run("test watch channel", func(t *testing.T) {
		client, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
		assert.Nil(t, err)
		if !assert.NotNil(t, client) {
			return
		}
		// Closing the client also ends the watch below, so the watcher
		// goroutine cannot outlive the test.
		defer client.Close()

		kv := etcdkv.NewEtcdKV(client, Params.MetaRootPath)
		ch := fmt.Sprintf("datanode-etcd-test-channel_%d", rand.Int31())
		prefix := fmt.Sprintf("channel/%d", node.NodeID)
		path := fmt.Sprintf("channel/%d/%s", node.NodeID, ch)

		// Open the watch before saving the key so the watcher is not started
		// after the events it is waiting for.
		ec := kv.WatchWithPrefix(prefix)

		// Buffered so the watcher can finish even if the test has given up waiting.
		seen := make(chan struct{}, 1)
		go func() {
			// Two events are expected for the key: the PUT issued by this test and
			// the PUT the datanode writes back with state Complete.
			cnt := 0
			for evt := range ec {
				for _, event := range evt.Events {
					if strings.Contains(string(event.Kv.Key), ch) {
						cnt++
					}
				}
				if cnt >= 2 {
					seen <- struct{}{}
					return
				}
			}
		}()

		vchan := &datapb.VchannelInfo{
			CollectionID:      1,
			ChannelName:       ch,
			UnflushedSegments: []*datapb.SegmentInfo{},
		}
		info := &datapb.ChannelWatchInfo{
			State: datapb.ChannelWatchState_Uncomplete,
			Vchan: vchan,
		}
		val, err := proto.Marshal(info)
		assert.Nil(t, err)
		err = kv.Save(path, string(val))
		assert.Nil(t, err)

		// Bound the wait so a datanode that never completes the watch fails the
		// test instead of hanging it.
		select {
		case <-seen:
		case <-time.After(10 * time.Second):
			t.Fatal("timed out waiting for channel watch events")
		}

		node.chanMut.RLock()
		_, has := node.vchan2SyncService[ch]
		node.chanMut.RUnlock()
		assert.True(t, has)

		err = kv.RemoveWithPrefix(prefix)
		assert.Nil(t, err)
		//TODO there is no way to sync on Release being done, use sleep for now
		time.Sleep(100 * time.Millisecond)

		node.chanMut.RLock()
		_, has = node.vchan2SyncService[ch]
		node.chanMut.RUnlock()
		assert.False(t, has)
	})
}
......@@ -306,3 +306,8 @@ message SegmentFlushCompletedMsg {
SegmentInfo segment = 2;
}
// ChannelWatchInfo describes the watch of one virtual channel: which channel
// is to be watched and how far the watch has progressed.
message ChannelWatchInfo {
// Description of the virtual channel to watch.
VchannelInfo vchan= 1;
// NOTE(review): presumably the timestamp at which the watch was requested;
// its exact meaning is not visible here, confirm against the writer.
int64 startTs = 2;
// Progress of the watch, as a ChannelWatchState value.
ChannelWatchState state = 3;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册