diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index a95ec3d6c67606099cf4f61108cac5f0807de3ad..91b607e78022c2a0169679d54061bb30afc47c25 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -853,6 +853,27 @@ func TestSaveBinlogPaths(t *testing.T) { assert.EqualValues(t, segmentInfo.NumOfRows, 10) }) + t.Run("with channel not matched", func(t *testing.T) { + svr := newTestServer(t, nil) + defer closeTestServer(t, svr) + err := svr.channelManager.AddNode(0) + require.Nil(t, err) + err = svr.channelManager.Watch(&channel{"ch1", 0}) + require.Nil(t, err) + s := &datapb.SegmentInfo{ + ID: 1, + InsertChannel: "ch2", + State: commonpb.SegmentState_Growing, + } + svr.meta.AddSegment(NewSegmentInfo(s)) + + resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{ + SegmentID: 1, + }) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetErrorCode()) + }) + t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) closeTestServer(t, svr) @@ -1038,7 +1059,7 @@ func TestDropVirtualChannel(t *testing.T) { ChannelName: "ch2", }) assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetStatus().GetErrorCode()) }) t.Run("with closed server", func(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index fb0a032a7a18f1b058cf4fe262750aadee06092d..9cb0d2d560a983bada3461ea2a33ca65cfce593f 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -335,6 +335,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath channel := segment.GetInsertChannel() if !s.channelManager.Match(nodeID, channel) { FailResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID)) + resp.ErrorCode = commonpb.ErrorCode_MetaFailed log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) return resp, nil } @@ -412,6 +413,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual nodeID := req.GetBase().GetSourceID() if !s.channelManager.Match(nodeID, channel) { FailResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID)) + resp.Status.ErrorCode = commonpb.ErrorCode_MetaFailed log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) return resp, nil } diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index 68bfcbf1fc775a561949511e54d525f138ed4af6..46095c64f2e0f88beef574c2b968982a8e0a7424 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -686,7 +686,13 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl return fmt.Errorf(err.Error()) } - // TODO should retry only when datacoord status is unhealthy + // meta error, datanode handles a virtual channel does not belong here + if rsp.GetStatus().GetErrorCode() == commonpb.ErrorCode_MetaFailed { + log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName)) + return nil + } + + // retry for other error if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason()) } @@ -772,7 +778,12 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet return fmt.Errorf(err.Error()) } - // TODO should retry only when datacoord status is unhealthy + // meta error, datanode handles a virtual channel does not belong here + if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed { + log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName)) + return nil + } + if rsp.ErrorCode != commonpb.ErrorCode_Success { return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason) } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 5459d451c0ca2451c83821995488f8b111215f38..ef8eeaa4bc7904805e5cfa79f239b9eab465d067 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -555,12 +556,21 @@ func TestFlushNotifyFunc(t *testing.T) { }) t.Run("datacoord Save fails", func(t *testing.T) { - dataCoord.SaveBinlogPathNotSuccess = true + dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_UnexpectedError assert.Panics(t, func() { notifyFunc(&segmentFlushPack{}) }) }) + // issue https://github.com/milvus-io/milvus/issues/17097 + // meta error, datanode shall not panic, just drop the virtual channel + t.Run("datacoord found meta error", func(t *testing.T) { + dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_MetaFailed + assert.NotPanics(t, func() { + notifyFunc(&segmentFlushPack{}) + }) + }) + t.Run("datacoord call error", func(t *testing.T) { dataCoord.SaveBinlogPathError = true assert.Panics(t, func() { @@ -623,7 +633,7 @@ func TestDropVirtualChannelFunc(t *testing.T) { }) }) t.Run("datacoord drop fails", func(t *testing.T) { - dataCoord.DropVirtualChannelNotSuccess = true + dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError assert.Panics(t, func() { dropFunc(nil) }) @@ -631,11 +641,21 @@ func TestDropVirtualChannelFunc(t *testing.T) { t.Run("datacoord call error", func(t *testing.T) { - dataCoord.DropVirtualChannelNotSuccess = false + dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError dataCoord.DropVirtualChannelError = true assert.Panics(t, func() { dropFunc(nil) }) }) + // issue https://github.com/milvus-io/milvus/issues/17097 + // meta error, datanode shall not panic, just drop the virtual channel + t.Run("datacoord found meta error", func(t *testing.T) { + dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_MetaFailed + dataCoord.DropVirtualChannelError = false + assert.NotPanics(t, func() { + dropFunc(nil) + }) + }) + } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index ec3e1fe6a773d870bcbac4647477577a29f26d4f..2f4820a28b9af5d41a40179d51fc5bea794f06c4 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -164,14 +164,14 @@ type RootCoordFactory struct { type DataCoordFactory struct { types.DataCoord - SaveBinlogPathError bool - SaveBinlogPathNotSuccess bool + SaveBinlogPathError bool + SaveBinlogPathStatus commonpb.ErrorCode CompleteCompactionError bool CompleteCompactionNotSuccess bool - DropVirtualChannelError bool - DropVirtualChannelNotSuccess bool + DropVirtualChannelError bool + DropVirtualChannelStatus commonpb.ErrorCode } func (ds *DataCoordFactory) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { @@ -202,27 +202,16 @@ func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.Sav if ds.SaveBinlogPathError { return nil, errors.New("Error") } - if ds.SaveBinlogPathNotSuccess { - return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil - } - - return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil + return &commonpb.Status{ErrorCode: ds.SaveBinlogPathStatus}, nil } func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { if ds.DropVirtualChannelError { return nil, errors.New("error") } - if ds.DropVirtualChannelNotSuccess { - return &datapb.DropVirtualChannelResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } return &datapb.DropVirtualChannelResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, + ErrorCode: ds.DropVirtualChannelStatus, }, }, nil }