diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 1795042714c250b7f21f69b7e5c16d75e282ef30..fbeb2947bb178bcb992782e1acd086491ca58ea8 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -156,7 +156,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro vchanInfo.GetSeekPosition(), ) var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) - var insertBufferNode Node = newInsertBufferNode( + var insertBufferNode Node + insertBufferNode, err = newInsertBufferNode( dsService.ctx, dsService.replica, dsService.msFactory, @@ -165,6 +166,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro saveBinlog, vchanInfo.GetChannelName(), ) + if err != nil { + return err + } var deleteNode Node = newDeleteDNode( dsService.ctx, diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 9658fa22742819f7c1495027d1df02c2927996c0..b52d7ac424dbc3a487d2e6e91ed8e31d41e99f74 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -672,12 +672,12 @@ func flushSegment( // write insert binlog for _, blob := range binLogs { fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) - log.Debug("save binlog", zap.Int64("fieldID", fieldID)) if err != nil { log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) clearFn(false) return } + log.Debug("save binlog", zap.Int64("fieldID", fieldID)) logidx, err := idAllocator.allocID() if err != nil { @@ -800,7 +800,7 @@ func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timest collID := ibNode.replica.getCollectionID() sch, err := ibNode.replica.getCollectionSchema(collID, ts) if err != nil { - return + return nil, err } meta = &etcdpb.CollectionMeta{ @@ -822,7 +822,7 @@ func newInsertBufferNode( flushCh <-chan *flushMsg, saveBinlog func(*segmentFlushUnit) error, channelName string, -) *insertBufferNode { +) (*insertBufferNode, error) { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -849,20 +849,26 @@ func newInsertBufferNode( minIOKV, err := miniokv.NewMinIOKV(ctx, option) if err != nil { - panic(err) + return nil, err } //input stream, data node time tick - wTt, _ := factory.NewMsgStream(ctx) + wTt, err := factory.NewMsgStream(ctx) + if err != nil { + return nil, err + } wTt.AsProducer([]string{Params.TimeTickChannelName}) - log.Debug("datanode AsProducer: " + Params.TimeTickChannelName) + log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.TimeTickChannelName)) var wTtMsgStream msgstream.MsgStream = wTt wTtMsgStream.Start() // update statistics channel - segS, _ := factory.NewMsgStream(ctx) + segS, err := factory.NewMsgStream(ctx) + if err != nil { + return nil, err + } segS.AsProducer([]string{Params.SegmentStatisticsChannelName}) - log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName) + log.Debug("datanode AsProducer", zap.String("SegmentStatisChannelName", Params.SegmentStatisticsChannelName)) var segStatisticsMsgStream msgstream.MsgStream = segS segStatisticsMsgStream.Start() @@ -881,5 +887,5 @@ func newInsertBufferNode( idAllocator: idAllocator, dsSaveBinlog: saveBinlog, segmentCheckPoints: make(map[UniqueID]segmentCheckPoint), - } + }, nil } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 90e48c5acb51ea6ad2b7654ad9661d12bd74ec08..4c03bbddeb533a32ada18ac5aa63eed72cfcf284 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -13,6 +13,7 @@ package datanode import ( "context" + "errors" "fmt" "math" "path" @@ -36,6 +37,78 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) +// CDFMsFactory count down fails msg factory +type CDFMsFactory struct { + msgstream.Factory + cd int +} + +func (f *CDFMsFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { + f.cd-- + if f.cd < 0 { + return nil, errors.New("fail") + } + return f.Factory.NewMsgStream(ctx) +} + +func TestFLowGraphInsertBufferNodeCreate(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-create" + + testPath := "/test/datanode/root/meta" + err := clearEtcd(testPath) + require.NoError(t, err) + Params.MetaRootPath = testPath + + Factory := &MetaFactory{} + collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") + mockRootCoord := &RootCoordFactory{} + + replica := newReplica(mockRootCoord, collMeta.ID) + + err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) + require.NoError(t, err) + + msFactory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + saveBinlog := func(fu *segmentFlushUnit) error { + t.Log(fu) + return nil + } + + flushChan := make(chan *flushMsg, 100) + iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + assert.NotNil(t, iBNode) + require.NoError(t, err) + + ctxDone, cancel := context.WithCancel(ctx) + cancel() // cancel now to make context done + _, err = newInsertBufferNode(ctxDone, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + assert.Error(t, err) + + cdf := &CDFMsFactory{ + Factory: msFactory, + cd: 0, + } + + _, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string") + assert.Error(t, err) + cdf = &CDFMsFactory{ + Factory: msFactory, + cd: 1, + } + _, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string") + assert.Error(t, err) +} + func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -70,7 +143,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + require.NoError(t, err) dmlFlushedCh := make(chan []*datapb.FieldBinlog, 1) @@ -175,7 +249,8 @@ func TestFlushSegment(t *testing.T) { saveBinlog := func(*segmentFlushUnit) error { return nil } - ibNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + ibNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + require.NoError(t, err) flushSegment(collMeta, segmentID, @@ -289,7 +364,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { } flushChan := make(chan *flushMsg, 100) - iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + iBNode, err := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + require.NoError(t, err) // Auto flush number of rows set to 2 diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index 22eac52d527c47677795b38cfdd0728bf0db283b..c6737be46c6bb794c1cec2a3719fe324db0fab63 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -55,14 +55,14 @@ func (mService *metaService) getCollectionSchema(ctx context.Context, collID Uni } response, err := mService.rootCoord.DescribeCollection(ctx, req) - if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return nil, fmt.Errorf("Describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason()) - } - if err != nil { return nil, fmt.Errorf("Grpc error when describe collection %v from rootcoord: %s", collID, err.Error()) } + if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + return nil, fmt.Errorf("Describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason()) + } + return response.GetSchema(), nil } diff --git a/internal/datanode/meta_service_test.go b/internal/datanode/meta_service_test.go index 939c16aacae6f3288f30abe201dd36cc2852e67c..3246fff36aa830ac56d60d52f477a2b0845e7d07 100644 --- a/internal/datanode/meta_service_test.go +++ b/internal/datanode/meta_service_test.go @@ -13,8 +13,11 @@ package datanode import ( "context" + "errors" "testing" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/stretchr/testify/assert" ) @@ -48,3 +51,48 @@ func TestMetaService_All(t *testing.T) { printCollectionStruct(collectionMeta) }) } + +//RootCoordFails1 root coord mock for failure +type RootCoordFails1 struct { + RootCoordFactory +} + +// DescribeCollection override method that will fails +func (rc *RootCoordFails1) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + return nil, errors.New("always fail") +} + +//RootCoordFails2 root coord mock for failure +type RootCoordFails2 struct { + RootCoordFactory +} + +// DescribeCollection override method that will fails +func (rc *RootCoordFails2) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + return &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, + }, nil +} + +func TestMetaServiceRootCoodFails(t *testing.T) { + + t.Run("Test Describe with error", func(t *testing.T) { + rc := &RootCoordFails1{} + rc.setCollectionID(collectionID0) + rc.setCollectionName(collectionName0) + + ms := newMetaService(rc, collectionID0) + _, err := ms.getCollectionSchema(context.Background(), collectionID1, 0) + assert.NotNil(t, err) + }) + + t.Run("Test Describe wit nil response", func(t *testing.T) { + rc := &RootCoordFails2{} + rc.setCollectionID(collectionID0) + rc.setCollectionName(collectionName0) + + ms := newMetaService(rc, collectionID0) + _, err := ms.getCollectionSchema(context.Background(), collectionID1, 0) + assert.NotNil(t, err) + }) +}