未验证 提交 1da970af 编写于 作者: B bigsheeper 提交者: GitHub

Remove useless statsService in QueryNode (#17395)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 a9aff3a6
......@@ -648,7 +648,6 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 2))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4))
factory := ProtoUDFactory{}
......@@ -1973,18 +1972,6 @@ func getTsMsg(msgType MsgType, reqID UniqueID) TsMsg {
TimeTickMsg: timeTickResult,
}
return timeTickMsg
case commonpb.MsgType_QueryNodeStats:
queryNodeSegStats := internalpb.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_QueryNodeStats,
SourceID: reqID,
},
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{
BaseMsg: baseMsg,
QueryNodeStats: queryNodeSegStats,
}
return queryNodeSegStatsMsg
}
return nil
}
......
......@@ -723,70 +723,6 @@ func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return timeTick, nil
}
/////////////////////////////////////////QueryNodeStats//////////////////////////////////////////
// QueryNodeStatsMsg is a message pack that contains statistic from querynode
// GOOSE TODO: remove QueryNodeStats
type QueryNodeStatsMsg struct {
BaseMsg
internalpb.QueryNodeStats
}
// interface implementation validation
var _ TsMsg = &QueryNodeStatsMsg{}
// TraceCtx returns the context of opentracing
func (qs *QueryNodeStatsMsg) TraceCtx() context.Context {
return qs.BaseMsg.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (qs *QueryNodeStatsMsg) SetTraceCtx(ctx context.Context) {
qs.BaseMsg.Ctx = ctx
}
// ID returns the ID of this message pack
func (qs *QueryNodeStatsMsg) ID() UniqueID {
return qs.Base.MsgID
}
// Type returns the type of this message pack
func (qs *QueryNodeStatsMsg) Type() MsgType {
return qs.Base.MsgType
}
// SourceID indicates which component generated this message
func (qs *QueryNodeStatsMsg) SourceID() int64 {
return qs.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) {
queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
mb, err := proto.Marshal(queryNodeSegStats)
if err != nil {
return nil, err
}
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
queryNodeSegStats := internalpb.QueryNodeStats{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &queryNodeSegStats)
if err != nil {
return nil, err
}
queryNodeSegStatsMsg := &QueryNodeStatsMsg{QueryNodeStats: queryNodeSegStats}
return queryNodeSegStatsMsg, nil
}
/////////////////////////////////////////CreateCollection//////////////////////////////////////////
// CreateCollectionMsg is a message pack that contains create collection request
......
......@@ -597,51 +597,6 @@ func TestTimeTickMsg_Unmarshal_IllegalParameter(t *testing.T) {
assert.Nil(t, tsMsg)
}
func TestQueryNodeStatsMsg(t *testing.T) {
queryNodeStatsMsg := &QueryNodeStatsMsg{
BaseMsg: generateBaseMsg(),
QueryNodeStats: internalpb.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
SegStats: []*internalpb.SegmentStats{},
FieldStats: []*internalpb.FieldStats{},
},
}
assert.NotNil(t, queryNodeStatsMsg.TraceCtx())
ctx := context.Background()
queryNodeStatsMsg.SetTraceCtx(ctx)
assert.Equal(t, ctx, queryNodeStatsMsg.TraceCtx())
assert.Equal(t, int64(1), queryNodeStatsMsg.ID())
assert.Equal(t, commonpb.MsgType_TimeTick, queryNodeStatsMsg.Type())
assert.Equal(t, int64(3), queryNodeStatsMsg.SourceID())
bytes, err := queryNodeStatsMsg.Marshal(queryNodeStatsMsg)
assert.Nil(t, err)
tsMsg, err := queryNodeStatsMsg.Unmarshal(bytes)
assert.Nil(t, err)
queryNodeStatsMsg2, ok := tsMsg.(*QueryNodeStatsMsg)
assert.True(t, ok)
assert.Equal(t, int64(1), queryNodeStatsMsg2.ID())
assert.Equal(t, commonpb.MsgType_TimeTick, queryNodeStatsMsg2.Type())
assert.Equal(t, int64(3), queryNodeStatsMsg2.SourceID())
}
func TestQueryNodeStatsMsg_Unmarshal_IllegalParameter(t *testing.T) {
queryNodeStatsMsg := &QueryNodeStatsMsg{}
tsMsg, err := queryNodeStatsMsg.Unmarshal(10)
assert.NotNil(t, err)
assert.Nil(t, tsMsg)
}
func TestCreateCollectionMsg(t *testing.T) {
createCollectionMsg := &CreateCollectionMsg{
BaseMsg: generateBaseMsg(),
......
......@@ -93,7 +93,6 @@ func allowTrace(in interface{}) bool {
switch res := in.(type) {
case TsMsg:
return !(res.Type() == commonpb.MsgType_TimeTick ||
res.Type() == commonpb.MsgType_QueryNodeStats ||
res.Type() == commonpb.MsgType_LoadIndex)
default:
return false
......
......@@ -65,7 +65,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
dropCollectionMsg := DropCollectionMsg{}
createPartitionMsg := CreatePartitionMsg{}
dropPartitionMsg := DropPartitionMsg{}
queryNodeSegStatsMsg := QueryNodeStatsMsg{}
dataNodeTtMsg := DataNodeTtMsg{}
sealedSegmentsChangeInfoMsg := SealedSegmentsChangeInfoMsg{}
......@@ -78,7 +77,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_Retrieve] = retrieveMsg.Unmarshal
p.TempMap[commonpb.MsgType_RetrieveResult] = retrieveResultMsg.Unmarshal
p.TempMap[commonpb.MsgType_TimeTick] = timeTickMsg.Unmarshal
p.TempMap[commonpb.MsgType_QueryNodeStats] = queryNodeSegStatsMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreateCollection] = createCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropCollection] = dropCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal
......
......@@ -255,12 +255,6 @@ message SegmentStats {
bool recently_modified = 4;
}
message QueryNodeStats {
common.MsgBase base = 1;
repeated SegmentStats seg_stats = 2;
repeated FieldStats field_stats = 3;
}
message MsgPosition {
string channel_name = 1;
bytes msgID = 2;
......
......@@ -84,7 +84,6 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.CommonCfg.QueryNodeStats,
}, nil
}
......
......@@ -56,7 +56,6 @@ func setup() {
func refreshParams() {
rand.Seed(time.Now().UnixNano())
suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10)
Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + suffix
Params.CommonCfg.QueryCoordTimeTick = Params.CommonCfg.QueryCoordTimeTick + suffix
Params.EtcdCfg.MetaRootPath = Params.EtcdCfg.MetaRootPath + suffix
GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo)
......
......@@ -87,7 +87,6 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: Params.CommonCfg.QueryNodeStats,
}, nil
}
......
......@@ -94,10 +94,6 @@ type QueryNode struct {
// dataSyncService
dataSyncService *dataSyncService
// internal services
//queryService *queryService
statsService *statsService
// segment loader
loader *segmentLoader
......@@ -259,7 +255,6 @@ func (node *QueryNode) Start() error {
// start services
go node.watchChangeInfo()
//go node.statsService.start()
// create shardClusterService for shardLeader functions.
node.ShardClusterService = newShardClusterService(node.etcdCli, node.session, node)
......
......@@ -19,10 +19,8 @@ package querynode
import (
"context"
"io/ioutil"
"math/rand"
"net/url"
"os"
"strconv"
"sync"
"testing"
"time"
......@@ -97,7 +95,6 @@ func newQueryNodeMock() *QueryNode {
replica := newCollectionReplica()
svr.metaReplica = replica
svr.dataSyncService = newDataSyncService(ctx, svr.metaReplica, tsReplica, factory)
svr.statsService = newStatsService(ctx, svr.metaReplica, factory)
svr.vectorStorage, err = factory.NewVectorStorageChunkManager(ctx)
if err != nil {
panic(err)
......@@ -143,7 +140,6 @@ func startEmbedEtcdServer() (*embed.Etcd, error) {
func TestMain(m *testing.M) {
setup()
Params.CommonCfg.QueryNodeStats = Params.CommonCfg.QueryNodeStats + strconv.Itoa(rand.Int())
// init embed etcd
var err error
embedetcdServer, err = startEmbedEtcdServer()
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"context"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"go.uber.org/zap"
)
type statsService struct {
ctx context.Context
replica ReplicaInterface
statsStream msgstream.MsgStream
msFactory msgstream.Factory
}
func newStatsService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *statsService {
return &statsService{
ctx: ctx,
replica: replica,
statsStream: nil,
msFactory: factory,
}
}
func (sService *statsService) start() {
sleepTimeInterval := Params.QueryNodeCfg.StatsPublishInterval
// start pulsar
producerChannels := []string{Params.CommonCfg.QueryNodeStats}
statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
statsStream.AsProducer(producerChannels)
log.Info("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels))
var statsMsgStream msgstream.MsgStream = statsStream
sService.statsStream = statsMsgStream
sService.statsStream.Start()
// start service
for {
select {
case <-sService.ctx.Done():
log.Info("stats service closed")
return
case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond):
sService.publicStatistic(nil)
}
}
}
func (sService *statsService) close() {
if sService.statsStream != nil {
sService.statsStream.Close()
}
}
func (sService *statsService) publicStatistic(fieldStats []*internalpb.FieldStats) {
segStats := sService.replica.getSegmentStatistics()
queryNodeStats := internalpb.QueryNodeStats{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_QueryNodeStats,
SourceID: Params.QueryNodeCfg.GetNodeID(),
},
SegStats: segStats,
FieldStats: fieldStats,
}
var msg msgstream.TsMsg = &msgstream.QueryNodeStatsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
QueryNodeStats: queryNodeStats,
}
var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
err := sService.statsStream.Produce(&msgPack)
if err != nil {
log.Warn("failed to publish stats", zap.Error(err))
}
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/stretchr/testify/assert"
)
// NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) {
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
factory := dependency.NewDefaultFactory(true)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.metaReplica, factory)
node.statsService.start()
node.Stop()
}
//NOTE: start pulsar before test
func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
const receiveBufSize = 1024
// start pulsar
producerChannels := []string{Params.CommonCfg.QueryNodeStats}
factory := dependency.NewDefaultFactory(true)
statsStream, err := factory.NewMsgStream(node.queryNodeLoopCtx)
assert.Nil(t, err)
statsStream.AsProducer(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
node.statsService = newStatsService(node.queryNodeLoopCtx, node.metaReplica, factory)
node.statsService.statsStream = statsMsgStream
node.statsService.statsStream.Start()
// send stats
node.statsService.publicStatistic(nil)
node.Stop()
}
......@@ -111,7 +111,6 @@ type commonConfig struct {
QueryCoordSearch string
QueryCoordSearchResult string
QueryCoordTimeTick string
QueryNodeStats string
QueryNodeSubName string
DataCoordStatistic string
......@@ -151,7 +150,6 @@ func (p *commonConfig) init(base *BaseTable) {
p.initQueryCoordSearch()
p.initQueryCoordSearchResult()
p.initQueryCoordTimeTick()
p.initQueryNodeStats()
p.initQueryNodeSubName()
p.initDataCoordStatistic()
......@@ -274,14 +272,6 @@ func (p *commonConfig) initQueryCoordTimeTick() {
}
// --- querynode ---
func (p *commonConfig) initQueryNodeStats() {
keys := []string{
"msgChannel.chanNamePrefix.queryNodeStats",
"common.chanNamePrefix.queryNodeStats",
}
p.QueryNodeStats = p.initChanNamePrefix(keys)
}
func (p *commonConfig) initQueryNodeSubName() {
keys := []string{
"msgChannel.subNamePrefix.queryNodeSubNamePrefix",
......
......@@ -91,9 +91,6 @@ func TestComponentParam(t *testing.T) {
t.Logf("querycoord timetick channel = %s", Params.QueryCoordTimeTick)
// -- querynode --
assert.Equal(t, Params.QueryNodeStats, "by-dev-query-node-stats")
t.Logf("querynode stats channel = %s", Params.QueryNodeStats)
assert.Equal(t, Params.QueryNodeSubName, "by-dev-queryNode")
t.Logf("querynode subname = %s", Params.QueryNodeSubName)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册