未验证 提交 641afae2 编写于 作者: J Jiquan Long 提交者: GitHub

Watch channels with start positions (#18744)

Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
上级 9589bd38
......@@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/logutil"
......@@ -60,8 +61,9 @@ type ChannelManager struct {
}
type channel struct {
Name string
CollectionID UniqueID
Name string
CollectionID UniqueID
StartPositions []*commonpb.KeyDataPair
}
// ChannelManagerOpt is to set optional parameters in channel manager.
......@@ -434,7 +436,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
// fillChannelWatchInfo updates the channel op by filling in channel watch info.
func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
vcInfo := c.h.GetVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: time.Now().Unix(),
......@@ -451,7 +453,7 @@ func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state data
startTs := time.Now().Unix()
timeoutTs := time.Now().Add(maxWatchDuration).UnixNano()
for _, ch := range op.Channels {
vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
vcInfo := c.h.GetVChanPositions(ch, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: startTs,
......
......@@ -116,7 +116,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
......@@ -143,7 +143,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure)
......@@ -171,7 +171,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
chManager.Watch(&channel{Name: channel1, CollectionID: collectionID})
// simulating timeout behavior of startOne, cuz 20s is a long wait
e := &ackEvent{
......@@ -210,7 +210,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{Name: channel1, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
......@@ -252,7 +252,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{Name: channel1, CollectionID: collectionID},
}},
},
}
......@@ -295,7 +295,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{Name: channel1, CollectionID: collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
......@@ -340,7 +340,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{Name: channel1, CollectionID: collectionID},
}},
},
}
......@@ -385,8 +385,8 @@ func TestChannelManager(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{channel2, collectionID},
{Name: channel1, CollectionID: collectionID},
{Name: channel2, CollectionID: collectionID},
}},
},
}
......@@ -399,7 +399,7 @@ func TestChannelManager(t *testing.T) {
assert.False(t, chManager.Match(nodeToAdd, channel1))
assert.False(t, chManager.Match(nodeToAdd, channel2))
err = chManager.Watch(&channel{"channel-3", collectionID})
err = chManager.Watch(&channel{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
assert.True(t, chManager.Match(nodeToAdd, "channel-3"))
......@@ -423,8 +423,8 @@ func TestChannelManager(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
bufferID: {bufferID, []*channel{
{channel1, collectionID},
{channel2, collectionID},
{Name: channel1, CollectionID: collectionID},
{Name: channel2, CollectionID: collectionID},
}},
},
}
......@@ -441,7 +441,7 @@ func TestChannelManager(t *testing.T) {
assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2))
err = chManager.Watch(&channel{"channel-3", collectionID})
err = chManager.Watch(&channel{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
......@@ -460,13 +460,13 @@ func TestChannelManager(t *testing.T) {
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
err = chManager.Watch(&channel{bufferCh, collectionID})
err = chManager.Watch(&channel{Name: bufferCh, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
chManager.store.Add(nodeID)
err = chManager.Watch(&channel{chanToAdd, collectionID})
err = chManager.Watch(&channel{Name: chanToAdd, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID)
......@@ -486,7 +486,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
},
}
......@@ -518,7 +518,7 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
ops := getOpsWithWatchInfo(test.nodeID, &channel{Name: test.chName, CollectionID: collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
......@@ -562,8 +562,8 @@ func TestChannelManager(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{"channel-1", collectionID},
{"channel-2", collectionID}}},
{Name: "channel-1", CollectionID: collectionID},
{Name: "channel-2", CollectionID: collectionID}}},
bufferID: {bufferID, []*channel{}},
},
}
......@@ -598,7 +598,7 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
ops := getOpsWithWatchInfo(test.nodeID, &channel{Name: test.chName, CollectionID: collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
......@@ -652,7 +652,7 @@ func TestChannelManager(t *testing.T) {
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
},
}
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
......@@ -683,7 +683,7 @@ func TestChannelManager(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ops := getReleaseOp(nodeID, &channel{channelName, collectionID})
ops := getReleaseOp(nodeID, &channel{Name: channelName, CollectionID: collectionID})
for _, op := range ops {
chs := chManager.fillChannelWatchInfoWithState(op, test.inState)
assert.Equal(t, 1, len(chs))
......@@ -708,7 +708,7 @@ func TestChannelManager(t *testing.T) {
require.NoError(t, err)
chManager.store.Add(nodeID)
opSet := getReleaseOp(nodeID, &channel{channelName, collectionID})
opSet := getReleaseOp(nodeID, &channel{Name: channelName, CollectionID: collectionID})
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
chManager.stateTimer.removeTimers([]string{channelName})
......@@ -783,7 +783,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}}},
}
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure, collectionID, channelName))
......@@ -805,7 +805,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}}},
}
require.NoError(t, err)
......@@ -831,7 +831,7 @@ func TestChannelManager_Reload(t *testing.T) {
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
nodeID: {nodeID, []*channel{{Name: channelName, CollectionID: collectionID}}},
999: {999, []*channel{}},
},
}
......@@ -863,8 +863,8 @@ func TestChannelManager_Reload(t *testing.T) {
cm.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{{"channel1", 1}}},
2: {2, []*channel{{"channel2", 1}}},
1: {1, []*channel{{Name: "channel1", CollectionID: 1}}},
2: {2, []*channel{{Name: "channel2", CollectionID: 1}}},
},
}
......@@ -918,9 +918,9 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{"channel-1", collectionID},
{"channel-2", collectionID},
{"channel-3", collectionID}}}},
{Name: "channel-1", CollectionID: collectionID},
{Name: "channel-2", CollectionID: collectionID},
{Name: "channel-3", CollectionID: collectionID}}}},
}
var (
......@@ -962,7 +962,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(2, "channel-1"))
chManager.AddNode(3)
chManager.Watch(&channel{"channel-4", collectionID})
chManager.Watch(&channel{Name: "channel-4", CollectionID: collectionID})
key = path.Join(prefix, "3", "channel-4")
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
......@@ -1022,7 +1022,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
1: {
NodeID: 1,
Channels: []*channel{
{"ch1", 1},
{Name: "ch1", CollectionID: 1},
},
},
},
......
......@@ -83,7 +83,7 @@ func genNodeChannelInfos(id int64, num int) *NodeChannelInfo {
channels := make([]*channel, 0, num)
for i := 0; i < num; i++ {
name := fmt.Sprintf("ch%d", i)
channels = append(channels, &channel{name, 1})
channels = append(channels, &channel{Name: name, CollectionID: 1})
}
return &NodeChannelInfo{
NodeID: id,
......@@ -97,7 +97,7 @@ func genChannelOperations(from, to int64, num int) ChannelOpSet {
channelWatchInfos := make([]*datapb.ChannelWatchInfo, 0, num)
for i := 0; i < num; i++ {
name := fmt.Sprintf("ch%d", i)
channels = append(channels, &channel{name, 1})
channels = append(channels, &channel{Name: name, CollectionID: 1})
channelWatchInfos = append(channelWatchInfos, &datapb.ChannelWatchInfo{})
}
......
......@@ -100,7 +100,7 @@ func TestClusterCreate(t *testing.T) {
assert.Nil(t, err)
channels := channelManager.GetChannels()
assert.EqualValues(t, []*NodeChannelInfo{{1, []*channel{{"channel1", 1}}}}, channels)
assert.EqualValues(t, []*NodeChannelInfo{{1, []*channel{{Name: "channel1", CollectionID: 1}}}}, channels)
})
t.Run("remove all nodes and restart with other nodes", func(t *testing.T) {
......
......@@ -30,7 +30,7 @@ import (
// Handler handles some channel method for ChannelManager
type Handler interface {
// GetVChanPositions gets the information recovery needed of a channel
GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(channel string) bool
FinishDropChannel(channel string)
}
......@@ -46,13 +46,13 @@ func newServerHandler(s *Server) *ServerHandler {
}
// GetVChanPositions gets vchannel latest postitions with provided dml channel names
func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel
return s.InsertChannel == channel.Name
})
log.Info("GetSegmentsByChannel",
zap.Any("collectionID", collectionID),
zap.Any("collectionID", channel.CollectionID),
zap.Any("channel", channel),
zap.Any("numOfSegments", len(segments)),
)
......@@ -90,15 +90,20 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
}
// use collection start position when segment position is not found
if seekPosition == nil {
collection := h.GetCollection(h.s.ctx, collectionID)
if collection != nil {
seekPosition = getCollectionStartPosition(channel, collection)
if channel.StartPositions == nil {
collection := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil {
seekPosition = getCollectionStartPosition(channel.Name, collection)
}
} else {
// use passed start positions, skip to ask rootcoord.
seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
}
}
return &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel,
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
SeekPosition: seekPosition,
FlushedSegmentIds: flushedIds,
UnflushedSegmentIds: unflushedIds,
......@@ -107,7 +112,11 @@ func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID,
}
func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
for _, sp := range collectionInfo.GetStartPositions() {
return toMsgPosition(channel, collectionInfo.GetStartPositions())
}
func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
for _, sp := range startPositions {
if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
continue
}
......
......@@ -719,10 +719,10 @@ func newMockHandler() *mockHandler {
return &mockHandler{}
}
func (h *mockHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
func (h *mockHandler) GetVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel,
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
}
}
......
......@@ -41,7 +41,7 @@ func fillEmptyPosition(operations ChannelOpSet) {
func TestBufferChannelAssignPolicy(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{{"chan1", 1}}
channels := []*channel{{Name: "chan1", CollectionID: 1}}
store := &ChannelStore{
store: kv,
channelsInfo: map[int64]*NodeChannelInfo{bufferID: {bufferID, channels}},
......@@ -60,8 +60,8 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
t.Run("first register", func(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{
{"chan1", 1},
{"chan2", 2},
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 2},
}
store := &ChannelStore{
store: kv,
......@@ -82,8 +82,8 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
kv := memkv.NewMemoryKV()
channels := []*channel{
{"chan1", 1},
{"chan2", 2},
{Name: "chan1", CollectionID: 1},
{Name: "chan2", CollectionID: 2},
}
store := &ChannelStore{
......@@ -121,9 +121,9 @@ func TestAverageAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{},
},
[]*channel{{"chan1", 1}},
[]*channel{{Name: "chan1", CollectionID: 1}},
},
[]*ChannelOp{{Add, bufferID, []*channel{{"chan1", 1}}, nil}},
[]*ChannelOp{{Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
},
{
"test watch same channel",
......@@ -131,10 +131,10 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
},
},
[]*channel{{"chan1", 1}},
[]*channel{{Name: "chan1", CollectionID: 1}},
},
nil,
},
......@@ -144,13 +144,13 @@ func TestAverageAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}, {"chan2", 1}}},
2: {2, []*channel{{"chan3", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan3", CollectionID: 1}}},
},
},
[]*channel{{"chan4", 1}},
[]*channel{{Name: "chan4", CollectionID: 1}},
},
[]*ChannelOp{{Add, 2, []*channel{{"chan4", 1}}, nil}},
[]*ChannelOp{{Add, 2, []*channel{{Name: "chan4", CollectionID: 1}}, nil}},
},
}
for _, tt := range tests {
......@@ -180,9 +180,9 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{},
},
[]*channel{{"chan1", 1}},
[]*channel{{Name: "chan1", CollectionID: 1}},
},
[]*ChannelOp{{Add, bufferID, []*channel{{"chan1", 1}}, nil}},
[]*ChannelOp{{Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
},
{
"test watch same channel",
......@@ -191,10 +191,10 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}, {"chan2", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
},
},
[]*channel{{"chan1", 1}},
[]*channel{{Name: "chan1", CollectionID: 1}},
},
nil,
},
......@@ -206,9 +206,9 @@ func TestConsistentHashChannelAssignPolicy(t *testing.T) {
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{1: {1, nil}, 2: {2, nil}, 3: {3, nil}},
},
[]*channel{{"chan1", 1}, {"chan2", 1}, {"chan3", 1}},
[]*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}, {Name: "chan3", CollectionID: 1}},
},
[]*ChannelOp{{Add, 2, []*channel{{"chan1", 1}}, nil}, {Add, 1, []*channel{{"chan2", 1}}, nil}, {Add, 3, []*channel{{"chan3", 1}}, nil}},
[]*ChannelOp{{Add, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, 1, []*channel{{Name: "chan2", CollectionID: 1}}, nil}, {Add, 3, []*channel{{Name: "chan3", CollectionID: 1}}, nil}},
},
}
for _, tt := range tests {
......@@ -239,12 +239,12 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
},
},
1,
},
[]*ChannelOp{{Delete, 1, []*channel{{"chan1", 1}}, nil}, {Add, bufferID, []*channel{{"chan1", 1}}, nil}},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
},
{
"test rebalance channels after deregister",
......@@ -252,14 +252,14 @@ func TestAvgAssignUnregisteredChannels(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}}},
2: {2, []*channel{{"chan2", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan2", CollectionID: 1}}},
3: {3, []*channel{}},
},
},
2,
},
[]*ChannelOp{{Delete, 2, []*channel{{"chan2", 1}}, nil}, {Add, 3, []*channel{{"chan2", 1}}, nil}},
[]*ChannelOp{{Delete, 2, []*channel{{Name: "chan2", CollectionID: 1}}, nil}, {Add, 3, []*channel{{Name: "chan2", CollectionID: 1}}, nil}},
},
}
for _, tt := range tests {
......@@ -288,12 +288,12 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
},
},
1,
},
[]*ChannelOp{{Delete, 1, []*channel{{"chan1", 1}}, nil}, {Add, bufferID, []*channel{{"chan1", 1}}, nil}},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, bufferID, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
},
{
"rebalance after deregister",
......@@ -302,14 +302,14 @@ func TestConsistentHashDeregisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan2", 1}}},
2: {2, []*channel{{"chan1", 1}}},
3: {3, []*channel{{"chan3", 1}}},
1: {1, []*channel{{Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{{Name: "chan1", CollectionID: 1}}},
3: {3, []*channel{{Name: "chan3", CollectionID: 1}}},
},
},
2,
},
[]*ChannelOp{{Delete, 2, []*channel{{"chan1", 1}}, nil}, {Add, 1, []*channel{{"chan1", 1}}, nil}},
[]*ChannelOp{{Delete, 2, []*channel{{Name: "chan1", CollectionID: 1}}, nil}, {Add, 1, []*channel{{Name: "chan1", CollectionID: 1}}, nil}},
},
}
for _, tt := range tests {
......@@ -337,10 +337,10 @@ func TestAverageReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}}},
},
},
[]*NodeChannelInfo{{1, []*channel{{"chan1", 1}}}},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
},
nil,
},
......@@ -350,13 +350,13 @@ func TestAverageReassignPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"chan1", 1}, {"chan2", 1}}},
1: {1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}},
2: {2, []*channel{}},
},
},
[]*NodeChannelInfo{{1, []*channel{{"chan1", 1}, {"chan2", 1}}}},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
},
[]*ChannelOp{{Delete, 1, []*channel{{"chan1", 1}, {"chan2", 1}}, nil}, {Add, 2, []*channel{{"chan1", 1}, {"chan2", 1}}, nil}},
[]*ChannelOp{{Delete, 1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil}, {Add, 2, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}, nil}},
},
}
for _, tt := range tests {
......@@ -401,17 +401,17 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) {
args{
getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}},
{1, "chan2", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Complete}}}),
[]*NodeChannelInfo{{1, []*channel{{"chan1", 1}, {"chan2", 1}}}},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}, {Name: "chan2", CollectionID: 1}}}},
ts.Add(maxWatchDuration),
},
[]*NodeChannelInfo{{1, []*channel{{"chan1", 1}}}},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
nil,
},
{
"test no expiration",
args{
getKv([]*watch{{1, "chan1", &datapb.ChannelWatchInfo{StartTs: ts.Unix(), State: datapb.ChannelWatchState_Uncomplete}}}),
[]*NodeChannelInfo{{1, []*channel{{"chan1", 1}}}},
[]*NodeChannelInfo{{1, []*channel{{Name: "chan1", CollectionID: 1}}}},
ts.Add(maxWatchDuration).Add(-time.Second),
},
[]*NodeChannelInfo{},
......@@ -455,7 +455,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
bufferID: {bufferID, []*channel{{"ch1", 1}}},
bufferID: {bufferID, []*channel{{Name: "ch1", CollectionID: 1}}},
},
},
1,
......@@ -464,12 +464,12 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
{
Type: Delete,
NodeID: bufferID,
Channels: []*channel{{"ch1", 1}},
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
{
Type: Add,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
},
......@@ -479,7 +479,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"ch1", 1}, {"ch2", 1}}},
1: {1, []*channel{{Name: "ch1", CollectionID: 1}, {Name: "ch2", CollectionID: 1}}},
},
},
3,
......@@ -488,7 +488,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
{
Type: Add,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
},
......@@ -498,8 +498,8 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"ch1", 1}}},
2: {2, []*channel{{"ch3", 1}}},
1: {1, []*channel{{Name: "ch1", CollectionID: 1}}},
2: {2, []*channel{{Name: "ch3", CollectionID: 1}}},
},
},
3,
......@@ -512,7 +512,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
&ChannelStore{
memkv.NewMemoryKV(),
map[int64]*NodeChannelInfo{
1: {1, []*channel{{"ch1", 1}, {"ch2", 1}, {"ch3", 1}}},
1: {1, []*channel{{Name: "ch1", CollectionID: 1}, {Name: "ch2", CollectionID: 1}, {Name: "ch3", CollectionID: 1}}},
2: {2, []*channel{}},
},
},
......@@ -522,7 +522,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
{
Type: Add,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
},
},
},
......
......@@ -30,6 +30,7 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/minio/minio-go/v7"
......@@ -1079,7 +1080,7 @@ func TestSaveBinlogPaths(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
ctx := context.Background()
......@@ -1142,7 +1143,7 @@ func TestSaveBinlogPaths(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
s := &datapb.SegmentInfo{
ID: 1,
......@@ -1185,7 +1186,7 @@ func TestSaveBinlogPaths(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 1})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 1})
assert.Nil(t, err)
_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
......@@ -1248,7 +1249,7 @@ func TestDropVirtualChannel(t *testing.T) {
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
ctx := context.Background()
......@@ -1321,7 +1322,7 @@ func TestDropVirtualChannel(t *testing.T) {
<-spyCh
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
//resend
......@@ -1336,7 +1337,7 @@ func TestDropVirtualChannel(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{
......@@ -1661,13 +1662,13 @@ func TestGetVChannelPos(t *testing.T) {
assert.Nil(t, err)
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.handler.GetVChanPositions("chx1", 0, allPartitionID)
vchan := svr.handler.GetVChanPositions(&channel{Name: "chx1", CollectionID: 0}, allPartitionID)
assert.Empty(t, vchan.UnflushedSegmentIds)
assert.Empty(t, vchan.FlushedSegmentIds)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.handler.GetVChanPositions("ch1", 0, allPartitionID)
vchan := svr.handler.GetVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 1, vchan.FlushedSegmentIds[0])
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
......@@ -1675,7 +1676,7 @@ func TestGetVChannelPos(t *testing.T) {
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.handler.GetVChanPositions("ch0_suffix", 1, allPartitionID)
infos := svr.handler.GetVChanPositions(&channel{Name: "ch0_suffix", CollectionID: 1}, allPartitionID)
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 0, len(infos.UnflushedSegmentIds))
......@@ -1683,12 +1684,25 @@ func TestGetVChannelPos(t *testing.T) {
})
t.Run("filter partition", func(t *testing.T) {
infos := svr.handler.GetVChanPositions("ch1", 0, 1)
infos := svr.handler.GetVChanPositions(&channel{Name: "ch1", CollectionID: 0}, 1)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegmentIds))
assert.EqualValues(t, 1, len(infos.UnflushedSegmentIds))
assert.EqualValues(t, []byte{11, 12, 13}, infos.SeekPosition.MsgID)
})
t.Run("empty collection with passed positions", func(t *testing.T) {
vchannel := "ch_no_segment_1"
pchannel := funcutil.ToPhysicalChannel(vchannel)
infos := svr.handler.GetVChanPositions(&channel{
Name: vchannel,
CollectionID: 0,
StartPositions: []*commonpb.KeyDataPair{{Key: pchannel, Data: []byte{14, 15, 16}}},
}, allPartitionID)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, vchannel, infos.ChannelName)
assert.EqualValues(t, []byte{14, 15, 16}, infos.SeekPosition.MsgID)
})
}
func TestShouldDropChannel(t *testing.T) {
......@@ -1995,7 +2009,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
......@@ -2686,7 +2700,7 @@ func TestDataCoord_Import(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
......@@ -2706,7 +2720,7 @@ func TestDataCoord_Import(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
......@@ -2811,7 +2825,7 @@ func TestDataCoord_AddSegment(t *testing.T) {
err := svr.channelManager.AddNode(110)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 100})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 100})
assert.Nil(t, err)
status, err := svr.AddSegment(context.TODO(), &datapb.AddSegmentRequest{
......@@ -2831,7 +2845,7 @@ func TestDataCoord_AddSegment(t *testing.T) {
err := svr.channelManager.AddNode(110)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 100})
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 100})
assert.Nil(t, err)
status, err := svr.AddSegment(context.TODO(), &datapb.AddSegmentRequest{
......
......@@ -662,7 +662,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
channels := dresp.GetVirtualChannelNames()
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
for _, c := range channels {
channelInfo := s.handler.GetVChanPositions(c, collectionID, partitionID)
channelInfo := s.handler.GetVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
channelInfos = append(channelInfos, channelInfo)
log.Debug("datacoord append channelInfo in GetRecoveryInfo",
zap.Any("collectionID", collectionID),
......@@ -1010,8 +1010,9 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}
for _, channelName := range req.GetChannelNames() {
ch := &channel{
Name: channelName,
CollectionID: req.GetCollectionID(),
Name: channelName,
CollectionID: req.GetCollectionID(),
StartPositions: req.GetStartPositions(),
}
err := s.channelManager.Watch(ch)
if err != nil {
......
......@@ -177,7 +177,7 @@ func TestGrpcService(t *testing.T) {
}, nil
}
core.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) error {
core.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string, startPositions []*commonpb.KeyDataPair) error {
return nil
}
......
......@@ -429,6 +429,7 @@ message SegmentFieldBinlogMeta {
message WatchChannelsRequest {
int64 collectionID = 1;
repeated string channelNames = 2;
repeated common.KeyDataPair start_positions = 3;
}
message WatchChannelsResponse {
......
......@@ -149,7 +149,7 @@ type Core struct {
// Communicates with queryCoord service for segments info.
CallGetSegmentInfoService func(ctx context.Context, collectionID int64, segIDs []int64) (*querypb.GetSegmentInfoResponse, error)
CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error
CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string, startPositions []*commonpb.KeyDataPair) error
//assign import task to data service
CallImportService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse
......@@ -724,7 +724,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
return resp.Binlogs, nil
}
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) (retErr error) {
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string, startPositions []*commonpb.KeyDataPair) (retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("watch channels panic, msg = %v", err)
......@@ -732,8 +732,9 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
}()
<-initCh
req := &datapb.WatchChannelsRequest{
CollectionID: collectionID,
ChannelNames: channelNames,
CollectionID: collectionID,
ChannelNames: channelNames,
StartPositions: startPositions,
}
rsp, err := s.WatchChannels(ctx, req)
if err != nil {
......
......@@ -3077,7 +3077,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.Error(t, err)
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) error {
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string, startPositions []*commonpb.KeyDataPair) error {
return nil
}
err = c.checkInit()
......
......@@ -270,7 +270,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err
}
if err = t.core.CallWatchChannels(ctx, collID, vchanNames); err != nil {
if err = t.core.CallWatchChannels(ctx, collID, vchanNames, collInfo.StartPositions); err != nil {
return err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册