未验证 提交 a0b98dac 编写于 作者: X Xiaofan 提交者: GitHub

Skip reconsume tt msgstream when datacoord restart (#9684)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 997e88d8
......@@ -393,8 +393,7 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
}
// GetFlushableSegments get segment ids with Sealed State and flushable (meets flushPolicy)
func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string,
t Timestamp) ([]UniqueID, error) {
func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string, t Timestamp) ([]UniqueID, error) {
s.mu.Lock()
defer s.mu.Unlock()
sp, _ := trace.StartSpanFromContext(ctx)
......
......@@ -20,12 +20,13 @@ import (
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
......@@ -360,8 +361,8 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
log.Error("new msg stream failed", zap.Error(err))
return
}
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
Params.DataCoordSubscriptionName)
ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
log.Debug("dataCoord create time tick channel consumer",
zap.String("timeTickChannelName", Params.TimeTickChannelName),
zap.String("subscriptionName", Params.DataCoordSubscriptionName))
......@@ -403,6 +404,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
log.Warn("failed to expire allocations", zap.Error(err))
continue
}
physical, _ := tsoutil.ParseTS(ts)
if time.Since(physical).Minutes() > 1 {
// if lag behind, log every 1 mins about
log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
}
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
......
......@@ -18,6 +18,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/stretchr/testify/assert"
)
......@@ -60,8 +61,10 @@ func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
return make(chan *msgstream.MsgPack, 100)
}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsProducer(channels []string) {}
func (mtm *mockTtMsgStream) AsConsumer(channels []string, subName string) {}
func (mtm *mockTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
}
func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
func (mtm *mockTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
return make([][]int32, 0)
......
......@@ -59,6 +59,39 @@ func Fatal(msg string, fields ...zap.Field) {
L().WithOptions(zap.AddCallerSkip(1)).Fatal(msg, fields...)
}
// RatedDebug print logs at debug level
// it limit log print to avoid too many logs
// return true if log successfully
func RatedDebug(cost float64, msg string, fields ...zap.Field) bool {
if R().CheckCredit(cost) {
L().WithOptions(zap.AddCallerSkip(1)).Debug(msg, fields...)
return true
}
return false
}
// RatedInfo print logs at debug level
// it limit log print to avoid too many logs
// return true if log successfully
func RatedInfo(cost float64, msg string, fields ...zap.Field) bool {
if R().CheckCredit(cost) {
L().WithOptions(zap.AddCallerSkip(1)).Info(msg, fields...)
return true
}
return false
}
// RatedWarn print logs at debug level
// it limit log print to avoid too many logs
// return true if log successfully
func RatedWarn(cost float64, msg string, fields ...zap.Field) bool {
if R().CheckCredit(cost) {
L().WithOptions(zap.AddCallerSkip(1)).Warn(msg, fields...)
return true
}
return false
}
// With creates a child logger and adds structured context to it.
// Fields added to the child don't affect the parent, and vice versa.
func With(fields ...zap.Field) *zap.Logger {
......
......@@ -31,21 +31,25 @@ import (
"errors"
"github.com/uber/jaeger-client-go/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
lumberjack "gopkg.in/natefinch/lumberjack.v2"
)
var _globalL, _globalP, _globalS atomic.Value
var _globalL, _globalP, _globalS, _globalR atomic.Value
var rateLimiter *utils.ReconfigurableRateLimiter
func init() {
l, p := newStdLogger()
_globalL.Store(l)
_globalP.Store(p)
s := _globalL.Load().(*zap.Logger).Sugar()
_globalS.Store(s)
r := utils.NewRateLimiter(1.0, 60.0)
_globalR.Store(r)
}
// InitLogger initializes a zap logger.
......@@ -136,6 +140,10 @@ func S() *zap.SugaredLogger {
return _globalS.Load().(*zap.SugaredLogger)
}
func R() *utils.ReconfigurableRateLimiter {
return _globalR.Load().(*utils.ReconfigurableRateLimiter)
}
// ReplaceGlobals replaces the global Logger and SugaredLogger.
// It's safe for concurrent use.
func ReplaceGlobals(logger *zap.Logger, props *ZapProperties) {
......
......@@ -31,6 +31,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
......@@ -128,3 +129,42 @@ func TestSampling(t *testing.T) {
}
}
}
func TestRatedLog(t *testing.T) {
ts := newTestLogSpy(t)
conf := &Config{Level: "debug", DisableTimestamp: true}
logger, p, _ := InitTestLogger(ts, conf)
ReplaceGlobals(logger, p)
time.Sleep(time.Duration(1) * time.Second)
success := RatedDebug(1.0, "test")
assert.True(t, success)
time.Sleep(time.Duration(1) * time.Second)
success = RatedInfo(1.0, "test")
assert.True(t, success)
time.Sleep(time.Duration(1) * time.Second)
success = RatedWarn(1.0, "test")
assert.True(t, success)
time.Sleep(time.Duration(1) * time.Second)
success = RatedInfo(100.0, "test")
assert.False(t, success)
successNum := 0
for i := 0; i < 1000; i++ {
if RatedInfo(1.0, "test") {
successNum++
}
time.Sleep(time.Duration(1) * time.Millisecond)
}
// due to the rate limit, not all
assert.True(t, successNum < 1000)
assert.True(t, successNum > 10)
time.Sleep(time.Duration(3) * time.Second)
success = RatedInfo(3.0, "test")
assert.True(t, success)
Sync()
}
......@@ -116,6 +116,11 @@ func (ms *mqMsgStream) AsProducer(channels []string) {
// Create consumer to receive message from channels
func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
ms.AsConsumerWithPosition(channels, subName, mqclient.SubscriptionPositionEarliest)
}
// Create consumer to receive message from channels, with initial position
func (ms *mqMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
for _, channel := range channels {
if _, ok := ms.consumers[channel]; ok {
continue
......@@ -126,7 +131,7 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) {
Topic: channel,
SubscriptionName: subName,
Type: mqclient.KeyShared,
SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
SubscriptionInitialPosition: position,
MessageChannel: receiveChannel,
})
if err != nil {
......@@ -597,6 +602,10 @@ func (ms *MqTtMsgStream) addConsumer(consumer mqclient.Consumer, channel string)
// AsConsumer subscribes channels as consumer for a MsgStream
func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
ms.AsConsumerWithPosition(channels, subName, mqclient.SubscriptionPositionEarliest)
}
func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
for _, channel := range channels {
if _, ok := ms.consumers[channel]; ok {
continue
......@@ -607,7 +616,7 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
Topic: channel,
SubscriptionName: subName,
Type: mqclient.KeyShared,
SubscriptionInitialPosition: mqclient.SubscriptionPositionEarliest,
SubscriptionInitialPosition: position,
MessageChannel: receiveChannel,
})
if err != nil {
......
......@@ -53,6 +53,7 @@ type MsgStream interface {
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition)
SetRepackFunc(repackFunc RepackFunc)
ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32
GetProduceChannels() []string
......
......@@ -17,17 +17,13 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/mqclient"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
)
type mockTimestampAllocatorInterface struct {
......@@ -276,6 +272,9 @@ func (ms *simpleMockMsgStream) AsProducer(channels []string) {
func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) {
}
func (ms *simpleMockMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
}
func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 {
if len(tsMsgs) <= 0 {
return nil
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册