Commit 81ac2014 authored by sunby, committed by zhenshan.cao

Add persistence of the DataNode cluster (#5387)

We save the channels registered on each DataNode in etcd and restore the cluster info after restarting.
Signed-off-by: sunby <bingyi.sun@zilliz.com>
Parent 3662b3f0
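A minimal sketch of the persistence round-trip added here (hypothetical test, not part of this commit: it would need to live in package dataservice because it touches unexported types, and it uses the in-memory KV from the existing tests as a stand-in for the etcd-backed TxnKV):

package dataservice

import (
	"testing"

	memkv "github.com/milvus-io/milvus/internal/kv/mem"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/stretchr/testify/assert"
)

func TestClusterInfoSurvivesRestart(t *testing.T) {
	kv := memkv.NewMemoryKV() // stand-in for the etcd-backed TxnKV
	m, err := newClusterNodeManager(kv)
	assert.Nil(t, err)

	node := &datapb.DataNodeInfo{
		Address: "localhost:8080",
		Version: 1,
		Channels: []*datapb.ChannelStatus{
			{Name: "ch1", State: datapb.ChannelWatchState_Uncomplete},
		},
	}
	m.register(node)                                      // track the node in memory
	err = m.updateDataNodes([]*datapb.DataNodeInfo{node}) // persist it under clusterPrefix
	assert.Nil(t, err)

	// A fresh manager on the same KV simulates a DataService restart:
	// loadFromKv restores the node, initially marked offline.
	restored, err := newClusterNodeManager(kv)
	assert.Nil(t, err)
	nodes := restored.getDataNodes(false)
	assert.EqualValues(t, "ch1", nodes["localhost:8080"].Channels[0].Name)
	assert.EqualValues(t, offline, restored.dataNodes["localhost:8080"].status)
}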
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
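// clusterPrefix is the key prefix under which DataNode cluster info is persisted in the TxnKV store (etcd in deployment).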
const clusterPrefix = "cluster-prefix/"
type dataNodeStatus int8
const (
online dataNodeStatus = iota
offline
)
type dataNodeInfo struct {
info *datapb.DataNodeInfo
status dataNodeStatus
}
type clusterNodeManager struct {
kv kv.TxnKV
dataNodes map[string]*dataNodeInfo
}
func newClusterNodeManager(kv kv.TxnKV) (*clusterNodeManager, error) {
c := &clusterNodeManager{
kv: kv,
dataNodes: make(map[string]*dataNodeInfo),
}
return c, c.loadFromKv()
}
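// loadFromKv restores the DataNodeInfo entries persisted under clusterPrefix.
// Restored nodes start out as offline until they re-register.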
func (c *clusterNodeManager) loadFromKv() error {
_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
if err != nil {
return err
}
for _, v := range values {
info := &datapb.DataNodeInfo{}
if err := proto.UnmarshalText(v, info); err != nil {
return err
}
node := &dataNodeInfo{
info: info,
status: offline,
}
c.dataNodes[info.Address] = node
}
return nil
}
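// updateCluster reconciles the incoming dataNodes with the tracked cluster and
// returns the delta: addresses seen for the first time, addresses whose version
// changed (restarts), and addresses that remain offline.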
func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *clusterDeltaChange {
newNodes := make([]string, 0)
offlines := make([]string, 0)
restarts := make([]string, 0)
for _, n := range dataNodes {
node, ok := c.dataNodes[n.Address]
if ok {
node.status = online
if node.info.Version != n.Version {
restarts = append(restarts, n.Address)
}
continue
}
newNode := &dataNodeInfo{
info: &datapb.DataNodeInfo{
Address: n.Address,
Version: n.Version,
Channels: []*datapb.ChannelStatus{},
},
status: online,
}
c.dataNodes[n.Address] = newNode
newNodes = append(newNodes, n.Address)
}
for nAddr, node := range c.dataNodes {
if node.status == offline {
offlines = append(offlines, nAddr)
}
}
return &clusterDeltaChange{
newNodes: newNodes,
offlines: offlines,
restarts: restarts,
}
}
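// updateDataNodes refreshes the in-memory info of the given nodes and persists them via txnSaveNodes.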
func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo) error {
for _, node := range dataNodes {
c.dataNodes[node.Address].info = node
}
return c.txnSaveNodes(dataNodes)
}
func (c *clusterNodeManager) getDataNodes(onlyOnline bool) map[string]*datapb.DataNodeInfo {
ret := make(map[string]*datapb.DataNodeInfo)
for k, v := range c.dataNodes {
if !onlyOnline || v.status == online {
ret[k] = proto.Clone(v.info).(*datapb.DataNodeInfo)
}
}
return ret
}
func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) {
node, ok := c.dataNodes[n.Address]
if ok {
node.status = online
node.info.Version = n.Version
} else {
c.dataNodes[n.Address] = &dataNodeInfo{
info: n,
status: online,
}
}
}
func (c *clusterNodeManager) unregister(n *datapb.DataNodeInfo) {
node, ok := c.dataNodes[n.Address]
if !ok {
return
}
node.status = offline
}
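// txnSaveNodes persists the given nodes under clusterPrefix in a single MultiSave transaction; an empty slice is a no-op.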
func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo) error {
if len(nodes) == 0 {
return nil
}
data := make(map[string]string)
for _, n := range nodes {
c.dataNodes[n.Address].info = n
key := clusterPrefix + n.Address
value := proto.MarshalTextString(n)
data[key] = value
}
return c.kv.MultiSave(data)
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"sync"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
"github.com/milvus-io/milvus/internal/types"
)
const retryTimes = 2
type sessionManager interface {
sendRequest(addr string, executor func(node types.DataNode) error) error
}
type clusterSessionManager struct {
mu sync.RWMutex
sessions map[string]types.DataNode
}
func newClusterSessionManager() *clusterSessionManager {
return &clusterSessionManager{sessions: make(map[string]types.DataNode)}
}
func (m *clusterSessionManager) createSession(addr string) error {
cli := grpcdatanodeclient.NewClient(addr)
if err := cli.Init(); err != nil {
return err
}
if err := cli.Start(); err != nil {
return err
}
m.sessions[addr] = cli
return nil
}
func (m *clusterSessionManager) getSession(addr string) types.DataNode {
return m.sessions[addr]
}
func (m *clusterSessionManager) hasSession(addr string) bool {
_, ok := m.sessions[addr]
return ok
}
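// sendRequest runs executor against the DataNode at addr, creating the client session on demand and retrying up to retryTimes times on failure.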
func (m *clusterSessionManager) sendRequest(addr string, executor func(node types.DataNode) error) error {
m.mu.Lock()
defer m.mu.Unlock()
success := false
var err error
for i := 0; !success && i < retryTimes; i++ {
if i != 0 || !m.hasSession(addr) {
m.createSession(addr)
}
err = executor(m.getSession(addr))
if err == nil {
return nil
}
}
return err
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"golang.org/x/net/context"
)
type cluster struct {
mu sync.RWMutex
dataManager *clusterNodeManager
sessionManager sessionManager
startupPolicy clusterStartupPolicy
registerPolicy dataNodeRegisterPolicy
unregisterPolicy dataNodeUnregisterPolicy
assginPolicy channelAssignPolicy
}
type clusterOption struct {
apply func(c *cluster)
}
func withStartupPolicy(p clusterStartupPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.startupPolicy = p },
}
}
func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.registerPolicy = p },
}
}
func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.unregisterPolicy = p },
}
}
func withAssignPolicy(p channelAssignPolicy) clusterOption {
return clusterOption{
apply: func(c *cluster) { c.assginPolicy = p },
}
}
func defaultStartupPolicy() clusterStartupPolicy {
return newReWatchOnRestartsStartupPolicy()
}
func defaultRegisterPolicy() dataNodeRegisterPolicy {
return newDoNothingRegisterPolicy()
}
func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
return newDoNothingUnregisterPolicy()
}
func defaultAssignPolicy() channelAssignPolicy {
return newAllAssignPolicy()
}
func newCluster(dataManager *clusterNodeManager, sessionManager sessionManager, opts ...clusterOption) *cluster {
c := &cluster{
dataManager: dataManager,
sessionManager: sessionManager,
}
c.startupPolicy = defaultStartupPolicy()
c.registerPolicy = defaultRegisterPolicy()
c.unregisterPolicy = defaultUnregisterPolicy()
c.assginPolicy = defaultAssignPolicy()
for _, opt := range opts {
opt.apply(c)
}
return c
}
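// startup reconciles the persisted cluster view with the dataNodes alive at boot time,
// lets the startup policy decide which channels need re-watching, and then issues the watch requests.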
func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
deltaChange := c.dataManager.updateCluster(dataNodes)
nodes := c.dataManager.getDataNodes(false)
rets := c.startupPolicy.apply(nodes, deltaChange)
c.dataManager.updateDataNodes(rets)
rets = c.watch(rets)
c.dataManager.updateDataNodes(rets)
return nil
}
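// watch sends a WatchDmChannelsRequest covering each node's Uncomplete channels and
// marks those channels Complete once the node accepts the request.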
func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo {
for _, n := range nodes {
uncompletes := make([]string, 0)
for _, ch := range n.Channels {
if ch.State == datapb.ChannelWatchState_Uncomplete {
uncompletes = append(uncompletes, ch.Name)
}
}
executor := func(cli types.DataNode) error {
req := &datapb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
SourceID: Params.NodeID,
},
ChannelNames: uncompletes,
}
resp, err := cli.WatchDmChannels(context.Background(), req)
if err != nil {
return err
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
return nil
}
for _, ch := range n.Channels {
if ch.State == datapb.ChannelWatchState_Uncomplete {
ch.State = datapb.ChannelWatchState_Complete
}
}
return nil
}
if err := c.sessionManager.sendRequest(n.Address, executor); err != nil {
log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
}
}
return nodes
}
func (c *cluster) register(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.dataManager.register(n)
cNodes := c.dataManager.getDataNodes(true)
rets := c.registerPolicy.apply(cNodes, n)
c.dataManager.updateDataNodes(rets)
rets = c.watch(rets)
c.dataManager.updateDataNodes(rets)
}
func (c *cluster) unregister(n *datapb.DataNodeInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.dataManager.unregister(n)
cNodes := c.dataManager.getDataNodes(true)
rets := c.unregisterPolicy.apply(cNodes, n)
c.dataManager.updateDataNodes(rets)
rets = c.watch(rets)
c.dataManager.updateDataNodes(rets)
}
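// watchIfNeeded lets the assign policy pick the DataNodes that should watch channel and triggers the corresponding watch requests.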
func (c *cluster) watchIfNeeded(channel string) {
c.mu.Lock()
defer c.mu.Unlock()
cNodes := c.dataManager.getDataNodes(true)
rets := c.assginPolicy.apply(cNodes, channel)
c.dataManager.updateDataNodes(rets)
rets = c.watch(rets)
c.dataManager.updateDataNodes(rets)
}
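// flush groups segments by insert channel and collection, then asks the DataNode
// currently serving each channel to flush the matching segments.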
func (c *cluster) flush(segments []*datapb.SegmentInfo) {
c.mu.Lock()
defer c.mu.Unlock()
m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs
for _, seg := range segments {
if _, ok := m[seg.InsertChannel]; !ok {
m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
}
m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
}
dataNodes := c.dataManager.getDataNodes(true)
channel2Node := make(map[string]string)
for _, node := range dataNodes {
for _, chstatus := range node.Channels {
channel2Node[chstatus.Name] = node.Address
}
}
for ch, coll2seg := range m {
node, ok := channel2Node[ch]
if !ok {
continue
}
for coll, segs := range coll2seg {
executor := func(cli types.DataNode) error {
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
SourceID: Params.NodeID,
},
CollectionID: coll,
SegmentIDs: segs,
}
resp, err := cli.FlushSegments(context.Background(), req)
if err != nil {
return err
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("flush segment error", zap.String("dataNode", node), zap.Error(err))
}
return nil
}
if err := c.sessionManager.sendRequest(node, executor); err != nil {
log.Warn("flush segment error", zap.String("dataNode", node), zap.Error(err))
}
}
}
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"testing"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
func TestClusterCreate(t *testing.T) {
cPolicy := newMockStartupPolicy()
cluster := createCluster(t, withStartupPolicy(cPolicy))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
dataNodes := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
}
func TestRegister(t *testing.T) {
cPolicy := newMockStartupPolicy()
registerPolicy := newDoNothingRegisterPolicy()
cluster := createCluster(t, withStartupPolicy(cPolicy), withRegisterPolicy(registerPolicy))
addr := "localhost:8080"
err := cluster.startup(nil)
assert.Nil(t, err)
cluster.register(&datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
})
dataNodes := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
}
func TestUnregister(t *testing.T) {
cPolicy := newMockStartupPolicy()
unregisterPolicy := newDoNothingUnregisterPolicy()
cluster := createCluster(t, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
dataNodes := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
cluster.unregister(&datapb.DataNodeInfo{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
})
dataNodes = cluster.dataManager.getDataNodes(false)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, offline, cluster.dataManager.dataNodes[addr].status)
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
}
func TestWatchIfNeeded(t *testing.T) {
cPolicy := newMockStartupPolicy()
cluster := createCluster(t, withStartupPolicy(cPolicy))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
dataNodes := cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
chName := "ch1"
cluster.watchIfNeeded(chName)
dataNodes = cluster.dataManager.getDataNodes(true)
assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
cluster.watchIfNeeded(chName)
assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
}
func TestFlushSegments(t *testing.T) {
cPolicy := newMockStartupPolicy()
cluster := createCluster(t, withStartupPolicy(cPolicy))
addr := "localhost:8080"
nodes := []*datapb.DataNodeInfo{
{
Address: addr,
Version: 1,
Channels: []*datapb.ChannelStatus{},
},
}
err := cluster.startup(nodes)
assert.Nil(t, err)
segments := []*datapb.SegmentInfo{
{
ID: 0,
CollectionID: 0,
InsertChannel: "ch1",
},
}
cluster.flush(segments)
}
func createCluster(t *testing.T, options ...clusterOption) *cluster {
kv := memkv.NewMemoryKV()
sessionManager := newMockSessionManager()
dataManager, err := newClusterNodeManager(kv)
assert.Nil(t, err)
return newCluster(dataManager, sessionManager, options...)
}
@@ -16,6 +16,7 @@ import (
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -287,3 +288,25 @@ func (m *mockMasterService) GetDdChannel(ctx context.Context) (*milvuspb.StringR
func (m *mockMasterService) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
type mockStartupPolicy struct {
}
func newMockStartupPolicy() clusterStartupPolicy {
return &mockStartupPolicy{}
}
func (p *mockStartupPolicy) apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
return nil
}
type mockSessionManager struct {
}
func newMockSessionManager() sessionManager {
return &mockSessionManager{}
}
func (m *mockSessionManager) sendRequest(addr string, executor func(node types.DataNode) error) error {
return nil
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"fmt"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
type clusterDeltaChange struct {
newNodes []string
offlines []string
restarts []string
}
type clusterStartupPolicy interface {
apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo
}
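// reWatchOnRestartsStartupPolicy resets every channel of a restarted DataNode to Uncomplete so its watches are re-issued on startup.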
type reWatchOnRestartsStartupPolicy struct {
}
func newReWatchOnRestartsStartupPolicy() clusterStartupPolicy {
return &reWatchOnRestartsStartupPolicy{}
}
func (p *reWatchOnRestartsStartupPolicy) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, addr := range delta.restarts {
node := cluster[addr]
for _, ch := range node.Channels {
ch.State = datapb.ChannelWatchState_Uncomplete
}
ret = append(ret, node)
}
return ret
}
type dataNodeRegisterPolicy interface {
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
type doNothingRegisterPolicy struct {
}
func newDoNothingRegisterPolicy() dataNodeRegisterPolicy {
return &doNothingRegisterPolicy{}
}
func (p *doNothingRegisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
return []*datapb.DataNodeInfo{session}
}
type dataNodeUnregisterPolicy interface {
apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
}
type doNothingUnregisterPolicy struct {
}
func newDoNothingUnregisterPolicy() dataNodeUnregisterPolicy {
return &doNothingUnregisterPolicy{}
}
func (p *doNothingUnregisterPolicy) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
return nil
}
type channelAssignPolicy interface {
apply(cluster map[string]*datapb.DataNodeInfo, channel string) []*datapb.DataNodeInfo
}
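// allAssignPolicy assigns the channel, in Uncomplete state, to every DataNode that does not already watch it.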
type allAssignPolicy struct {
}
func newAllAssignPolicy() channelAssignPolicy {
return &allAssignPolicy{}
}
func (p *allAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, node := range cluster {
fmt.Printf("xxxxnode: %v\n", node.Address)
has := false
for _, ch := range node.Channels {
if ch.Name == channel {
has = true
break
}
}
if has {
continue
}
node.Channels = append(node.Channels, &datapb.ChannelStatus{
Name: channel,
State: datapb.ChannelWatchState_Uncomplete,
})
fmt.Printf("channelxxxx: %v\n", node.Channels)
ret = append(ret, node)
}
return ret
}
@@ -323,6 +323,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
log.Debug("DataService AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeStats)
log.Debug("load last pos of stats channel", zap.Any("pos", pos))
if err == nil {
err = statsStream.Seek([]*internalpb.MsgPosition{pos})
if err != nil {
......
@@ -18,6 +18,7 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -28,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
func TestRegisterNode(t *testing.T) {
@@ -796,6 +798,7 @@ func TestResumeChannel(t *testing.T) {
defer svr.meta.RUnlock()
for _, segID := range segmentIDs {
seg, has := svr.meta.segments[segID]
log.Debug("check segment in meta", zap.Any("id", seg.ID), zap.Any("has", has))
assert.True(t, has)
if has {
assert.Equal(t, segRows, seg.NumRows)
......
@@ -276,6 +276,22 @@ message FieldBinlog{
repeated string binlogs = 2;
}
enum ChannelWatchState {
Uncomplete = 0;
Complete = 1;
}
message ChannelStatus {
string name = 1;
ChannelWatchState state=2;
}
message DataNodeInfo {
string address = 1;
int64 version = 2;
repeated ChannelStatus channels = 3;
}
message GetRecoveryInfoResponse {
common.MsgBase base = 1;
repeated VchannelInfo channels = 2;
......