// flow_graph_dd_node_test.go

package datanode

import (
	"context"
	"log"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)

func TestFlowGraphDDNode_Operate(t *testing.T) {
	const ctxTimeInMillisecond = 2000
X
XuanYang-cn 已提交
19
	const closeWithDeadline = true
X
XuanYang-cn 已提交
20 21 22 23 24 25 26 27 28 29 30
	var ctx context.Context

	if closeWithDeadline {
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
		defer cancel()
	} else {
		ctx = context.Background()
	}

X
XuanYang-cn 已提交
31 32
	inFlushCh := make(chan *flushMsg, 10)
	defer close(inFlushCh)
X
XuanYang-cn 已提交
33 34 35 36 37 38

	testPath := "/test/datanode/root/meta"
	err := clearEtcd(testPath)
	require.NoError(t, err)
	Params.MetaRootPath = testPath

S
sunby 已提交
39
	// Params.FlushDdBufferSize = 4
X
XuanYang-cn 已提交
40
	replica := newReplica()
S
sunby 已提交
41 42 43
	allocatorMock := NewAllocatorFactory()
	ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock)
	log.Print()
X
XuanYang-cn 已提交
44

S
sunby 已提交
45 46
	collID := UniqueID(0)
	collName := "col-test-0"
X
XuanYang-cn 已提交
47
	// create collection
S
sunby 已提交
48
	createCollReq := internalpb2.CreateCollectionRequest{
X
XuanYang-cn 已提交
49 50 51 52 53 54
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kCreateCollection,
			MsgID:     1,
			Timestamp: 1,
			SourceID:  1,
		},
S
sunby 已提交
55 56 57 58 59
		CollectionID:   collID,
		Schema:         make([]byte, 0),
		CollectionName: collName,
		DbName:         "DbName",
		DbID:           UniqueID(0),
X
XuanYang-cn 已提交
60
	}
S
sunby 已提交
61
	createCollMsg := msgstream.CreateCollectionMsg{
X
XuanYang-cn 已提交
62 63 64 65 66
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: Timestamp(1),
			EndTimestamp:   Timestamp(1),
			HashValues:     []uint32{uint32(0)},
		},
S
sunby 已提交
67
		CreateCollectionRequest: createCollReq,
X
XuanYang-cn 已提交
68 69 70
	}

	// drop collection
S
sunby 已提交
71
	dropCollReq := internalpb2.DropCollectionRequest{
X
XuanYang-cn 已提交
72 73 74 75 76 77
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kDropCollection,
			MsgID:     2,
			Timestamp: 2,
			SourceID:  2,
		},
S
sunby 已提交
78 79 80 81
		CollectionID:   collID,
		CollectionName: collName,
		DbName:         "DbName",
		DbID:           UniqueID(0),
X
XuanYang-cn 已提交
82
	}
S
sunby 已提交
83
	dropCollMsg := msgstream.DropCollectionMsg{
X
XuanYang-cn 已提交
84 85 86 87 88
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: Timestamp(2),
			EndTimestamp:   Timestamp(2),
			HashValues:     []uint32{uint32(0)},
		},
S
sunby 已提交
89
		DropCollectionRequest: dropCollReq,
X
XuanYang-cn 已提交
90 91 92
	}

	partitionID := UniqueID(100)
X
XuanYang-cn 已提交
93
	partitionName := "partition-test-0"
X
XuanYang-cn 已提交
94 95 96 97 98 99 100 101
	// create partition
	createPartitionReq := internalpb2.CreatePartitionRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kCreatePartition,
			MsgID:     3,
			Timestamp: 3,
			SourceID:  3,
		},
S
sunby 已提交
102
		CollectionID:   collID,
X
XuanYang-cn 已提交
103
		PartitionID:    partitionID,
S
sunby 已提交
104
		CollectionName: collName,
X
XuanYang-cn 已提交
105
		PartitionName:  partitionName,
S
sunby 已提交
106 107
		DbName:         "DbName",
		DbID:           UniqueID(0),
X
XuanYang-cn 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
	}
	createPartitionMsg := msgstream.CreatePartitionMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: Timestamp(3),
			EndTimestamp:   Timestamp(3),
			HashValues:     []uint32{uint32(0)},
		},
		CreatePartitionRequest: createPartitionReq,
	}

	// drop partition
	dropPartitionReq := internalpb2.DropPartitionRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kDropPartition,
			MsgID:     4,
			Timestamp: 4,
			SourceID:  4,
		},
S
sunby 已提交
126
		CollectionID:   collID,
X
XuanYang-cn 已提交
127
		PartitionID:    partitionID,
S
sunby 已提交
128
		CollectionName: collName,
X
XuanYang-cn 已提交
129
		PartitionName:  partitionName,
S
sunby 已提交
130 131
		DbName:         "DbName",
		DbID:           UniqueID(0),
X
XuanYang-cn 已提交
132 133 134 135 136 137 138 139 140 141
	}
	dropPartitionMsg := msgstream.DropPartitionMsg{
		BaseMsg: msgstream.BaseMsg{
			BeginTimestamp: Timestamp(4),
			EndTimestamp:   Timestamp(4),
			HashValues:     []uint32{uint32(0)},
		},
		DropPartitionRequest: dropPartitionReq,
	}

X
XuanYang-cn 已提交
142
	replica.addSegment(1, collID, partitionID, "insert-01")
X
XuanYang-cn 已提交
143
	inFlushCh <- &flushMsg{
S
sunby 已提交
144 145
		msgID:        5,
		timestamp:    5,
X
XuanYang-cn 已提交
146
		segmentIDs:   []UniqueID{1},
S
sunby 已提交
147
		collectionID: collID,
X
XuanYang-cn 已提交
148 149 150
	}

	tsMessages := make([]msgstream.TsMsg, 0)
S
sunby 已提交
151 152
	tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg))
	tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg))
X
XuanYang-cn 已提交
153 154
	tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
	tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
155
	msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0))
X
XuanYang-cn 已提交
156 157 158
	var inMsg Msg = msgStream
	ddNode.Operate([]*Msg{&inMsg})
}