未验证 提交 65f168ea 编写于 作者: C congqixia 提交者: GitHub

Fix datanode corner cases (#7336)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 599fb0b7
......@@ -156,7 +156,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
vchanInfo.GetSeekPosition(),
)
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
var insertBufferNode Node = newInsertBufferNode(
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.replica,
dsService.msFactory,
......@@ -165,6 +166,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
saveBinlog,
vchanInfo.GetChannelName(),
)
if err != nil {
return err
}
var deleteNode Node = newDeleteDNode(
dsService.ctx,
......
......@@ -672,12 +672,12 @@ func flushSegment(
// write insert binlog
for _, blob := range binLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
log.Debug("save binlog", zap.Int64("fieldID", fieldID))
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
clearFn(false)
return
}
log.Debug("save binlog", zap.Int64("fieldID", fieldID))
logidx, err := idAllocator.allocID()
if err != nil {
......@@ -800,7 +800,7 @@ func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timest
collID := ibNode.replica.getCollectionID()
sch, err := ibNode.replica.getCollectionSchema(collID, ts)
if err != nil {
return
return nil, err
}
meta = &etcdpb.CollectionMeta{
......@@ -822,7 +822,7 @@ func newInsertBufferNode(
flushCh <-chan *flushMsg,
saveBinlog func(*segmentFlushUnit) error,
channelName string,
) *insertBufferNode {
) (*insertBufferNode, error) {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......@@ -849,20 +849,26 @@ func newInsertBufferNode(
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
panic(err)
return nil, err
}
//input stream, data node time tick
wTt, _ := factory.NewMsgStream(ctx)
wTt, err := factory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
wTt.AsProducer([]string{Params.TimeTickChannelName})
log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.TimeTickChannelName))
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
// update statistics channel
segS, _ := factory.NewMsgStream(ctx)
segS, err := factory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
log.Debug("datanode AsProducer", zap.String("SegmentStatisChannelName", Params.SegmentStatisticsChannelName))
var segStatisticsMsgStream msgstream.MsgStream = segS
segStatisticsMsgStream.Start()
......@@ -881,5 +887,5 @@ func newInsertBufferNode(
idAllocator: idAllocator,
dsSaveBinlog: saveBinlog,
segmentCheckPoints: make(map[UniqueID]segmentCheckPoint),
}
}, nil
}
......@@ -13,6 +13,7 @@ package datanode
import (
"context"
"errors"
"fmt"
"math"
"path"
......@@ -36,6 +37,78 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
// CDFMsFactory count down fails msg factory
type CDFMsFactory struct {
msgstream.Factory
cd int
}
func (f *CDFMsFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
f.cd--
if f.cd < 0 {
return nil, errors.New("fail")
}
return f.Factory.NewMsgStream(ctx)
}
func TestFLowGraphInsertBufferNodeCreate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-create"
testPath := "/test/datanode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
mockRootCoord := &RootCoordFactory{}
replica := newReplica(mockRootCoord, collMeta.ID)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
return nil
}
flushChan := make(chan *flushMsg, 100)
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
assert.NotNil(t, iBNode)
require.NoError(t, err)
ctxDone, cancel := context.WithCancel(ctx)
cancel() // cancel now to make context done
_, err = newInsertBufferNode(ctxDone, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
assert.Error(t, err)
cdf := &CDFMsFactory{
Factory: msFactory,
cd: 0,
}
_, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string")
assert.Error(t, err)
cdf = &CDFMsFactory{
Factory: msFactory,
cd: 1,
}
_, err = newInsertBufferNode(ctx, replica, cdf, NewAllocatorFactory(), flushChan, saveBinlog, "string")
assert.Error(t, err)
}
func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
......@@ -70,7 +143,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
require.NoError(t, err)
dmlFlushedCh := make(chan []*datapb.FieldBinlog, 1)
......@@ -175,7 +249,8 @@ func TestFlushSegment(t *testing.T) {
saveBinlog := func(*segmentFlushUnit) error {
return nil
}
ibNode := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
ibNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
require.NoError(t, err)
flushSegment(collMeta,
segmentID,
......@@ -289,7 +364,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
}
flushChan := make(chan *flushMsg, 100)
iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
iBNode, err := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
require.NoError(t, err)
// Auto flush number of rows set to 2
......
......@@ -55,14 +55,14 @@ func (mService *metaService) getCollectionSchema(ctx context.Context, collID Uni
}
response, err := mService.rootCoord.DescribeCollection(ctx, req)
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("Describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason())
}
if err != nil {
return nil, fmt.Errorf("Grpc error when describe collection %v from rootcoord: %s", collID, err.Error())
}
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("Describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason())
}
return response.GetSchema(), nil
}
......
......@@ -13,8 +13,11 @@ package datanode
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
......@@ -48,3 +51,48 @@ func TestMetaService_All(t *testing.T) {
printCollectionStruct(collectionMeta)
})
}
//RootCoordFails1 root coord mock for failure
type RootCoordFails1 struct {
RootCoordFactory
}
// DescribeCollection override method that will fails
func (rc *RootCoordFails1) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return nil, errors.New("always fail")
}
//RootCoordFails2 root coord mock for failure
type RootCoordFails2 struct {
RootCoordFactory
}
// DescribeCollection override method that will fails
func (rc *RootCoordFails2) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
}, nil
}
func TestMetaServiceRootCoodFails(t *testing.T) {
t.Run("Test Describe with error", func(t *testing.T) {
rc := &RootCoordFails1{}
rc.setCollectionID(collectionID0)
rc.setCollectionName(collectionName0)
ms := newMetaService(rc, collectionID0)
_, err := ms.getCollectionSchema(context.Background(), collectionID1, 0)
assert.NotNil(t, err)
})
t.Run("Test Describe wit nil response", func(t *testing.T) {
rc := &RootCoordFails2{}
rc.setCollectionID(collectionID0)
rc.setCollectionName(collectionName0)
ms := newMetaService(rc, collectionID0)
_, err := ms.getCollectionSchema(context.Background(), collectionID1, 0)
assert.NotNil(t, err)
})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册