diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 2c62a30948bcc6af5bf910d402b9938050f013b4..a280cda2a4eee95890b9e6d38c8ed25c84efda98 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -720,14 +720,11 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, len(core.chanTimeTick.proxyTimeTick), 2) pt, ok := core.chanTimeTick.proxyTimeTick[core.session.ServerID] assert.True(t, ok) - assert.Equal(t, shardsNum, int32(len(pt.in.ChannelNames))) - assert.Equal(t, shardsNum, int32(len(pt.in.Timestamps))) - assert.Equal(t, shardsNum, int32(len(pt.timeTick))) - assert.ElementsMatch(t, pt.in.ChannelNames, createMeta.PhysicalChannelNames) - assert.Equal(t, pt.in.Timestamps[0], pt.in.Timestamps[1]) - assert.Equal(t, pt.in.Timestamps[0], pt.in.DefaultTimestamp) - assert.Equal(t, pt.timeTick[pt.in.ChannelNames[0]], pt.in.DefaultTimestamp) - assert.Equal(t, pt.timeTick[pt.in.ChannelNames[1]], pt.in.DefaultTimestamp) + assert.Equal(t, shardsNum, int32(len(pt.chanTs))) + for chanName, ts := range pt.chanTs { + assert.Contains(t, createMeta.PhysicalChannelNames, chanName) + assert.Equal(t, pt.defaultTs, ts) + } core.chanTimeTick.lock.Unlock() // check DD operation info diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 2edaf27f1eed683ff06bd0a03b25cefdbea41b60..c6cd25214947161b2060cba944841e2c81dcc363 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -37,8 +37,8 @@ type timetickSync struct { deltaChannels *dmlChannels // used for delete lock sync.Mutex - proxyTimeTick map[typeutil.UniqueID]*channelTimeTickMsg - sendChan chan map[typeutil.UniqueID]*channelTimeTickMsg + proxyTimeTick map[typeutil.UniqueID]*chanTsMsg + sendChan chan map[typeutil.UniqueID]*chanTsMsg // record ddl timetick info ddlLock sync.RWMutex @@ -46,28 +46,29 @@ type timetickSync struct { ddlTsSet map[typeutil.Timestamp]struct{} } -type channelTimeTickMsg struct { - in *internalpb.ChannelTimeTickMsg - timeTick map[string]typeutil.Timestamp +type chanTsMsg struct { + chanTs map[string]typeutil.Timestamp + defaultTs typeutil.Timestamp + cnt int64 } -func newChannelTimeTickMsg(in *internalpb.ChannelTimeTickMsg) *channelTimeTickMsg { - msg := &channelTimeTickMsg{ - in: in, - timeTick: make(map[string]typeutil.Timestamp), +func newChanTsMsg(in *internalpb.ChannelTimeTickMsg, cnt int64) *chanTsMsg { + msg := &chanTsMsg{ + chanTs: make(map[string]typeutil.Timestamp), + defaultTs: in.DefaultTimestamp, + cnt: cnt, } for idx := range in.ChannelNames { - msg.timeTick[in.ChannelNames[idx]] = in.Timestamps[idx] + msg.chanTs[in.ChannelNames[idx]] = in.Timestamps[idx] } return msg } -func (c *channelTimeTickMsg) getTimetick(channelName string) typeutil.Timestamp { - tt, ok := c.timeTick[channelName] - if ok { - return tt +func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { + if ts, ok := c.chanTs[channelName]; ok { + return ts } - return c.in.DefaultTimestamp + return c.defaultTs } func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { @@ -102,8 +103,8 @@ func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory deltaChannels: deltaChannels, lock: sync.Mutex{}, - proxyTimeTick: make(map[typeutil.UniqueID]*channelTimeTickMsg), - sendChan: make(chan map[typeutil.UniqueID]*channelTimeTickMsg, 16), + proxyTimeTick: make(map[typeutil.UniqueID]*chanTsMsg), + sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16), ddlLock: sync.RWMutex{}, ddlMinTs: typeutil.Timestamp(math.MaxUint64), @@ -117,13 +118,31 @@ func (t *timetickSync) sendToChannel() { if len(t.proxyTimeTick) == 0 { return } - for _, v := range t.proxyTimeTick { + + // detect whether rootcoord receives ttMsg from all proxy nodes + maxCnt := int64(0) + idleProxyList := make([]typeutil.UniqueID, 0) + for id, v := range t.proxyTimeTick { if v == nil { - return + idleProxyList = append(idleProxyList, id) + } else { + if maxCnt < v.cnt { + maxCnt = v.cnt + } } } + + if len(idleProxyList) > 0 { + // give warning every 2 second if not get ttMsg from proxy nodes + if maxCnt%10 == 0 { + log.Warn("proxy idle for long time", zap.Any("proxy list", idleProxyList), + zap.Int64("idle time", int64(Params.TimeTickInterval)*maxCnt)) + } + return + } + // clear proxyTimeTick and send a clone - ptt := make(map[typeutil.UniqueID]*channelTimeTickMsg) + ptt := make(map[typeutil.UniqueID]*chanTsMsg) for k, v := range t.proxyTimeTick { ptt[k] = v t.proxyTimeTick[k] = nil @@ -204,24 +223,20 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason } if in.Base.SourceID == t.session.ServerID { - if prev != nil && in.DefaultTimestamp <= prev.in.DefaultTimestamp { + if prev != nil && in.DefaultTimestamp <= prev.defaultTs { log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), zap.Uint64("curr ts", in.DefaultTimestamp), - zap.Uint64("prev ts", prev.in.DefaultTimestamp), + zap.Uint64("prev ts", prev.defaultTs), zap.String("reason", reason)) return nil } } - if in.DefaultTimestamp == 0 { - mints := minTimeTick(in.Timestamps...) - log.Debug("default time stamp is zero, set it to the min value of inputs", - zap.Int64("proxy id", in.Base.SourceID), zap.Uint64("min ts", mints)) - in.DefaultTimestamp = mints - } - t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) - //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), - // zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) + if prev == nil { + t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, 1) + } else { + t.proxyTimeTick[in.Base.SourceID] = newChanTsMsg(in, prev.cnt+1) + } t.sendToChannel() return nil @@ -268,36 +283,35 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { // reduce each channel to get min timestamp local := proxyTimetick[t.session.ServerID] - if len(local.in.ChannelNames) == 0 { + if len(local.chanTs) == 0 { continue } - hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames)) + hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTs)) tr := timerecord.NewTimeRecorder(hdr) wg := sync.WaitGroup{} - for _, chanName := range local.in.ChannelNames { + for chanName, ts := range local.chanTs { wg.Add(1) - go func(chanName string) { - mints := local.getTimetick(chanName) + go func(chanName string, ts typeutil.Timestamp) { + mints := ts for _, tt := range proxyTimetick { - ts := tt.getTimetick(chanName) - if ts < mints { - mints = ts + currTs := tt.getTimetick(chanName) + if currTs < mints { + mints = currTs } } if err := t.sendTimeTickToChannel([]string{chanName}, mints); err != nil { log.Debug("SendTimeTickToChannel fail", zap.Error(err)) } wg.Done() - }(chanName) + }(chanName, ts) } wg.Wait() span := tr.ElapseSpan() // rootcoord send tt msg to all channels every 200ms by default - if span.Milliseconds() > 200 { + if span.Milliseconds() > int64(Params.TimeTickInterval) { log.Warn("rootcoord send tt to all channels too slowly", - zap.Int("chanNum", len(local.in.ChannelNames)), - zap.Int64("span", span.Milliseconds())) + zap.Int("chanNum", len(local.chanTs)), zap.Int64("span", span.Milliseconds())) } } } diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index 0609cfc51dc2b4d14fefcca251199b8182ef473b..63ae0f8d9df13b987abf5628678792aee5e96cce 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -58,7 +58,7 @@ func TestTimetickSync(t *testing.T) { MsgType: commonpb.MsgType_TimeTick, }, } - ttSync.proxyTimeTick[1] = newChannelTimeTickMsg(msg) + ttSync.proxyTimeTick[1] = newChanTsMsg(msg, 1) ttSync.sendToChannel() }) @@ -87,7 +87,7 @@ func TestTimetickSync(t *testing.T) { msg.Timestamps = append(msg.Timestamps, uint64(2)) msg.DefaultTimestamp = uint64(200) - cttMsg := newChannelTimeTickMsg(msg) + cttMsg := newChanTsMsg(msg, 1) ttSync.proxyTimeTick[msg.Base.SourceID] = cttMsg ttSync.ddlMinTs = uint64(100)