query_node_test.go 5.0 KB
Newer Older
1
package querynode
Z
zhenshan.cao 已提交
2

D
dragondriver 已提交
3 4
import (
	"context"
5
	"math/rand"
X
XuanYang-cn 已提交
6
	"os"
7
	"strconv"
D
dragondriver 已提交
8 9
	"testing"
	"time"
X
XuanYang-cn 已提交
10 11

	"github.com/stretchr/testify/assert"
12

G
groot 已提交
13
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
X
XuanYang-cn 已提交
14 15
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
16 17
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
X
XuanYang-cn 已提交
18
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
D
dragondriver 已提交
19 20
)

X
xige-16 已提交
21
const ctxTimeInMillisecond = 5000
B
bigsheeper 已提交
22
const debug = false
D
dragondriver 已提交
23

24 25
const defaultPartitionID = UniqueID(2021)

26 27
type queryServiceMock struct{}

X
XuanYang-cn 已提交
28
func setup() {
C
cai.zhang 已提交
29
	Params.Init()
30
	Params.MetaRootPath = "/etcd/test/root/querynode"
X
XuanYang-cn 已提交
31 32
}

33
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo {
X
xige-16 已提交
34 35 36 37 38 39 40 41 42 43
	var fieldVec schemapb.FieldSchema
	if isBinary {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
			DataType:     schemapb.DataType_VECTOR_BINARY,
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
B
bigsheeper 已提交
44
					Value: "128",
X
xige-16 已提交
45
				},
X
XuanYang-cn 已提交
46
			},
X
xige-16 已提交
47 48 49 50 51
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "JACCARD",
				},
X
XuanYang-cn 已提交
52
			},
X
xige-16 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
		}
	} else {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
			DataType:     schemapb.DataType_VECTOR_FLOAT,
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
					Value: "16",
				},
			},
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "L2",
				},
			},
		}
X
XuanYang-cn 已提交
73 74 75
	}

	fieldInt := schemapb.FieldSchema{
B
bigsheeper 已提交
76
		FieldID:      UniqueID(101),
X
XuanYang-cn 已提交
77 78 79 80 81 82 83 84 85 86 87 88
		Name:         "age",
		IsPrimaryKey: false,
		DataType:     schemapb.DataType_INT32,
	}

	schema := schemapb.CollectionSchema{
		AutoID: true,
		Fields: []*schemapb.FieldSchema{
			&fieldVec, &fieldInt,
		},
	}

89 90 91 92 93
	collectionMeta := etcdpb.CollectionInfo{
		ID:           collectionID,
		Schema:       &schema,
		CreateTime:   Timestamp(0),
		PartitionIDs: []UniqueID{defaultPartitionID},
X
XuanYang-cn 已提交
94
	}
B
bigsheeper 已提交
95

X
XuanYang-cn 已提交
96 97 98
	return &collectionMeta
}

B
bigsheeper 已提交
99
func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentID UniqueID, optional ...bool) {
B
bigsheeper 已提交
100 101 102 103
	isBinary := false
	if len(optional) > 0 {
		isBinary = optional[0]
	}
B
bigsheeper 已提交
104
	collectionMeta := genTestCollectionMeta(collectionID, isBinary)
X
XuanYang-cn 已提交
105

106
	var err = node.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
X
XuanYang-cn 已提交
107 108
	assert.NoError(t, err)

B
bigsheeper 已提交
109
	collection, err := node.replica.getCollectionByID(collectionID)
X
XuanYang-cn 已提交
110
	assert.NoError(t, err)
111
	assert.Equal(t, collection.ID(), collectionID)
X
XuanYang-cn 已提交
112 113
	assert.Equal(t, node.replica.getCollectionNum(), 1)

114
	err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
X
XuanYang-cn 已提交
115 116
	assert.NoError(t, err)

117
	err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segTypeGrowing)
X
XuanYang-cn 已提交
118 119 120
	assert.NoError(t, err)
}

X
xige-16 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
// initDmChannel sends a WatchDmChannels request for insertChannels to node
// and panics if the call returns an error.
func initDmChannel(insertChannels []string, node *QueryNode) {
	req := &querypb.WatchDmChannelsRequest{ChannelIDs: insertChannels}
	if _, err := node.WatchDmChannels(req); err != nil {
		panic(err)
	}
}

// initSearchChannel sends an AddQueryChannel request to node with searchChan
// as the request channel and resultChan as the result channel, and panics if
// the call returns an error.
func initSearchChannel(searchChan string, resultChan string, node *QueryNode) {
	req := &querypb.AddQueryChannelsRequest{
		RequestChannelID: searchChan,
		ResultChannelID:  resultChan,
	}
	if _, err := node.AddQueryChannel(req); err != nil {
		panic(err)
	}
}

142
func newQueryNodeMock() *QueryNode {
D
dragondriver 已提交
143 144

	var ctx context.Context
X
XuanYang-cn 已提交
145

B
bigsheeper 已提交
146 147 148
	if debug {
		ctx = context.Background()
	} else {
D
dragondriver 已提交
149 150 151
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
152 153 154 155
		go func() {
			<-ctx.Done()
			cancel()
		}()
D
dragondriver 已提交
156 157
	}

G
groot 已提交
158 159
	msFactory := pulsarms.NewFactory()
	svr := NewQueryNode(ctx, 0, msFactory)
160 161 162 163 164
	err := svr.SetQueryService(&queryServiceMock{})
	if err != nil {
		panic(err)
	}

X
XuanYang-cn 已提交
165 166 167
	return svr
}

168 169 170 171 172 173 174 175 176
// makeNewChannelNames returns a new slice holding each entry of names with
// suffix appended. names itself is not modified. An empty or nil names
// yields a nil result.
func makeNewChannelNames(names []string, suffix string) []string {
	if len(names) == 0 {
		return nil
	}
	// The final length is known, so allocate once instead of growing.
	ret := make([]string, 0, len(names))
	for _, name := range names {
		ret = append(ret, name+suffix)
	}
	return ret
}

func refreshChannelNames() {
177
	suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10)
178 179 180 181 182 183 184
	Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
	Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)
	Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix)
	Params.StatsChannelName = Params.StatsChannelName + suffix
}

185 186 187 188 189 190 191 192 193 194 195
// RegisterNode ignores req and always returns a response with a SUCCESS
// status and init params carrying NodeID 1, together with a nil error.
func (q *queryServiceMock) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
	initParams := &internalpb2.InitParams{NodeID: int64(1)}

	resp := &querypb.RegisterNodeResponse{
		Status:     status,
		InitParams: initParams,
	}
	return resp, nil
}

X
XuanYang-cn 已提交
196 197
func TestMain(m *testing.M) {
	setup()
198
	refreshChannelNames()
X
XuanYang-cn 已提交
199 200 201 202 203 204
	exitCode := m.Run()
	os.Exit(exitCode)
}

// NOTE: start pulsar and etcd before test
func TestQueryNode_Start(t *testing.T) {
205
	localNode := newQueryNodeMock()
C
cai.zhang 已提交
206 207
	localNode.Start()
	localNode.Stop()
D
dragondriver 已提交
208
}