stats_service_test.go 2.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querynode
13

D
dragondriver 已提交
14
import (
15
	"context"
D
dragondriver 已提交
16 17
	"testing"

X
Xiangyu Wang 已提交
18
	"github.com/milvus-io/milvus/internal/msgstream"
Z
zhenshan.cao 已提交
19
	"github.com/stretchr/testify/assert"
D
dragondriver 已提交
20 21 22 23
)

// NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) {
24
	node := newQueryNodeMock()
B
bigsheeper 已提交
25
	initTestMeta(t, node, 0, 0)
G
groot 已提交
26

X
Xiangyu Wang 已提交
27
	msFactory := msgstream.NewPmsFactory()
G
groot 已提交
28 29 30 31 32
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	msFactory.SetParams(m)
33 34
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, msFactory)
	node.statsService.start()
C
cai.zhang 已提交
35
	node.Stop()
D
dragondriver 已提交
36 37
}

X
XuanYang-cn 已提交
38
//NOTE: start pulsar before test
39
func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
40 41 42 43 44
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	node, err := genSimpleQueryNode(ctx)
	assert.NoError(t, err)
D
dragondriver 已提交
45 46 47

	const receiveBufSize = 1024
	// start pulsar
48
	producerChannels := []string{Params.StatsChannelName}
D
dragondriver 已提交
49

X
Xiangyu Wang 已提交
50
	msFactory := msgstream.NewPmsFactory()
G
groot 已提交
51 52 53 54
	m := map[string]interface{}{
		"receiveBufSize": receiveBufSize,
		"pulsarAddress":  Params.PulsarAddress,
		"pulsarBufSize":  1024}
55
	err = msFactory.SetParams(m)
G
groot 已提交
56 57 58
	assert.Nil(t, err)

	statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx)
Z
zhenshan.cao 已提交
59 60
	assert.Nil(t, err)
	statsStream.AsProducer(producerChannels)
D
dragondriver 已提交
61 62 63

	var statsMsgStream msgstream.MsgStream = statsStream

64 65 66
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, msFactory)
	node.statsService.statsStream = statsMsgStream
	node.statsService.statsStream.Start()
D
dragondriver 已提交
67 68

	// send stats
69
	node.statsService.publicStatistic(nil)
C
cai.zhang 已提交
70
	node.Stop()
D
dragondriver 已提交
71
}