Unverified commit dfc6670f authored by xige-16, committed by GitHub

Unsubscribe channel after query node down (#15230)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
Parent 7b48dc7b
...@@ -308,6 +308,16 @@ message CollectionInfo {
int64 inMemory_percentage = 7;
}
message UnsubscribeChannels {
int64 collectionID = 1;
repeated string channels = 2;
}
message UnsubscribeChannelInfo {
int64 nodeID = 1;
repeated UnsubscribeChannels collection_channels = 2;
}
//---- synchronize messages proto between QueryCoord and QueryNode -----
message SegmentChangeInfo {
int64 online_nodeID = 1;
......
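For reference, a minimal Go sketch of how the generated types for the two new messages above might be populated; the field names mirror the generated querypb package used later in this change, while the nodeID, collectionID, and channel name are illustrative assumptions only:

// hypothetical example: node 3 watched one dml channel of collection 100
info := &querypb.UnsubscribeChannelInfo{
	NodeID: 3,
	CollectionChannels: []*querypb.UnsubscribeChannels{
		{
			CollectionID: 100,
			Channels:     []string{"by-dev-rootcoord-dml_0"},
		},
	},
}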
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"container/list"
"context"
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
)
const (
unsubscribeChannelInfoPrefix = "queryCoord-unsubscribeChannelInfo"
unsubscribeChannelCheckInterval = time.Second
)
type channelUnsubscribeHandler struct {
ctx context.Context
cancel context.CancelFunc
kvClient *etcdkv.EtcdKV
factory msgstream.Factory
channelInfos *list.List
downNodeChan chan int64
wg sync.WaitGroup
}
// newChannelUnsubscribeHandler creates a new handler service to unsubscribe channels
func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factory msgstream.Factory) (*channelUnsubscribeHandler, error) {
childCtx, cancel := context.WithCancel(ctx)
handler := &channelUnsubscribeHandler{
ctx: childCtx,
cancel: cancel,
kvClient: kv,
factory: factory,
channelInfos: list.New(),
//TODO:: if the query nodes that are down exceed 1024, query coord will not be able to restart
downNodeChan: make(chan int64, 1024),
}
err := handler.reloadFromKV()
if err != nil {
return nil, err
}
return handler, nil
}
// reloadFromKV reloads unhandled channel-unsubscription info from kv
func (csh *channelUnsubscribeHandler) reloadFromKV() error {
log.Debug("start reload unsubscribe channelInfo from kv")
_, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix)
if err != nil {
return err
}
for _, value := range channelInfoValues {
channelInfo := &querypb.UnsubscribeChannelInfo{}
err = proto.Unmarshal([]byte(value), channelInfo)
if err != nil {
return err
}
csh.channelInfos.PushBack(channelInfo)
csh.downNodeChan <- channelInfo.NodeID
}
return nil
}
// addUnsubscribeChannelInfo adds channel info to the handler service and persists it to etcd
func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.UnsubscribeChannelInfo) {
nodeID := info.NodeID
channelInfoValue, err := proto.Marshal(info)
if err != nil {
panic(err)
}
// when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same
hasEnqueue := false
for e := csh.channelInfos.Back(); e != nil; e = e.Prev() {
if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID {
hasEnqueue = true
}
}
if !hasEnqueue {
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err = csh.kvClient.Save(channelInfoKey, string(channelInfoValue))
if err != nil {
panic(err)
}
csh.channelInfos.PushBack(info)
csh.downNodeChan <- info.NodeID
log.Debug("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
}
}
// handleChannelUnsubscribeLoop handles the unsubscription of channels that a query node has watched
func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
defer csh.wg.Done()
for {
select {
case <-csh.ctx.Done():
log.Debug("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
return
case <-csh.downNodeChan:
channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo)
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
panic(err)
}
}
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err := csh.kvClient.Remove(channelInfoKey)
if err != nil {
log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
panic(err)
}
log.Debug("unsubscribe channels success", zap.Int64("nodeID", nodeID))
}
}
}
func (csh *channelUnsubscribeHandler) start() {
csh.wg.Add(1)
go csh.handleChannelUnsubscribeLoop()
}
func (csh *channelUnsubscribeHandler) close() {
csh.cancel()
csh.wg.Wait()
}
// unsubscribeChannels creates a consumer first, then unsubscribes the channels by closing the msgStream
func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) error {
msgStream, err := factory.NewMsgStream(ctx)
if err != nil {
return err
}
msgStream.AsConsumer(channels, subName)
msgStream.Close()
return nil
}
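A hedged usage sketch of this helper, assuming it is called from inside the querycoord package where ctx, factory, Params, log, and zap are already in scope; the collectionID, nodeID, and channel name below are made-up values for illustration:

// reproduce the subscription name the downed node used, then drop the subscription
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, 100, 3)
if err := unsubscribeChannels(ctx, factory, subName, []string{"by-dev-rootcoord-dml_0"}); err != nil {
	log.Warn("unsubscribe channels failed", zap.String("subName", subName), zap.Error(err))
}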
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"context"
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/etcd"
)
func Test_HandlerReloadFromKV(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID)
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
}
channelInfoBytes, err := proto.Marshal(unsubscribeChannelInfo)
assert.Nil(t, err)
err = kv.Save(channelInfoKey, string(channelInfoBytes))
assert.Nil(t, err)
factory := msgstream.NewPmsFactory()
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
assert.Equal(t, 1, len(handler.downNodeChan))
cancel()
}
func Test_AddUnsubscribeChannelInfo(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
factory := msgstream.NewPmsFactory()
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
collectionChannels := &querypb.UnsubscribeChannels{
CollectionID: defaultCollectionID,
Channels: []string{"test-channel"},
}
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
}
handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
frontValue := handler.channelInfos.Front()
assert.NotNil(t, frontValue)
assert.Equal(t, defaultQueryNodeID, frontValue.Value.(*querypb.UnsubscribeChannelInfo).NodeID)
// repeated nodeID of a node that has already gone down
handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
assert.Equal(t, 1, len(handler.downNodeChan))
cancel()
}
func Test_HandleChannelUnsubscribeLoop(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
factory.SetParams(m)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
collectionChannels := &querypb.UnsubscribeChannels{
CollectionID: defaultCollectionID,
Channels: []string{"test-channel"},
}
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
}
handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID)
_, err = kv.Load(channelInfoKey)
assert.Nil(t, err)
handler.start()
for {
_, err = kv.Load(channelInfoKey)
if err != nil {
break
}
}
cancel()
}
...@@ -103,6 +103,7 @@ type queryNodeCluster struct {
sync.RWMutex
clusterMeta Meta
handler *channelUnsubscribeHandler
nodes map[int64]Node
newNodeFn newQueryNodeFn
segmentAllocator SegmentAllocatePolicy
...@@ -110,7 +111,7 @@ type queryNodeCluster struct {
segSizeEstimator func(request *querypb.LoadSegmentsRequest, dataKV kv.DataKV) (int64, error)
}
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (Cluster, error) {
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session, handler *channelUnsubscribeHandler) (Cluster, error) {
childCtx, cancel := context.WithCancel(ctx)
nodes := make(map[int64]Node)
c := &queryNodeCluster{
...@@ -119,6 +120,7 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
client: kv,
session: session,
clusterMeta: clusterMeta,
handler: handler,
nodes: nodes,
newNodeFn: newNodeFn,
segmentAllocator: defaultSegAllocatePolicy(),
...@@ -546,6 +548,26 @@ func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetri
return ret
}
// setNodeState updates the queryNode state, which may be offline, disconnect, or online
// when queryCoord restarts, it calls setNodeState via the registerNode function
// when a new queryNode starts, queryCoord calls setNodeState via the registerNode function
// when a queryNode goes down, queryCoord calls setNodeState via the stopNode function
func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState) {
// if a query node goes down, all channels the node has watched should be unsubscribed
// otherwise the Pulsar subscriptions may accumulate too much backlog
if state == offline {
// 1. find all the search/dmChannel/deltaChannel the node has watched
unsubscribeChannelInfo := c.clusterMeta.getWatchedChannelsByNodeID(nodeID)
// 2. add the channels to the handler; the handler will unsubscribe them automatically
if len(unsubscribeChannelInfo.CollectionChannels) != 0 {
c.handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
}
}
node.setState(state)
}
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
c.Lock()
defer c.Unlock()
...@@ -566,7 +588,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
log.Debug("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err))
return err
}
node.setState(state)
c.setNodeState(id, node, state)
if state < online {
go node.start()
}
...@@ -614,6 +636,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) {
if node, ok := c.nodes[nodeID]; ok {
node.stop()
c.setNodeState(nodeID, node, offline)
log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}
......
...@@ -424,12 +424,21 @@ func TestReloadClusterFromKV(t *testing.T) {
t.Run("Test LoadOfflineNodes", func(t *testing.T) {
refreshParams()
ctx, cancel := context.WithCancel(context.Background())
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli)
clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false)
clusterSession.Register()
factory := msgstream.NewPmsFactory()
handler, err := newChannelUnsubscribeHandler(ctx, kv, factory)
assert.Nil(t, err)
meta, err := newMeta(ctx, kv, factory, nil)
assert.Nil(t, err)
cluster := &queryNodeCluster{
client: kv,
handler: handler,
clusterMeta: meta,
nodes: make(map[int64]Node),
newNodeFn: newQueryNodeTest,
session: clusterSession,
...@@ -454,6 +463,7 @@ func TestReloadClusterFromKV(t *testing.T) {
err = removeAllSession()
assert.Nil(t, err)
cancel()
})
}
...@@ -480,11 +490,15 @@ func TestGrpcRequest(t *testing.T) {
meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
cluster := &queryNodeCluster{
ctx: baseCtx,
cancel: cancel,
client: kv,
clusterMeta: meta,
handler: handler,
nodes: make(map[int64]Node),
newNodeFn: newQueryNodeTest,
session: clusterSession,
...@@ -686,3 +700,71 @@ func TestEstimateSegmentSize(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(2048), size)
}
func TestSetNodeState(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli)
clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false)
clusterSession.Register()
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = factory.SetParams(m)
assert.Nil(t, err)
idAllocator := func() (UniqueID, error) {
return 0, nil
}
meta, err := newMeta(baseCtx, kv, factory, idAllocator)
assert.Nil(t, err)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
cluster := &queryNodeCluster{
ctx: baseCtx,
cancel: cancel,
client: kv,
clusterMeta: meta,
handler: handler,
nodes: make(map[int64]Node),
newNodeFn: newQueryNodeTest,
session: clusterSession,
segSizeEstimator: segSizeEstimateForTest,
}
node, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
err = cluster.registerNode(baseCtx, node.session, node.queryNodeID, disConnect)
assert.Nil(t, err)
waitQueryNodeOnline(cluster, node.queryNodeID)
dmChannelWatchInfo := &querypb.DmChannelWatchInfo{
CollectionID: defaultCollectionID,
DmChannel: "test-dmChannel",
NodeIDLoaded: node.queryNodeID,
}
err = meta.setDmChannelInfos([]*querypb.DmChannelWatchInfo{dmChannelWatchInfo})
assert.Nil(t, err)
deltaChannelInfo := &datapb.VchannelInfo{
CollectionID: defaultCollectionID,
ChannelName: "test-deltaChannel",
}
err = meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{deltaChannelInfo})
assert.Nil(t, err)
nodeInfo, err := cluster.getNodeInfoByID(node.queryNodeID)
assert.Nil(t, err)
cluster.setNodeState(node.queryNodeID, nodeInfo, offline)
assert.Equal(t, 1, len(handler.downNodeChan))
node.stop()
removeAllSession()
cancel()
}
...@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util"
)
...@@ -86,6 +87,8 @@ type Meta interface {
saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error)
removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error)
sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error)
getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo
}
// MetaReplica records the current load information on all querynodes
...@@ -995,6 +998,75 @@ func (m *MetaReplica) setLoadPercentage(collectionID UniqueID, partitionID Uniqu
return nil
}
func (m *MetaReplica) getWatchedChannelsByNodeID(nodeID int64) *querypb.UnsubscribeChannelInfo {
// 1. find all the search/dmChannel/deltaChannel the node has watched
colID2DmChannels := make(map[UniqueID][]string)
colID2DeltaChannels := make(map[UniqueID][]string)
colID2QueryChannel := make(map[UniqueID]string)
dmChannelInfos := m.getDmChannelInfosByNodeID(nodeID)
// get dmChannel/search channel the node has watched
for _, channelInfo := range dmChannelInfos {
collectionID := channelInfo.CollectionID
dmChannel := rootcoord.ToPhysicalChannel(channelInfo.DmChannel)
if _, ok := colID2DmChannels[collectionID]; !ok {
colID2DmChannels[collectionID] = []string{}
}
colID2DmChannels[collectionID] = append(colID2DmChannels[collectionID], dmChannel)
if _, ok := colID2QueryChannel[collectionID]; !ok {
queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
colID2QueryChannel[collectionID] = queryChannelInfo.QueryChannel
}
}
segmentInfos := m.getSegmentInfosByNode(nodeID)
// get delta/search channel the node has watched
for _, segmentInfo := range segmentInfos {
collectionID := segmentInfo.CollectionID
if _, ok := colID2DeltaChannels[collectionID]; !ok {
deltaChanelInfos, err := m.getDeltaChannelsByCollectionID(collectionID)
if err != nil {
// all nodes succeeded in releasing the data, but queryCoord had not yet cleaned up the meta when the node went down,
// and the meta was cleaned up after m.getSegmentInfosByNode(nodeID)
continue
}
deltaChannels := make([]string, len(deltaChanelInfos))
for offset, channelInfo := range deltaChanelInfos {
deltaChannels[offset] = rootcoord.ToPhysicalChannel(channelInfo.ChannelName)
}
colID2DeltaChannels[collectionID] = deltaChannels
}
if _, ok := colID2QueryChannel[collectionID]; !ok {
queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
colID2QueryChannel[collectionID] = queryChannelInfo.QueryChannel
}
}
// creating unsubscribeChannelInfo, which will be written to etcd
colID2Channels := make(map[UniqueID][]string)
for collectionID, channels := range colID2DmChannels {
colID2Channels[collectionID] = append(colID2Channels[collectionID], channels...)
}
for collectionID, channels := range colID2DeltaChannels {
colID2Channels[collectionID] = append(colID2Channels[collectionID], channels...)
}
for collectionID, channel := range colID2QueryChannel {
colID2Channels[collectionID] = append(colID2Channels[collectionID], channel)
}
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: nodeID,
}
for collectionID, channels := range colID2Channels {
unsubscribeChannelInfo.CollectionChannels = append(unsubscribeChannelInfo.CollectionChannels,
&querypb.UnsubscribeChannels{
CollectionID: collectionID,
Channels: channels,
})
}
return unsubscribeChannelInfo
}
//func (m *MetaReplica) printMeta() {
// m.RLock()
// defer m.RUnlock()
......
...@@ -78,6 +78,7 @@ type QueryCoord struct {
queryCoordID uint64
meta Meta
cluster Cluster
handler *channelUnsubscribeHandler
newNodeFn newQueryNodeFn
scheduler *TaskScheduler
idAllocator func() (UniqueID, error)
...@@ -161,8 +162,15 @@ func (qc *QueryCoord) Init() error {
return
}
// init channelUnsubscribeHandler
qc.handler, initError = newChannelUnsubscribeHandler(qc.loopCtx, qc.kvClient, qc.msFactory)
if initError != nil {
log.Error("query coordinator init channelUnsubscribeHandler failed", zap.Error(initError))
return
}
// init cluster
qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session)
qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session, qc.handler)
if initError != nil {
log.Error("query coordinator init cluster failed", zap.Error(initError))
return
...@@ -204,6 +212,9 @@ func (qc *QueryCoord) Start() error {
qc.indexChecker.start()
log.Debug("start index checker ...")
qc.handler.start()
log.Debug("start channel unsubscribe loop ...")
Params.QueryCoordCfg.CreatedTime = time.Now()
Params.QueryCoordCfg.UpdatedTime = time.Now()
...@@ -237,6 +248,11 @@ func (qc *QueryCoord) Stop() error {
log.Debug("close index checker ...")
}
if qc.handler != nil {
qc.handler.close()
log.Debug("close channel unsubscribe loop ...")
}
if qc.loopCancel != nil {
qc.loopCancel()
log.Info("cancel the loop of QueryCoord")
......
...@@ -132,9 +132,9 @@ func (qn *queryNode) start() error {
}
func (qn *queryNode) stop() {
qn.stateLock.Lock()
defer qn.stateLock.Unlock()
qn.state = offline
//qn.stateLock.Lock()
//defer qn.stateLock.Unlock()
//qn.state = offline
if qn.client != nil {
qn.client.Stop()
}
......
...@@ -24,6 +24,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
minioKV "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
...@@ -39,13 +40,17 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli)
clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false)
meta, err := newMeta(baseCtx, kv, nil, nil)
factory := msgstream.NewPmsFactory()
meta, err := newMeta(baseCtx, kv, factory, nil)
assert.Nil(t, err)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
cluster := &queryNodeCluster{
ctx: baseCtx,
cancel: cancel,
client: kv,
clusterMeta: meta,
handler: handler,
nodes: make(map[int64]Node),
newNodeFn: newQueryNodeTest,
session: clusterSession,
......
...@@ -538,6 +538,19 @@ func (rct *releaseCollectionTask) timestamp() Timestamp {
return rct.Base.Timestamp
}
func (rct *releaseCollectionTask) updateTaskProcess() {
collectionID := rct.CollectionID
parentTask := rct.getParentTask()
if parentTask == nil {
// all queryNodes have successfully released the data, clean up collectionMeta
err := rct.meta.releaseCollection(collectionID)
if err != nil {
log.Error("releaseCollectionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rct.Base.MsgID), zap.Error(err))
panic(err)
}
}
}
func (rct *releaseCollectionTask) preExecute(context.Context) error {
collectionID := rct.CollectionID
rct.setResultInfo(nil)
...@@ -548,7 +561,8 @@ func (rct *releaseCollectionTask) preExecute(context.Context) error {
}
func (rct *releaseCollectionTask) execute(ctx context.Context) error {
defer rct.reduceRetryCount()
// cancel the maximum number of retries for queryNode cleaning data until the data is completely freed
// defer rct.reduceRetryCount()
collectionID := rct.CollectionID
// if nodeID ==0, it means that the release request has not been assigned to the specified query node
...@@ -593,20 +607,12 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
rct.addChildTask(releaseCollectionTask)
log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask))
}
//TODO::xige-16 delete collection info from should wait all internal release task execute done
// if some query nodes release collection failed, the collection data will can't be removed from query node
err = rct.meta.releaseCollection(collectionID)
if err != nil {
log.Error("releaseCollectionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rct.Base.MsgID), zap.Error(err))
rct.setResultInfo(err)
return err
}
} else {
err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
if err != nil {
log.Error("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID))
log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID))
rct.setResultInfo(err)
// after release failed, the task will always redo
// if the query node happens to be down, the node release was judged to have succeeded
return err
}
}
...@@ -911,6 +917,21 @@ func (rpt *releasePartitionTask) timestamp() Timestamp {
return rpt.Base.Timestamp
}
func (rpt *releasePartitionTask) updateTaskProcess() {
collectionID := rpt.CollectionID
partitionIDs := rpt.PartitionIDs
parentTask := rpt.getParentTask()
if parentTask == nil {
// all queryNodes have successfully released the data, clean up collectionMeta
err := rpt.meta.releasePartitions(collectionID, partitionIDs)
if err != nil {
log.Error("releasePartitionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID), zap.Error(err))
panic(err)
}
}
}
func (rpt *releasePartitionTask) preExecute(context.Context) error {
collectionID := rpt.CollectionID
partitionIDs := rpt.PartitionIDs
...@@ -923,7 +944,8 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error {
}
func (rpt *releasePartitionTask) execute(ctx context.Context) error {
defer rpt.reduceRetryCount()
// cancel the maximum number of retries for queryNode cleaning data until the data is completely freed
// defer rpt.reduceRetryCount()
collectionID := rpt.CollectionID
partitionIDs := rpt.PartitionIDs
...@@ -944,20 +966,12 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
rpt.addChildTask(releasePartitionTask)
log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID))
}
//TODO::xige-16 delete partition info from meta should wait all internal release task execute done
// if some query nodes release partitions failed, the partition data will can't be removed from query node
err := rpt.meta.releasePartitions(collectionID, partitionIDs)
if err != nil {
log.Error("releasePartitionTask: release collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID), zap.Error(err))
rpt.setResultInfo(err)
return err
}
} else {
err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
if err != nil {
log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.Int64("collectionID", collectionID), zap.String("nodeID", fmt.Sprintln(rpt.NodeID)))
rpt.setResultInfo(err)
// after release failed, the task will always redo
// if the query node happens to be down, the node release was judged to have succeeded
return err
}
}
...@@ -1288,6 +1302,15 @@ func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) {
return proto.Marshal(wdt.WatchDeltaChannelsRequest)
}
func (wdt *watchDeltaChannelTask) isValid() bool {
online, err := wdt.cluster.isOnline(wdt.NodeID)
if err != nil {
return false
}
return wdt.ctx != nil && online
}
func (wdt *watchDeltaChannelTask) msgType() commonpb.MsgType {
return wdt.Base.MsgType
}
......
...@@ -520,7 +520,14 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) {
err = queryCoord.scheduler.Enqueue(releaseCollectionTask)
assert.Nil(t, err)
waitTaskFinalState(releaseCollectionTask, taskFailed)
for {
if releaseCollectionTask.getState() == taskDone {
break
}
}
node.releaseCollection = returnSuccessResult
waitTaskFinalState(releaseCollectionTask, taskExpired)
node.stop()
queryCoord.Stop()
......
...@@ -43,7 +43,6 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
// ---------- unittest util functions ----------
...@@ -1296,10 +1295,7 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
return nil, err
}
node.etcdCli = etcdCli
session := &sessionutil.Session{
ServerID: 1,
}
node.session = session
node.initSession()
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
node.etcdKV = etcdKV
......
...@@ -22,7 +22,6 @@ import (
"fmt"
"math/rand"
"runtime/debug"
"strconv"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -32,6 +31,7 @@ import ( ...@@ -32,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/mqclient" "github.com/milvus-io/milvus/internal/util/mqclient"
) )
...@@ -158,7 +158,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
return err
}
consumeChannels := []string{r.req.QueryChannel}
consumeSubName := Params.QueryNodeCfg.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
if Params.QueryNodeCfg.SkipQueryChannelRecovery {
log.Debug("Skip query channel seek back ", zap.Strings("channels", consumeChannels),
...@@ -310,12 +310,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID)) log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID))
// get subscription name consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
getUniqueSubName := func() string {
prefixName := Params.QueryNodeCfg.MsgChannelSubName
return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
}
consumeSubName := getUniqueSubName()
// group channels by to seeking or consuming // group channels by to seeking or consuming
toSeekChannels := make([]*internalpb.MsgPosition, 0) toSeekChannels := make([]*internalpb.MsgPosition, 0)
...@@ -574,12 +569,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
sCol.addVDeltaChannels(vDeltaChannels)
sCol.addPDeltaChannels(pDeltaChannels)
// get subscription name
getUniqueSubName := func() string {
prefixName := Params.QueryNodeCfg.MsgChannelSubName
return prefixName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
}
consumeSubName := getUniqueSubName()
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
toSubChannels := make([]Channel, 0)
......
...@@ -175,6 +175,11 @@ func CheckCtxValid(ctx context.Context) bool {
return ctx.Err() != context.DeadlineExceeded && ctx.Err() != context.Canceled
}
// GenChannelSubName generates the subscription name used to watch a channel
func GenChannelSubName(prefix string, collectionID int64, nodeID int64) string {
return fmt.Sprintf("%s-%d-%d", prefix, collectionID, nodeID)
}
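As an illustration of the resulting format, a short sketch assuming the default prefix asserted in the updated param test below; the collectionID and nodeID values are arbitrary:

// "by-dev-queryNode" + collectionID 100 + nodeID 3
subName := GenChannelSubName("by-dev-queryNode", 100, 3)
// subName is now "by-dev-queryNode-100-3"
_ = subName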
// CheckPortAvailable check if a port is available to be listened on
func CheckPortAvailable(port int) bool {
addr := ":" + strconv.Itoa(port)
......
...@@ -1028,7 +1028,7 @@ func (p *queryNodeConfig) initMsgChannelSubName() {
log.Warn(err.Error())
}
s := []string{p.ClusterChannelPrefix, namePrefix, strconv.FormatInt(p.QueryNodeID, 10)}
s := []string{p.ClusterChannelPrefix, namePrefix}
p.MsgChannelSubName = strings.Join(s, "-")
}
......
...@@ -230,7 +230,7 @@ func TestGlobalParamTable(t *testing.T) {
Params.QueryNodeID = 3
Params.initMsgChannelSubName()
name := Params.MsgChannelSubName
assert.Equal(t, name, "by-dev-queryNode-3")
assert.Equal(t, name, "by-dev-queryNode")
name = Params.StatsChannelName
assert.Equal(t, name, "by-dev-query-node-stats")
......