未验证 提交 72c5e2a4 编写于 作者: X Xiaofan 提交者: GitHub

Fix channel reassigned to other datanodes (#25015)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 e660cc3f
......@@ -285,7 +285,7 @@ indexNode:
dataCoord:
channel:
watchTimeoutInterval: 120 # Timeout on watching channels (in seconds). Datanode tickler update watch progress will reset timeout timer.
watchTimeoutInterval: 300 # Timeout on watching channels (in seconds). Datanode tickler update watch progress will reset timeout timer.
balanceSilentDuration: 300 # The duration before the channelBalancer on datacoord to run
balanceInterval: 360 #The interval for the channelBalancer on datacoord to check balance status
segment:
......
......@@ -506,6 +506,7 @@ func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
}
// Match checks and returns whether the node ID and channel match.
// use vchannel
func (c *ChannelManager) Match(nodeID int64, channel string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
......
......@@ -642,6 +642,11 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
// if lag behind, log every 1 mins about
log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("timetick", physical))
}
// ignore report from a different node
if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) {
log.Warn("node is not matched with channel", zap.String("channel", ch), zap.Int64("nodeID", ttMsg.GetBase().GetSourceID()))
return nil
}
sub := tsoutil.SubByNow(ts)
pChannelName := funcutil.ToPhysicalChannel(ch)
......
......@@ -1551,6 +1551,7 @@ func TestSaveBinlogPaths(t *testing.T) {
resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{
SegmentID: 1,
Channel: "test",
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetErrorCode())
......
......@@ -423,8 +423,22 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
zap.Any("startPositions", req.GetStartPositions()),
zap.Any("checkpoints", req.GetCheckPoints()))
// validate
nodeID := req.GetBase().GetSourceID()
// virtual channel name
channelName := req.Channel
// for compatibility issue , if len(channelName) not exist, skip the check
// No need to check import channel--node matching in data import case.
// Also avoid to handle segment not found error if not the owner of shard
if !req.GetImporting() && len(channelName) != 0 {
if !s.channelManager.Match(nodeID, channelName) {
failResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channelName, nodeID))
resp.ErrorCode = commonpb.ErrorCode_MetaFailed
log.Warn("node is not matched with channel", zap.String("channel", channelName))
return resp, nil
}
}
// validate
segmentID := req.GetSegmentID()
segment := s.meta.GetSegment(segmentID)
......@@ -444,17 +458,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
// No need to check import channel--node matching in data import case.
if !req.GetImporting() {
channel := segment.GetInsertChannel()
if !s.channelManager.Match(nodeID, channel) {
failResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
resp.ErrorCode = commonpb.ErrorCode_MetaFailed
log.Warn("node is not matched with channel", zap.String("channel", channel))
return resp, nil
}
}
if req.GetDropped() {
s.segmentManager.DropSegment(ctx, segment.GetID())
}
......@@ -1447,6 +1450,15 @@ func (s *Server) handleRPCTimetickMessage(ctx context.Context, ttMsg *msgpb.Data
ch := ttMsg.GetChannelName()
ts := ttMsg.GetTimestamp()
// ignore to handle RPC Timetick message since it's no longer the leader
if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) {
log.Warn("node is not matched with channel",
zap.String("channel", ch),
zap.Int64("nodeID", ttMsg.GetBase().GetSourceID()),
)
return nil
}
s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
......
......@@ -909,6 +909,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
StartPositions: startPos,
Flushed: pack.flushed,
Dropped: pack.dropped,
Channel: dsService.vchannelName,
}
err := retry.Do(context.Background(), func() error {
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
......
......@@ -298,6 +298,7 @@ message SaveBinlogPathsRequest {
repeated FieldBinlog deltalogs = 9;
bool dropped = 10;
bool importing = 11;
string channel = 12; // report channel name for verification
}
message CheckPoint {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册