diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index b8ad13567429a81f02efce5d92466a41f5a0a0cf..1d95b503883ba85a7e7a5a498b86e6a6e7dc03b8 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -128,7 +128,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact lock: sync.Mutex{}, sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg), - sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16), + + // 1 is the most reasonable capacity. In fact, Milvus can only focus on the latest time tick. + sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 1), syncedTtHistogram: newTtHistogram(), } @@ -169,7 +171,17 @@ func (t *timetickSync) sendToChannel() bool { ptt[k] = v t.sess2ChanTsMap[k] = nil } - t.sendChan <- ptt + + select { + case t.sendChan <- ptt: + default: + // The consumer of `sendChan` haven't completed its operation. If we send the `ptt` here, the consumer will + // always get an older time tick. The older time tick in `sendChan` will block newer time tick in next window. + // However, in fact the consumer can only focus on the newest. + + // TODO: maybe a metric should be here. + } + return true }