query_node_test.go 5.4 KB
Newer Older
1
package querynode
Z
zhenshan.cao 已提交
2

D
dragondriver 已提交
3 4
import (
	"context"
5
	"math/rand"
X
XuanYang-cn 已提交
6
	"os"
7
	"strconv"
D
dragondriver 已提交
8 9
	"testing"
	"time"
X
XuanYang-cn 已提交
10

T
ThreadDao 已提交
11 12
	"github.com/zilliztech/milvus-distributed/internal/types"

X
XuanYang-cn 已提交
13
	"github.com/stretchr/testify/assert"
14

G
groot 已提交
15
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
X
XuanYang-cn 已提交
16 17
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
G
godchen 已提交
18
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
19
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
X
XuanYang-cn 已提交
20
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
D
dragondriver 已提交
21 22
)

X
xige-16 已提交
23
const ctxTimeInMillisecond = 5000
B
bigsheeper 已提交
24
const debug = false
D
dragondriver 已提交
25

26 27
const defaultPartitionID = UniqueID(2021)

T
ThreadDao 已提交
28 29 30
type queryServiceMock struct {
	types.QueryService
}
31

X
XuanYang-cn 已提交
32
func setup() {
33
	os.Setenv("QUERY_NODE_ID", "1")
C
cai.zhang 已提交
34
	Params.Init()
35
	//Params.QueryNodeID = 1
36 37 38 39
	Params.initQueryTimeTickChannelName()
	Params.initSearchResultChannelNames()
	Params.initStatsChannelName()
	Params.initSearchChannelNames()
40
	Params.MetaRootPath = "/etcd/test/root/querynode"
41

X
XuanYang-cn 已提交
42 43
}

44
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo {
X
xige-16 已提交
45 46 47 48 49 50
	var fieldVec schemapb.FieldSchema
	if isBinary {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
G
godchen 已提交
51
			DataType:     schemapb.DataType_BinaryVector,
X
xige-16 已提交
52 53 54
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
B
bigsheeper 已提交
55
					Value: "128",
X
xige-16 已提交
56
				},
X
XuanYang-cn 已提交
57
			},
X
xige-16 已提交
58 59 60 61 62
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "JACCARD",
				},
X
XuanYang-cn 已提交
63
			},
X
xige-16 已提交
64 65 66 67 68 69
		}
	} else {
		fieldVec = schemapb.FieldSchema{
			FieldID:      UniqueID(100),
			Name:         "vec",
			IsPrimaryKey: false,
G
godchen 已提交
70
			DataType:     schemapb.DataType_FloatVector,
X
xige-16 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83
			TypeParams: []*commonpb.KeyValuePair{
				{
					Key:   "dim",
					Value: "16",
				},
			},
			IndexParams: []*commonpb.KeyValuePair{
				{
					Key:   "metric_type",
					Value: "L2",
				},
			},
		}
X
XuanYang-cn 已提交
84 85 86
	}

	fieldInt := schemapb.FieldSchema{
B
bigsheeper 已提交
87
		FieldID:      UniqueID(101),
X
XuanYang-cn 已提交
88 89
		Name:         "age",
		IsPrimaryKey: false,
G
godchen 已提交
90
		DataType:     schemapb.DataType_Int32,
X
XuanYang-cn 已提交
91 92 93 94 95 96 97 98 99
	}

	schema := schemapb.CollectionSchema{
		AutoID: true,
		Fields: []*schemapb.FieldSchema{
			&fieldVec, &fieldInt,
		},
	}

100 101 102 103 104
	collectionMeta := etcdpb.CollectionInfo{
		ID:           collectionID,
		Schema:       &schema,
		CreateTime:   Timestamp(0),
		PartitionIDs: []UniqueID{defaultPartitionID},
X
XuanYang-cn 已提交
105
	}
B
bigsheeper 已提交
106

X
XuanYang-cn 已提交
107 108 109
	return &collectionMeta
}

