stats_service_test.go 2.4 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package querynode
18

D
dragondriver 已提交
19
import (
20
	"context"
D
dragondriver 已提交
21 22
	"testing"

X
Xiangyu Wang 已提交
23
	"github.com/milvus-io/milvus/internal/msgstream"
Z
zhenshan.cao 已提交
24
	"github.com/stretchr/testify/assert"
D
dragondriver 已提交
25 26 27 28
)

// NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) {
29
	node := newQueryNodeMock()
B
bigsheeper 已提交
30
	initTestMeta(t, node, 0, 0)
G
groot 已提交
31

X
Xiangyu Wang 已提交
32
	msFactory := msgstream.NewPmsFactory()
G
groot 已提交
33
	m := map[string]interface{}{
34
		"PulsarAddress":  Params.PulsarCfg.Address,
G
groot 已提交
35 36 37
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	msFactory.SetParams(m)
38 39
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, msFactory)
	node.statsService.start()
C
cai.zhang 已提交
40
	node.Stop()
D
dragondriver 已提交
41 42
}

X
XuanYang-cn 已提交
43
//NOTE: start pulsar before test
44
func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
45 46 47 48 49
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	node, err := genSimpleQueryNode(ctx)
	assert.NoError(t, err)
D
dragondriver 已提交
50 51 52

	const receiveBufSize = 1024
	// start pulsar
53
	producerChannels := []string{Params.QueryNodeCfg.StatsChannelName}
D
dragondriver 已提交
54

X
Xiangyu Wang 已提交
55
	msFactory := msgstream.NewPmsFactory()
G
groot 已提交
56 57
	m := map[string]interface{}{
		"receiveBufSize": receiveBufSize,
58
		"pulsarAddress":  Params.PulsarCfg.Address,
G
groot 已提交
59
		"pulsarBufSize":  1024}
60
	err = msFactory.SetParams(m)
G
groot 已提交
61 62 63
	assert.Nil(t, err)

	statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx)
Z
zhenshan.cao 已提交
64 65
	assert.Nil(t, err)
	statsStream.AsProducer(producerChannels)
D
dragondriver 已提交
66 67 68

	var statsMsgStream msgstream.MsgStream = statsStream

69 70 71
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, node.loader.indexLoader.fieldStatsChan, msFactory)
	node.statsService.statsStream = statsMsgStream
	node.statsService.statsStream.Start()
D
dragondriver 已提交
72 73

	// send stats
74
	node.statsService.publicStatistic(nil)
C
cai.zhang 已提交
75
	node.Stop()
D
dragondriver 已提交
76
}