Unverified · Commit 0c39e037 · Authored by: Bingyi Sun · Committed by: GitHub

Move segment statistics update to datanode tt loop (#13233)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: sunby <bingyi.sun@zilliz.com>
Parent 40234475
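The gist of the change: the dedicated segment-statistics channel is removed, and each DataNode time tick now carries the row counts itself, which DataCoord applies inside its tt loop before deciding what to flush. A minimal sketch of the new round trip, using the types this diff introduces (BaseMsg setup, imports, and error handling elided, and the exact embedding of datapb.DataNodeTtMsg inside the msgstream wrapper is assumed, so treat this as illustrative rather than compilable):

// DataNode side: the merged time ticker callback gathers fresh statistics
// for the segments written since the last tick and embeds them in the tick.
ttMsg := msgstream.DataNodeTtMsg{
	DataNodeTtMsg: datapb.DataNodeTtMsg{
		Base:          &commonpb.MsgBase{MsgType: commonpb.MsgType_DataNodeTt, Timestamp: ts},
		ChannelName:   vChannelName,
		Timestamp:     ts,
		SegmentsStats: []*datapb.SegmentStats{{SegmentID: segID, NumRows: numRows}},
	},
}

// DataCoord side: the tt loop hands every tick to handleTimetickMessage,
// which applies the stats before computing flushable segments, so
// flushPolicyV1 sees current row counts.
s.updateSegmentStatistics(ttMsg.GetSegmentsStats()) // meta.SetCurrentRows per segment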
......@@ -69,7 +69,6 @@ type ParamTable struct {
// --- Channels ---
ClusterChannelPrefix string
InsertChannelPrefixName string
StatisticsChannelName string
TimeTickChannelName string
SegmentInfoChannelName string
DataCoordSubscriptionName string
......@@ -117,7 +116,6 @@ func (p *ParamTable) Init() {
// Has to init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initInsertChannelPrefixName()
p.initStatisticsChannelName()
p.initTimeTickChannelName()
p.initSegmentInfoChannelName()
p.initDataCoordSubscriptionName()
......@@ -244,15 +242,6 @@ func (p *ParamTable) initInsertChannelPrefixName() {
p.InsertChannelPrefixName = strings.Join(s, "-")
}
func (p *ParamTable) initStatisticsChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.StatisticsChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
if err != nil {
......
......@@ -29,9 +29,6 @@ func TestParamTable(t *testing.T) {
assert.Equal(t, Params.InsertChannelPrefixName, "by-dev-insert-channel-")
t.Logf("data coord insert channel = %s", Params.InsertChannelPrefixName)
assert.Equal(t, Params.StatisticsChannelName, "by-dev-datacoord-statistics-channel")
t.Logf("data coord stats channel = %s", Params.StatisticsChannelName)
assert.Equal(t, Params.TimeTickChannelName, "by-dev-datacoord-timetick-channel")
t.Logf("data coord timetick channel = %s", Params.TimeTickChannelName)
......
......@@ -142,5 +142,6 @@ const flushInterval = 2 * time.Second
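// flushPolicyV1 reports whether a segment should be flushed: it must be
// Sealed, its last allocation must have expired at the given timestamp,
// enough time must have passed since its previous flush, and (new in this
// change) it must hold at least one row, so empty segments are skipped.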
func flushPolicyV1(segment *SegmentInfo, t Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed &&
segment.GetLastExpireTime() <= t &&
time.Since(segment.lastFlushTime) >= flushInterval
time.Since(segment.lastFlushTime) >= flushInterval &&
segment.currRows != 0
}
......@@ -302,6 +302,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
meta.SetCurrentRows(allocations[0].SegmentID, 1)
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
......
......@@ -399,8 +399,7 @@ func (s *Server) initMeta() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(4)
s.startStatsChannel(s.serverLoopCtx)
s.serverLoopWg.Add(3)
s.startDataNodeTtLoop(s.serverLoopCtx)
s.startWatchService(s.serverLoopCtx)
s.startFlushLoop(s.serverLoopCtx)
......@@ -415,44 +414,6 @@ func (s *Server) startServerLoop() {
})
}
func (s *Server) startStatsChannel(ctx context.Context) {
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
log.Debug("DataCoord creates statistics channel consumer",
zap.String("channel", Params.StatisticsChannelName),
zap.String("description", Params.DataCoordSubscriptionName))
statsStream.Start()
go func() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
defer statsStream.Close()
for {
select {
case <-ctx.Done():
log.Debug("statistics channel shutdown")
return
default:
}
msgPack := statsStream.Consume()
if msgPack == nil {
log.Debug("receive nil stats msg, shutdown stats channel")
return
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_SegmentStatistics {
log.Warn("receive unknown msg from segment statistics channel",
zap.Stringer("msgType", msg.Type()))
continue
}
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
}
}()
}
// startDataNodeTtLoop starts a goroutine to receive datanode tt msgs from the msgstream
// tt msg stands for the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
......@@ -475,6 +436,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
checker.Start()
defer checker.Stop()
}
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
defer ttMsgStream.Close()
......@@ -491,75 +453,119 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
return
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_DataNodeTt {
log.Warn("receive unexpected msg type from tt channel",
zap.Stringer("msgType", msg.Type()))
ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
if !ok {
log.Warn("receive unexpected msg type from tt channel")
continue
}
ttMsg := msg.(*msgstream.DataNodeTtMsg)
if enableTtChecker {
checker.Check()
}
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
log.Warn("failed to expire allocations", zap.Error(err))
continue
}
physical, _ := tsoutil.ParseTS(ts)
if time.Since(physical).Minutes() > 1 {
// if lagging behind, log about once per minute
log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("tt", physical))
}
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
continue
}
staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isSegmentHealthy(info) &&
info.GetInsertChannel() == ch &&
!info.lastFlushTime.IsZero() &&
time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
})
if len(segments)+len(staleSegments) == 0 {
if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
log.Error("failed to handle timetick message", zap.Error(err))
continue
}
log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
for _, id := range segments {
sInfo := s.meta.GetSegment(id)
if sInfo == nil {
log.Error("get segment from meta error", zap.Int64("id", id),
zap.Error(err))
continue
}
segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
s.meta.SetLastFlushTime(id, time.Now())
}
markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
for _, segment := range staleSegments {
for _, fSeg := range segmentInfos {
// check segment needs flush first
if segment.GetID() == fSeg.GetID() {
continue
}
}
markSegments = append(markSegments, segment.SegmentInfo)
s.meta.SetLastFlushTime(segment.GetID(), time.Now())
}
if len(segmentInfos)+len(markSegments) > 0 {
s.cluster.Flush(s.ctx, segmentInfos, markSegments)
}
}
s.helper.eventAfterHandleDataNodeTt()
}
}()
}
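// handleTimetickMessage applies the segment statistics carried by the tt
// msg, expires allocations up to its timestamp, and asks the cluster to
// flush the flushable and stale segments of the channel.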
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
ch := ttMsg.GetChannelName()
ts := ttMsg.GetTimestamp()
physical, _ := tsoutil.ParseTS(ts)
if time.Since(physical).Minutes() > 1 {
// if lagging behind, log about once per minute
log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
}
s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
return fmt.Errorf("expire allocations: %w", err)
}
flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
return fmt.Errorf("get flushable segments: %w", err)
}
flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)
staleSegments := s.getStaleSegmentsInfo(ch)
staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)
if len(flushableSegments)+len(staleSegments) == 0 {
return nil
}
log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))
s.setLastFlushTime(flushableSegments)
s.setLastFlushTime(staleSegments)
finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
for _, info := range flushableSegments {
finfo = append(finfo, info.SegmentInfo)
}
for _, info := range staleSegments {
minfo = append(minfo, info.SegmentInfo)
}
s.cluster.Flush(s.ctx, finfo, minfo)
return nil
}
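// updateSegmentStatistics stores the row counts reported by the DataNode into meta.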
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
for _, stat := range stats {
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
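// getFlushableSegmentsInfo resolves segment IDs to their meta entries, skipping any ID no longer present in meta.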
func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
res := make([]*SegmentInfo, 0, len(flushableIDs))
for _, id := range flushableIDs {
sinfo := s.meta.GetSegment(id)
if sinfo == nil {
log.Error("get segment from meta error", zap.Int64("id", id))
continue
}
res = append(res, sinfo)
}
return res
}
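// getStaleSegmentsInfo returns the healthy segments on the channel whose last flush is older than segmentTimedFlushDuration.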
func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
return s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isSegmentHealthy(info) &&
info.GetInsertChannel() == ch &&
!info.lastFlushTime.IsZero() &&
time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
})
}
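// filterWithFlushableSegments removes stale segments that already appear in the flushable set, so each segment is flushed through a single path.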
func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
filter := map[int64]struct{}{}
for _, sid := range flushableIDs {
filter[sid] = struct{}{}
}
res := make([]*SegmentInfo, 0, len(staleSegments))
for _, sinfo := range staleSegments {
if _, ok := filter[sinfo.GetID()]; ok {
continue
}
res = append(res, sinfo)
}
return res
}
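// setLastFlushTime records the current time as the last flush time of every given segment.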
func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
for _, sinfo := range segments {
s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
}
}
// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
go s.watchService(ctx)
......
......@@ -194,6 +194,8 @@ func TestFlush(t *testing.T) {
resp, err := svr.Flush(context.TODO(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
svr.meta.SetCurrentRows(segID, 1)
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
......@@ -250,15 +252,6 @@ func TestGetTimeTickChannel(t *testing.T) {
assert.EqualValues(t, Params.TimeTickChannelName, resp.Value)
}
func TestGetStatisticsChannel(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
resp, err := svr.GetStatisticsChannel(context.TODO())
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, Params.StatisticsChannelName, resp.Value)
}
func TestGetSegmentStates(t *testing.T) {
t.Run("normal cases", func(t *testing.T) {
svr := newTestServer(t, nil)
......@@ -720,55 +713,6 @@ func TestServer_getSystemInfoMetrics(t *testing.T) {
}
}
func TestChannel(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
t.Run("Test StatsChannel", func(t *testing.T) {
const segID = 0
const rowNum = int64(100)
segInfo := &datapb.SegmentInfo{
ID: segID,
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
NumRows: rowNum,
}
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.SegmentStatisticsMsg {
return &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentStatistics: internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegStats: []*internalpb.SegmentStatisticsUpdates{stats},
},
}
}
statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
statsStream.AsProducer([]string{Params.StatisticsChannelName})
statsStream.Start()
defer statsStream.Close()
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345))
err = statsStream.Produce(&msgPack)
assert.Nil(t, err)
})
}
type spySegmentManager struct {
spyCh chan struct{}
}
......@@ -1143,6 +1087,10 @@ func TestDataNodeTtChannel(t *testing.T) {
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime)
msg.SegmentsStats = append(msg.SegmentsStats, &datapb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
msgPack.Msgs = append(msgPack.Msgs, msg)
err = ttMsgStream.Produce(&msgPack)
assert.Nil(t, err)
......@@ -1217,6 +1165,10 @@ func TestDataNodeTtChannel(t *testing.T) {
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime)
msg.SegmentsStats = append(msg.SegmentsStats, &datapb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
msgPack.Msgs = append(msgPack.Msgs, msg)
err = ttMsgStream.Produce(&msgPack)
assert.Nil(t, err)
......@@ -2250,7 +2202,6 @@ func TestGetFlushState(t *testing.T) {
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.StatisticsChannelName = Params.StatisticsChannelName + strconv.Itoa(rand.Int())
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......
......@@ -55,9 +55,9 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon
func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "no statistics channel",
},
Value: Params.StatisticsChannelName,
}, nil
}
......
......@@ -204,7 +204,6 @@ func (node *DataNode) Register() error {
// Init function does nothing now.
func (node *DataNode) Init() error {
log.Debug("DataNode Init",
zap.String("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName),
zap.String("TimeTickChannelName", Params.TimeTickChannelName),
)
......
......@@ -55,7 +55,6 @@ func TestMain(t *testing.M) {
Params.Init()
// change to specific channel for test
Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int())
Params.SegmentStatisticsChannelName = Params.SegmentStatisticsChannelName + strconv.Itoa(rand.Int())
code := t.Run()
os.Exit(code)
}
......
......@@ -63,10 +63,9 @@ type insertBufferNode struct {
flushingSegCache *Cache
flushManager flushManager
timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream
ttLogger timeTickLogger
ttMerger *mergedTimeTickerSender
timeTickStream msgstream.MsgStream
ttLogger timeTickLogger
ttMerger *mergedTimeTickerSender
}
type timeTickLogger struct {
......@@ -158,10 +157,6 @@ func (ibNode *insertBufferNode) Close() {
if ibNode.timeTickStream != nil {
ibNode.timeTickStream.Close()
}
if ibNode.segmentStatisticsStream != nil {
ibNode.segmentStatisticsStream.Close()
}
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
......@@ -211,14 +206,6 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
return []Msg{}
}
if len(seg2Upload) > 0 {
log.Debug("flowgraph insert buffer node consumed msgs with end position", zap.String("channel", ibNode.channelName), zap.Any("end position", endPositions[0]))
err := ibNode.uploadMemStates2Coord(seg2Upload)
if err != nil {
log.Error("upload segment statistics to coord error", zap.Error(err))
}
}
// insert messages -> buffer
for _, msg := range fgMsg.insertMessages {
err := ibNode.bufferInsertMsg(msg, endPositions[0])
......@@ -357,7 +344,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
}
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax); err != nil {
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload); err != nil {
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
}
......@@ -709,58 +696,11 @@ func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataTy
}
// writeHardTimeTick writes timetick once insertBufferNode operates.
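// The segment IDs updated in this pass are buffered together with the timestamp, so the merged sender can attach their latest statistics to the DataNodeTtMsg it eventually emits.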
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) error {
ibNode.ttLogger.LogTs(ts)
ibNode.ttMerger.bufferTs(ts)
ibNode.ttMerger.bufferTs(ts, segmentIDs)
return nil
}
// uploadMemStates2Coord uploads the latest statistics of changed segments in DataNode memory to DataCoord
// through a msgStream channel.
//
// Currently, the statistics include the segment ID and its total number of rows in memory.
func (ibNode *insertBufferNode) uploadMemStates2Coord(segIDs []UniqueID) error {
statsUpdates := make([]*internalpb.SegmentStatisticsUpdates, 0, len(segIDs))
for _, segID := range segIDs {
updates, err := ibNode.replica.getSegmentStatisticsUpdates(segID)
if err != nil {
log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
continue
}
log.Debug("Segment Statistics to Update",
zap.Int64("segment ID", updates.GetSegmentID()),
zap.Int64("collection ID", ibNode.replica.getCollectionID()),
zap.String("vchannel name", ibNode.channelName),
zap.Int64("numOfRows", updates.GetNumRows()),
)
statsUpdates = append(statsUpdates, updates)
}
segStats := internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentStatistics,
MsgID: UniqueID(0), // GOOSE TODO
Timestamp: Timestamp(0), // GOOSE TODO
SourceID: Params.NodeID,
},
SegStats: statsUpdates,
}
var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0}, // GOOSE TODO
},
SegmentStatistics: segStats,
}
var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
return ibNode.segmentStatisticsStream.Produce(&msgPack)
}
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
......@@ -782,17 +722,16 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
// update statistics channel
segS, err := config.msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
log.Debug("datanode AsProducer", zap.String("SegmentStatisChannelName", Params.SegmentStatisticsChannelName))
var segStatisticsMsgStream msgstream.MsgStream = segS
segStatisticsMsgStream.Start()
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error {
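// Gather row counts at send time rather than at buffer time, so a merged
// tick always carries the freshest statistics.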
stats := make([]*datapb.SegmentStats, 0, len(segmentIDs))
for _, sid := range segmentIDs {
stat, err := config.replica.getSegmentStatisticsUpdates(sid)
if err != nil {
log.Warn("failed to get segment statistics info", zap.Int64("segmentID", sid), zap.Error(err))
continue
}
stats = append(stats, stat)
}
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
BaseMsg: msgstream.BaseMsg{
......@@ -806,8 +745,9 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
MsgID: 0,
Timestamp: ts,
},
ChannelName: config.vChannelName,
Timestamp: ts,
ChannelName: config.vChannelName,
Timestamp: ts,
SegmentsStats: stats,
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
......@@ -818,9 +758,7 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
BaseNode: baseNode,
insertBuffer: sync.Map{},
timeTickStream: wTtMsgStream,
segmentStatisticsStream: segStatisticsMsgStream,
timeTickStream: wTtMsgStream,
flushMap: sync.Map{},
flushChan: flushCh,
flushingSegCache: flushingSegCache,
......
......@@ -110,13 +110,6 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
_, err = newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
assert.Error(t, err)
c.msFactory = &CDFMsFactory{
Factory: msFactory,
cd: 1,
}
_, err = newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
assert.Error(t, err)
}
type mockMsg struct{}
......@@ -142,7 +135,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
for _, test := range invalidInTests {
te.Run(test.description, func(t0 *testing.T) {
ibn := &insertBufferNode{
ttMerger: newMergedTimeTickerSender(func(Timestamp) error { return nil }),
ttMerger: newMergedTimeTickerSender(func(Timestamp, []int64) error { return nil }),
}
rt := ibn.Operate(test.in)
assert.Empty(t0, rt)
......
......@@ -19,22 +19,22 @@ package datanode
import (
"sync"
"time"
"go.uber.org/atomic"
)
type sendTimeTick func(Timestamp) error
type sendTimeTick func(Timestamp, []int64) error
// mergedTimeTickerSender reduces the time tick sending rate while the datanode is fast-forwarding.
// It ensures time ticks are sent at most 10 times a second (1 tick / 100 ms)
// and that the last time tick is always sent.
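// Segment IDs buffered alongside the timestamps are deduplicated and attached to the tick when it is finally sent.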
type mergedTimeTickerSender struct {
ts atomic.Uint64 // current ts value
cond *sync.Cond // condition to send timeticker
send sendTimeTick // actual sender logic
ts uint64
segmentIDs map[int64]struct{}
lastSent time.Time
mu sync.Mutex
cond *sync.Cond // condition to send timeticker
send sendTimeTick // actual sender logic
lastSent time.Time
lastMut sync.RWMutex
wg sync.WaitGroup
closeCh chan struct{}
closeOnce sync.Once
......@@ -42,12 +42,12 @@ type mergedTimeTickerSender struct {
func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
mt := &mergedTimeTickerSender{
cond: sync.NewCond(&sync.Mutex{}),
send: send,
closeCh: make(chan struct{}),
ts: 0, // 0 means no time tick sent yet
segmentIDs: make(map[int64]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
send: send,
closeCh: make(chan struct{}),
}
mt.ts.Store(0) // 0 means no time tick sent yet
mt.wg.Add(2)
go mt.tick()
go mt.work()
......@@ -55,10 +55,13 @@ func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
return mt
}
func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp) {
mt.ts.Store(ts)
mt.lastMut.RLock()
defer mt.lastMut.RUnlock()
func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) {
mt.mu.Lock()
defer mt.mu.Unlock()
mt.ts = ts
for _, sid := range segmentIDs {
mt.segmentIDs[sid] = struct{}{}
}
if !mt.lastSent.IsZero() && time.Since(mt.lastSent) > time.Millisecond*100 {
mt.cond.L.Lock()
......@@ -94,7 +97,7 @@ func (mt *mergedTimeTickerSender) isClosed() bool {
func (mt *mergedTimeTickerSender) work() {
defer mt.wg.Done()
ts, lastTs := uint64(0), uint64(0)
lastTs := uint64(0)
for {
mt.cond.L.Lock()
if mt.isClosed() {
......@@ -102,15 +105,21 @@ func (mt *mergedTimeTickerSender) work() {
return
}
mt.cond.Wait()
ts = mt.ts.Load()
mt.cond.L.Unlock()
if ts != lastTs {
mt.send(ts)
lastTs = ts
mt.lastMut.Lock()
mt.mu.Lock()
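// Read the timestamp and drain the buffered segment IDs under the same
// mutex, so the statistics sent always match the timestamp sent.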
if mt.ts != lastTs {
var sids []int64
for sid := range mt.segmentIDs {
sids = append(sids, sid)
}
mt.segmentIDs = make(map[int64]struct{})
lastTs = mt.ts
mt.lastSent = time.Now()
mt.lastMut.Unlock()
mt.send(mt.ts, sids)
}
mt.mu.Unlock()
}
}
......
......@@ -12,7 +12,7 @@ func TestMergedTimeTicker(t *testing.T) {
var ticks []uint64
var mut sync.Mutex
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
mt := newMergedTimeTickerSender(func(ts Timestamp, _ []int64) error {
mut.Lock()
defer mut.Unlock()
ticks = append(ticks, ts)
......@@ -21,7 +21,7 @@ func TestMergedTimeTicker(t *testing.T) {
for i := 1; i < 100; i++ {
time.Sleep(time.Millisecond * 10)
mt.bufferTs(uint64(i))
mt.bufferTs(uint64(i), nil)
}
mt.close()
mut.Lock()
......@@ -35,7 +35,7 @@ func TestMergedTimeTicker_close10000(t *testing.T) {
batchSize := 10000
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
mt := newMergedTimeTickerSender(func(ts Timestamp, _ []int64) error {
return nil
})
go func(mt *mergedTimeTickerSender) {
......
......@@ -59,9 +59,6 @@ type ParamTable struct {
// Cluster channels
ClusterChannelPrefix string
// Segment statistics channel
SegmentStatisticsChannelName string
// Timetick channel
TimeTickChannelName string
......@@ -116,7 +113,6 @@ func (p *ParamTable) Init() {
// Must init global msgchannel prefix before other channel names
p.initClusterMsgChannelPrefix()
p.initSegmentStatisticsChannelName()
p.initTimeTickChannelName()
p.initEtcdEndpoints()
......@@ -196,15 +192,6 @@ func (p *ParamTable) initClusterMsgChannelPrefix() {
p.ClusterChannelPrefix = name
}
func (p *ParamTable) initSegmentStatisticsChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
if err != nil {
panic(err)
}
s := []string{p.ClusterChannelPrefix, config}
p.SegmentStatisticsChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initTimeTickChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
if err != nil {
......
......@@ -71,12 +71,6 @@ func TestParamTable(t *testing.T) {
log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix)
})
t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) {
path := Params.SegmentStatisticsChannelName
assert.Equal(t, path, "by-dev-datacoord-statistics-channel")
log.Println("SegmentStatisticsChannelName:", path)
})
t.Run("Test TimeTickChannelName", func(t *testing.T) {
name := Params.TimeTickChannelName
assert.Equal(t, name, "by-dev-datacoord-timetick-channel")
......
......@@ -67,7 +67,7 @@ type Replica interface {
updateStatistics(segID UniqueID, numRows int64)
refreshFlushedSegStatistics(segID UniqueID, numRows int64)
getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error)
segmentFlushed(segID UniqueID)
}
......@@ -580,12 +580,10 @@ func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) {
}
// getSegmentStatisticsUpdates gives current segment's statistics updates.
func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*datapb.SegmentStats, error) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
}
updates := &datapb.SegmentStats{SegmentID: segID}
if seg, ok := replica.newSegments[segID]; ok {
updates.NumRows = seg.numRows
......
......@@ -609,69 +609,6 @@ func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return queryNodeSegStatsMsg, nil
}
/////////////////////////////////////////SegmentStatisticsMsg//////////////////////////////////////////
// SegmentStatisticsMsg is a message pack that contains segment statistics
type SegmentStatisticsMsg struct {
BaseMsg
internalpb.SegmentStatistics
}
// interface implementation validation
var _ TsMsg = &SegmentStatisticsMsg{}
// TraceCtx returns the context of opentracing
func (ss *SegmentStatisticsMsg) TraceCtx() context.Context {
return ss.BaseMsg.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (ss *SegmentStatisticsMsg) SetTraceCtx(ctx context.Context) {
ss.BaseMsg.Ctx = ctx
}
// ID returns the ID of this message pack
func (ss *SegmentStatisticsMsg) ID() UniqueID {
return ss.Base.MsgID
}
// Type returns the type of this message pack
func (ss *SegmentStatisticsMsg) Type() MsgType {
return ss.Base.MsgType
}
// SourceID indicated which component generated this message
func (ss *SegmentStatisticsMsg) SourceID() int64 {
return ss.Base.SourceID
}
// Marshal serializes a message pack into a byte array
func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) {
segStatsTask := input.(*SegmentStatisticsMsg)
segStats := &segStatsTask.SegmentStatistics
mb, err := proto.Marshal(segStats)
if err != nil {
return nil, err
}
return mb, nil
}
// Unmarshal deserializes a message pack from a byte array
func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
segStats := internalpb.SegmentStatistics{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &segStats)
if err != nil {
return nil, err
}
segStatsMsg := &SegmentStatisticsMsg{SegmentStatistics: segStats}
return segStatsMsg, nil
}
/////////////////////////////////////////CreateCollection//////////////////////////////////////////
// CreateCollectionMsg is a message pack that contains create collection request
......
......@@ -510,50 +510,6 @@ func TestQueryNodeStatsMsg_Unmarshal_IllegalParameter(t *testing.T) {
assert.Nil(t, tsMsg)
}
func TestSegmentStatisticsMsg(t *testing.T) {
segmentStatisticsMsg := &SegmentStatisticsMsg{
BaseMsg: generateBaseMsg(),
SegmentStatistics: internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentStatistics,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
SegStats: []*internalpb.SegmentStatisticsUpdates{},
},
}
assert.NotNil(t, segmentStatisticsMsg.TraceCtx())
ctx := context.Background()
segmentStatisticsMsg.SetTraceCtx(ctx)
assert.Equal(t, ctx, segmentStatisticsMsg.TraceCtx())
assert.Equal(t, int64(1), segmentStatisticsMsg.ID())
assert.Equal(t, commonpb.MsgType_SegmentStatistics, segmentStatisticsMsg.Type())
assert.Equal(t, int64(3), segmentStatisticsMsg.SourceID())
bytes, err := segmentStatisticsMsg.Marshal(segmentStatisticsMsg)
assert.Nil(t, err)
tsMsg, err := segmentStatisticsMsg.Unmarshal(bytes)
assert.Nil(t, err)
segmentStatisticsMsg2, ok := tsMsg.(*SegmentStatisticsMsg)
assert.True(t, ok)
assert.Equal(t, int64(1), segmentStatisticsMsg2.ID())
assert.Equal(t, commonpb.MsgType_SegmentStatistics, segmentStatisticsMsg2.Type())
assert.Equal(t, int64(3), segmentStatisticsMsg2.SourceID())
}
func TestSegmentStatisticsMsg_Unmarshal_IllegalParameter(t *testing.T) {
segmentStatisticsMsg := &SegmentStatisticsMsg{}
tsMsg, err := segmentStatisticsMsg.Unmarshal(10)
assert.NotNil(t, err)
assert.Nil(t, tsMsg)
}
func TestCreateCollectionMsg(t *testing.T) {
createCollectionMsg := &CreateCollectionMsg{
BaseMsg: generateBaseMsg(),
......
......@@ -66,7 +66,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
createPartitionMsg := CreatePartitionMsg{}
dropPartitionMsg := DropPartitionMsg{}
queryNodeSegStatsMsg := QueryNodeStatsMsg{}
segmentStatisticsMsg := SegmentStatisticsMsg{}
dataNodeTtMsg := DataNodeTtMsg{}
sealedSegmentsChangeInfoMsg := SealedSegmentsChangeInfoMsg{}
......@@ -84,7 +83,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_DropCollection] = dropCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropPartition] = dropPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_SegmentStatistics] = segmentStatisticsMsg.Unmarshal
p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal
p.TempMap[commonpb.MsgType_SealedSegmentsChangeInfo] = sealedSegmentsChangeInfoMsg.Unmarshal
......
......@@ -253,6 +253,12 @@ message DataNodeTtMsg {
common.MsgBase base =1;
string channel_name = 2;
uint64 timestamp = 3;
repeated SegmentStats segments_stats = 4;
}
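// SegmentStats is the per-segment row count a DataNode piggybacks on its time tick.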
message SegmentStats {
int64 SegmentID = 1;
int64 NumRows = 2;
}
enum ChannelWatchState {
......
......@@ -222,21 +222,6 @@ message LoadIndex {
repeated common.KeyValuePair index_params = 6;
}
message SegmentStatisticsUpdates {
int64 SegmentID = 1;
int64 MemorySize = 2;
int64 NumRows = 3;
uint64 create_time = 4;
uint64 end_time = 5;
internal.MsgPosition start_position = 6;
internal.MsgPosition end_position = 7;
}
message SegmentStatistics {
common.MsgBase base = 1;
repeated SegmentStatisticsUpdates SegStats = 2;
}
message IndexStats {
repeated common.KeyValuePair index_params = 1;
int64 num_related_segments = 2;
......