query_node_test.go 6.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querynode
Z
zhenshan.cao 已提交
13

D
dragondriver 已提交
14 15
import (
	"context"
16
	"math/rand"
X
XuanYang-cn 已提交
17
	"os"
18
	"strconv"
D
dragondriver 已提交
19 20
	"testing"
	"time"
X
XuanYang-cn 已提交
21 22

	"github.com/stretchr/testify/assert"
23

X
Xiangyu Wang 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
XuanYang-cn 已提交
25 26
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
G
godchen 已提交
27
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
28
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
X
XuanYang-cn 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
B
bigsheeper 已提交
30
	"github.com/zilliztech/milvus-distributed/internal/types"
D
dragondriver 已提交
31 32
)

X
xige-16 已提交
33
const ctxTimeInMillisecond = 5000
B
bigsheeper 已提交
34
const debug = false
D
dragondriver 已提交
35

36 37
const defaultPartitionID = UniqueID(2021)

T
ThreadDao 已提交
38 39 40
// queryServiceMock is a test double for the query service. It embeds the
// types.QueryService interface, so it satisfies that interface while only
// implementing the methods the tests need; calls to any other method are
// forwarded to the embedded interface value.
type queryServiceMock struct {
	types.QueryService
}
41

X
XuanYang-cn 已提交
42
func setup() {
43
	os.Setenv("QUERY_NODE_ID", "1")
C
cai.zhang 已提交
44
	Params.Init()
45
	//Params.QueryNodeID = 1
46 47 48 49
	Params.initQueryTimeTickChannelName()
	Params.initSearchResultChannelNames()
	Params.initStatsChannelName()
	Params.initSearchChannelNames()
50
	Params.MetaRootPath = "/etcd/test/root/querynode"
51

X
XuanYang-cn 已提交
52 53
}

54
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo {
X
xige-16 已提交
55 56 57 58 59 60
	var fieldVec schemapb.FieldSchema
	if isBinary {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
G
godchen 已提交
61
			DataType:     schemapb.DataType_BinaryVector,
X
xige-16 已提交
62 63 64
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
B
bigsheeper 已提交
65
					Value: "128",
X
xige-16 已提交
66
				},
X
XuanYang-cn 已提交
67
			},
X
xige-16 已提交
68 69 70 71 72
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "JACCARD",
				},
X
XuanYang-cn 已提交
73
			},
X
xige-16 已提交
74 75 76 77 78 79
		}
	} else {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
G
godchen 已提交
80
			DataType:     schemapb.DataType_FloatVector,
X
xige-16 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
					Value: "16",
				},
			},
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "L2",
				},
			},
		}
X
XuanYang-cn 已提交
94 95 96
	}

	fieldInt := schemapb.FieldSchema{
B
bigsheeper 已提交
97
		FieldID:      UniqueID(101),
X
XuanYang-cn 已提交
98 99
		Name:         "age",
		IsPrimaryKey: false,
G
godchen 已提交
100
		DataType:     schemapb.DataType_Int32,
X
XuanYang-cn 已提交
101 102 103 104 105 106 107 108 109
	}

	schema := schemapb.CollectionSchema{
		AutoID: true,
		Fields: []*schemapb.FieldSchema{
			&fieldVec, &fieldInt,
		},
	}

110 111 112 113 114
	collectionMeta := etcdpb.CollectionInfo{
		ID:           collectionID,
		Schema:       &schema,
		CreateTime:   Timestamp(0),
		PartitionIDs: []UniqueID{defaultPartitionID},
X
XuanYang-cn 已提交
115
	}
B
bigsheeper 已提交
116

X
XuanYang-cn 已提交
117 118 119
	return &collectionMeta
}

B
bigsheeper 已提交
120
func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentID UniqueID, optional ...bool) {
B
bigsheeper 已提交
121 122 123 124
	isBinary := false
	if len(optional) > 0 {
		isBinary = optional[0]
	}
B
bigsheeper 已提交
125
	collectionMeta := genTestCollectionMeta(collectionID, isBinary)
X
XuanYang-cn 已提交
126

127
	var err = node.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
X
XuanYang-cn 已提交
128 129
	assert.NoError(t, err)

B
bigsheeper 已提交
130
	collection, err := node.replica.getCollectionByID(collectionID)
X
XuanYang-cn 已提交
131
	assert.NoError(t, err)
132
	assert.Equal(t, collection.ID(), collectionID)
X
XuanYang-cn 已提交
133 134
	assert.Equal(t, node.replica.getCollectionNum(), 1)

135
	err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
X
XuanYang-cn 已提交
136 137
	assert.NoError(t, err)

138
	err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing)