B
bigsheeper 已提交
110
func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentID UniqueID, optional ...bool) {
B
bigsheeper 已提交
111 112 113 114
	isBinary := false
	if len(optional) > 0 {
		isBinary = optional[0]
	}
B
bigsheeper 已提交
115
	collectionMeta := genTestCollectionMeta(collectionID, isBinary)
X
XuanYang-cn 已提交
116

117
	var err = node.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
X
XuanYang-cn 已提交
118 119
	assert.NoError(t, err)

B
bigsheeper 已提交
120
	collection, err := node.replica.getCollectionByID(collectionID)
X
XuanYang-cn 已提交
121
	assert.NoError(t, err)
122
	assert.Equal(t, collection.ID(), collectionID)
X
XuanYang-cn 已提交
123 124
	assert.Equal(t, node.replica.getCollectionNum(), 1)

125
	err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0])
X
XuanYang-cn 已提交
126 127
	assert.NoError(t, err)

128
	err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing)
X
XuanYang-cn 已提交
129 130 131
	assert.NoError(t, err)
}

G
godchen 已提交
132
func initDmChannel(ctx context.Context, insertChannels []string, node *QueryNode) {
X
xige-16 已提交
133 134 135
	watchReq := &querypb.WatchDmChannelsRequest{
		ChannelIDs: insertChannels,
	}
G
godchen 已提交
136
	_, err := node.WatchDmChannels(ctx, watchReq)
X
xige-16 已提交
137 138 139 140 141
	if err != nil {
		panic(err)
	}
}

G
godchen 已提交
142 143
func initSearchChannel(ctx context.Context, searchChan string, resultChan string, node *QueryNode) {
	searchReq := &querypb.AddQueryChannelRequest{
X
xige-16 已提交
144 145 146
		RequestChannelID: searchChan,
		ResultChannelID:  resultChan,
	}
G
godchen 已提交
147
	_, err := node.AddQueryChannel(ctx, searchReq)
X
xige-16 已提交
148 149 150 151 152
	if err != nil {
		panic(err)
	}
}

153
func newQueryNodeMock() *QueryNode {
D
dragondriver 已提交
154 155

	var ctx context.Context
X
XuanYang-cn 已提交
156

B
bigsheeper 已提交
157 158 159
	if debug {
		ctx = context.Background()
	} else {
D
dragondriver 已提交
160 161 162
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
163 164 165 166
		go func() {
			<-ctx.Done()
			cancel()
		}()
D
dragondriver 已提交
167 168
	}

G
groot 已提交
169
	msFactory := pulsarms.NewFactory()
170
	svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
171 172 173 174 175
	err := svr.SetQueryService(&queryServiceMock{})
	if err != nil {
		panic(err)
	}

X
XuanYang-cn 已提交
176 177 178
	return svr
}

179 180 181 182 183 184 185 186 187
func makeNewChannelNames(names []string, suffix string) []string {
	var ret []string
	for _, name := range names {
		ret = append(ret, name+suffix)
	}
	return ret
}

func refreshChannelNames() {
188
	suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10)
189 190 191 192 193 194 195
	Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
	Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)
	Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix)
	Params.StatsChannelName = Params.StatsChannelName + suffix
}

G
godchen 已提交
196
func (q *queryServiceMock) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
197 198
	return &querypb.RegisterNodeResponse{
		Status: &commonpb.Status{
199
			ErrorCode: commonpb.ErrorCode_Success,
200
		},
G
godchen 已提交
201
		InitParams: &internalpb.InitParams{
202 203 204 205 206
			NodeID: int64(1),
		},
	}, nil
}

X
XuanYang-cn 已提交
207 208
func TestMain(m *testing.M) {
	setup()
209
	refreshChannelNames()
X
XuanYang-cn 已提交
210 211 212 213 214 215
	exitCode := m.Run()
	os.Exit(exitCode)
}

// NOTE: start pulsar and etcd before test
func TestQueryNode_Start(t *testing.T) {
216
	localNode := newQueryNodeMock()
C
cai.zhang 已提交
217
	localNode.Start()
218
	<-localNode.queryNodeLoopCtx.Done()
C
cai.zhang 已提交
219
	localNode.Stop()
D
dragondriver 已提交
220
}