flow_graph_insert_buffer_node_test.go 2.4 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10
package datanode

import (
	"context"
	"math"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

G
groot 已提交
11
	"github.com/stretchr/testify/assert"
X
XuanYang-cn 已提交
12
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
G
groot 已提交
13
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
X
XuanYang-cn 已提交
14
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
X
XuanYang-cn 已提交
15 16 17
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)

18
func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
X
XuanYang-cn 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
	const ctxTimeInMillisecond = 2000
	const closeWithDeadline = false
	var ctx context.Context

	if closeWithDeadline {
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
		defer cancel()
	} else {
		ctx = context.Background()
	}

	testPath := "/test/datanode/root/meta"
	err := clearEtcd(testPath)
	require.NoError(t, err)
	Params.MetaRootPath = testPath

X
XuanYang-cn 已提交
37
	Factory := &MetaFactory{}
X
XuanYang-cn 已提交
38
	collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
X
XuanYang-cn 已提交
39 40

	replica := newReplica()
41
	err = replica.addCollection(collMeta.ID, collMeta.Schema)
X
XuanYang-cn 已提交
42
	require.NoError(t, err)
X
XuanYang-cn 已提交
43 44
	err = replica.addSegment(1, collMeta.ID, 0, Params.InsertChannelNames[0])
	require.NoError(t, err)
X
XuanYang-cn 已提交
45

G
groot 已提交
46 47 48 49 50 51 52 53
	msFactory := pulsarms.NewFactory()
	m := map[string]interface{}{
		"receiveBufSize": 1024,
		"pulsarAddress":  Params.PulsarAddress,
		"pulsarBufSize":  1024}
	err = msFactory.SetParams(m)
	assert.Nil(t, err)

54
	iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory)
X
XuanYang-cn 已提交
55 56
	inMsg := genInsertMsg()
	var iMsg flowgraph.Msg = &inMsg
57
	iBNode.Operate([]flowgraph.Msg{iMsg})
X
XuanYang-cn 已提交
58 59 60 61 62 63 64 65 66
}

func genInsertMsg() insertMsg {

	timeRange := TimeRange{
		timestampMin: 0,
		timestampMax: math.MaxUint64,
	}

X
XuanYang-cn 已提交
67 68
	startPos := []*internalpb.MsgPosition{
		{
X
XuanYang-cn 已提交
69
			ChannelName: Params.InsertChannelNames[0],
X
xige-16 已提交
70
			MsgID:       make([]byte, 0),
X
XuanYang-cn 已提交
71 72 73 74
			Timestamp:   0,
		},
	}

X
XuanYang-cn 已提交
75 76
	var iMsg = &insertMsg{
		insertMessages: make([]*msgstream.InsertMsg, 0),
X
XuanYang-cn 已提交
77
		flushMessages:  make([]*flushMsg, 0),
X
XuanYang-cn 已提交
78 79 80 81
		timeRange: TimeRange{
			timestampMin: timeRange.timestampMin,
			timestampMax: timeRange.timestampMax,
		},
X
XuanYang-cn 已提交
82 83
		startPositions: startPos,
		endPositions:   startPos,
X
XuanYang-cn 已提交
84 85
	}

86 87
	dataFactory := NewDataFactory()
	iMsg.insertMessages = append(iMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(2)...)
X
XuanYang-cn 已提交
88

X
XuanYang-cn 已提交
89 90
	fmsg := &flushMsg{
		msgID:        1,
X
XuanYang-cn 已提交
91
		timestamp:    2000,
X
XuanYang-cn 已提交
92 93
		segmentIDs:   []UniqueID{1},
		collectionID: UniqueID(1),
X
XuanYang-cn 已提交
94
	}
X
XuanYang-cn 已提交
95 96

	iMsg.flushMessages = append(iMsg.flushMessages, fmsg)
X
XuanYang-cn 已提交
97 98 99
	return *iMsg

}