X
XuanYang-cn 已提交
139 140 141
	assert.NoError(t, err)
}

G
godchen 已提交
142
func initDmChannel(ctx context.Context, insertChannels []string, node *QueryNode) {
X
xige-16 已提交
143 144 145
	watchReq := &querypb.WatchDmChannelsRequest{
		ChannelIDs: insertChannels,
	}
G
godchen 已提交
146
	_, err := node.WatchDmChannels(ctx, watchReq)
X
xige-16 已提交
147 148 149 150 151
	if err != nil {
		panic(err)
	}
}

G
godchen 已提交
152 153
func initSearchChannel(ctx context.Context, searchChan string, resultChan string, node *QueryNode) {
	searchReq := &querypb.AddQueryChannelRequest{
X
xige-16 已提交
154 155 156
		RequestChannelID: searchChan,
		ResultChannelID:  resultChan,
	}
G
godchen 已提交
157
	_, err := node.AddQueryChannel(ctx, searchReq)
X
xige-16 已提交
158 159 160 161 162
	if err != nil {
		panic(err)
	}
}

163
func newQueryNodeMock() *QueryNode {
D
dragondriver 已提交
164 165

	var ctx context.Context
X
XuanYang-cn 已提交
166

B
bigsheeper 已提交
167 168 169
	if debug {
		ctx = context.Background()
	} else {
D
dragondriver 已提交
170 171 172
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
173 174 175 176
		go func() {
			<-ctx.Done()
			cancel()
		}()
D
dragondriver 已提交
177 178
	}

X
Xiangyu Wang 已提交
179
	msFactory := msgstream.NewPmsFactory()
180
	svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
181 182 183 184 185
	err := svr.SetQueryService(&queryServiceMock{})
	if err != nil {
		panic(err)
	}

X
XuanYang-cn 已提交
186 187 188
	return svr
}

189 190 191 192 193 194 195 196 197
// makeNewChannelNames returns a new slice holding every element of names with
// suffix appended. The input slice is not modified. A nil or empty names
// yields a nil slice.
func makeNewChannelNames(names []string, suffix string) []string {
	if len(names) == 0 {
		return nil
	}
	// The result length is known up front, so allocate once instead of
	// growing the slice through repeated appends.
	ret := make([]string, len(names))
	for i, name := range names {
		ret[i] = name + suffix
	}
	return ret
}

func refreshChannelNames() {
198
	suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10)
199 200 201 202 203
	Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)
	Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix)
	Params.StatsChannelName = Params.StatsChannelName + suffix
}

G
godchen 已提交
204
func (q *queryServiceMock) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
205 206
	return &querypb.RegisterNodeResponse{
		Status: &commonpb.Status{
207
			ErrorCode: commonpb.ErrorCode_Success,
208
		},
G
godchen 已提交
209
		InitParams: &internalpb.InitParams{
210 211 212 213 214
			NodeID: int64(1),
		},
	}, nil
}

B
bigsheeper 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227
// newMessageStreamFactory creates a factory via msgstream.NewPmsFactory and
// configures it with Params.PulsarAddress and receive/pulsar buffer sizes of
// 1024. The factory is returned together with whatever error SetParams
// reported, so callers must check the error before using the factory.
func newMessageStreamFactory() (msgstream.Factory, error) {
	const bufSize = 1024

	addr := Params.PulsarAddress
	factory := msgstream.NewPmsFactory()
	params := map[string]interface{}{
		"receiveBufSize": bufSize,
		"pulsarAddress":  addr,
		"pulsarBufSize":  bufSize,
	}
	err := factory.SetParams(params)
	return factory, err
}

X
XuanYang-cn 已提交
228 229
func TestMain(m *testing.M) {
	setup()
230
	refreshChannelNames()
X
XuanYang-cn 已提交
231 232 233 234 235 236
	exitCode := m.Run()
	os.Exit(exitCode)
}

// NOTE: start pulsar and etcd before test
func TestQueryNode_Start(t *testing.T) {
237
	localNode := newQueryNodeMock()
C
cai.zhang 已提交
238
	localNode.Start()
239
	<-localNode.queryNodeLoopCtx.Done()
C
cai.zhang 已提交
240
	localNode.Stop()
D
dragondriver 已提交
241
}