query_node_test.go 3.5 KB
Newer Older
N
neza2017 已提交
1
package querynode
Z
zhenshan.cao 已提交
2

D
dragondriver 已提交
3 4
import (
	"context"
5
	"math/rand"
X
XuanYang-cn 已提交
6
	"os"
7
	"strconv"
D
dragondriver 已提交
8 9
	"testing"
	"time"
X
XuanYang-cn 已提交
10 11 12

	"github.com/golang/protobuf/proto"
	"github.com/stretchr/testify/assert"
13

X
XuanYang-cn 已提交
14 15 16
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
D
dragondriver 已提交
17 18
)

19
// ctxTimeInMillisecond is the lifetime, in milliseconds, of the deadline-bound
// context that newQueryNode creates when closeWithDeadline is true.
const ctxTimeInMillisecond = 2000
D
dragondriver 已提交
20 21
// closeWithDeadline selects the context newQueryNode hands to the node: a
// context with a deadline when true, context.Background() when false.
const closeWithDeadline = true

X
XuanYang-cn 已提交
22
func setup() {
C
cai.zhang 已提交
23
	Params.Init()
24
	Params.MetaRootPath = "/etcd/test/root/querynode"
X
XuanYang-cn 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
}

// genTestCollectionMeta builds the collection metadata used as a test fixture.
//
// The schema is named collectionName, has AutoID enabled and holds two
// non-primary-key fields: "vec", a float vector with type param dim=16 and
// index param metric_type=L2, and "age", an INT32 field. The returned meta
// carries collectionID, a zero create time, the single segment ID 0 and the
// single partition tag "default".
func genTestCollectionMeta(collectionName string, collectionID UniqueID) *etcdpb.CollectionMeta {
	vecField := &schemapb.FieldSchema{
		Name:         "vec",
		IsPrimaryKey: false,
		DataType:     schemapb.DataType_VECTOR_FLOAT,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: "dim", Value: "16"},
		},
		IndexParams: []*commonpb.KeyValuePair{
			{Key: "metric_type", Value: "L2"},
		},
	}

	ageField := &schemapb.FieldSchema{
		Name:         "age",
		IsPrimaryKey: false,
		DataType:     schemapb.DataType_INT32,
	}

	return &etcdpb.CollectionMeta{
		ID: collectionID,
		Schema: &schemapb.CollectionSchema{
			Name:   collectionName,
			AutoID: true,
			Fields: []*schemapb.FieldSchema{vecField, ageField},
		},
		CreateTime:    Timestamp(0),
		SegmentIDs:    []UniqueID{0},
		PartitionTags: []string{"default"},
	}
}

func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collectionID UniqueID, segmentID UniqueID) {
	collectionMeta := genTestCollectionMeta(collectionName, collectionID)

73 74
	schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
	assert.NotEqual(t, "", schemaBlob)
X
XuanYang-cn 已提交
75

76
	var err = node.replica.addCollection(collectionMeta.ID, schemaBlob)
X
XuanYang-cn 已提交
77 78 79 80
	assert.NoError(t, err)

	collection, err := node.replica.getCollectionByName(collectionName)
	assert.NoError(t, err)
81 82
	assert.Equal(t, collection.Name(), collectionName)
	assert.Equal(t, collection.ID(), collectionID)
X
XuanYang-cn 已提交
83 84 85 86 87 88 89 90 91 92
	assert.Equal(t, node.replica.getCollectionNum(), 1)

	err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionTags[0])
	assert.NoError(t, err)

	err = node.replica.addSegment(segmentID, collectionMeta.PartitionTags[0], collectionID)
	assert.NoError(t, err)
}

func newQueryNode() *QueryNode {
D
dragondriver 已提交
93 94

	var ctx context.Context
X
XuanYang-cn 已提交
95

D
dragondriver 已提交
96 97 98 99
	if closeWithDeadline {
		var cancel context.CancelFunc
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
100 101 102 103
		go func() {
			<-ctx.Done()
			cancel()
		}()
D
dragondriver 已提交
104 105 106 107
	} else {
		ctx = context.Background()
	}

X
XuanYang-cn 已提交
108 109 110 111 112
	svr := NewQueryNode(ctx, 0)
	return svr

}

113 114 115 116 117 118 119 120 121
// makeNewChannelNames returns a new slice holding every entry of names with
// suffix appended, in the same order. names itself is not modified. A nil
// slice is returned when names is empty.
func makeNewChannelNames(names []string, suffix string) []string {
	if len(names) == 0 {
		return nil
	}
	// The final length is known, so allocate once instead of growing on append.
	ret := make([]string, 0, len(names))
	for _, name := range names {
		ret = append(ret, name+suffix)
	}
	return ret
}

func refreshChannelNames() {
N
neza2017 已提交
122
	suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(100), 10)
123 124 125 126 127 128 129
	Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
	Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)
	Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix)
	Params.StatsChannelName = Params.StatsChannelName + suffix
}

X
XuanYang-cn 已提交
130 131
func TestMain(m *testing.M) {
	setup()
132
	refreshChannelNames()
X
XuanYang-cn 已提交
133 134 135 136 137 138 139 140 141 142
	exitCode := m.Run()
	os.Exit(exitCode)
}

// NOTE: start pulsar and etcd before test
func TestQueryNode_Start(t *testing.T) {
	localNode := newQueryNode()
	err := localNode.Start()
	assert.Nil(t, err)
	localNode.Close()
D
dragondriver 已提交
143
}