Unverified commit e7684376, authored by zhenshan.cao, committed by GitHub

Correct usage of Timer and Ticker (#22228)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Parent 36d41121
......@@ -54,6 +54,7 @@ func (r refresher) stop() {
func (r refresher) refreshPeriodically(name string) {
ticker := time.NewTicker(r.refreshInterval)
+ defer ticker.Stop()
log.Info("start refreshing configurations", zap.String("source", name))
for {
select {
......
......@@ -92,10 +92,10 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
}
stop := make(chan struct{})
- ticker := time.NewTimer(timeout)
+ timer := time.NewTimer(timeout)
c.removeTimers([]string{channelName})
c.runningTimerStops.Store(channelName, stop)
- c.runningTimers.Store(channelName, ticker)
+ c.runningTimers.Store(channelName, timer)
go func() {
log.Info("timer started",
......@@ -103,10 +103,10 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Duration("check interval", timeout))
- defer ticker.Stop()
+ defer timer.Stop()
select {
- case <-ticker.C:
+ case <-timer.C:
// check tickle at path as :tickle/[prefix]/{channel_name}
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("watch state", watchState.String()),
......
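The rename above is semantic, not cosmetic: `time.NewTimer` returns a `Timer` that fires exactly once, while `time.NewTicker` returns a `Ticker` that fires repeatedly until stopped, so calling a one-shot timer `ticker` invites the wrong mental model. A minimal standalone sketch of the difference (not Milvus code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A Timer fires once: after the first receive, timer.C never
	// delivers again unless Reset is called.
	timer := time.NewTimer(10 * time.Millisecond)
	defer timer.Stop()
	<-timer.C
	fmt.Println("timer fired once")

	// A Ticker fires repeatedly at the interval until stopped.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		fmt.Println("tick", i+1)
	}
}
```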
......@@ -248,12 +248,13 @@ func (c *ChannelManager) unwatchDroppedChannels() {
// NOT USED.
func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
- timer := time.NewTicker(bgCheckInterval)
+ ticker := time.NewTicker(bgCheckInterval)
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
- case <-timer.C:
+ case <-ticker.C:
c.mu.Lock()
channels := c.store.GetNodesChannels()
......
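Besides fixing the misleading name, this hunk adds the canonical cleanup: every `time.NewTicker` should be paired with a `Stop`, usually deferred, so the runtime can release the ticker when the loop exits. The corrected shape, as a sketch with illustrative names:

```go
package sketch

import (
	"context"
	"time"
)

// bgLoop runs check every interval until ctx is cancelled; bgLoop,
// interval, and check are illustrative names, not from the Milvus code.
func bgLoop(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // one defer covers every way the loop can exit
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}
```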
......@@ -122,16 +122,16 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta
func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
- ticker := time.NewTicker(interval)
c.quit = make(chan struct{})
c.wg.Add(1)
go func() {
defer c.wg.Done()
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
for {
select {
case <-c.quit:
- ticker.Stop()
log.Info("compaction handler quit")
return
case <-ticker.C:
......
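Two things improve here: the ticker is now created inside the goroutine that consumes it, and `Stop` is deferred instead of being called only on the quit branch, so it also runs if the goroutine ever gains another exit path. A sketch of that ownership pattern (names illustrative):

```go
package sketch

import (
	"sync"
	"time"
)

// startWorker shows the corrected shape: the goroutine that consumes the
// ticker also creates it, and a deferred Stop fires on every exit path,
// not just the quit branch.
func startWorker(interval time.Duration, quit chan struct{}, wg *sync.WaitGroup, work func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-quit:
				return
			case <-ticker.C:
				work()
			}
		}
	}()
}
```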
......@@ -1474,12 +1474,14 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
// should be split into two plans
var plans []*datapb.CompactionPlan
+ ticker := time.NewTicker(3 * time.Second)
+ defer ticker.Stop()
WAIT:
for {
select {
case val := <-spy.spyChan:
plans = append(plans, val)
- case <-time.After(3 * time.Second):
+ case <-ticker.C:
break WAIT
}
}
......
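`time.After` inside a `select` that sits in a loop arms a fresh timer on every iteration, so the 3-second window restarts each time `spy.spyChan` delivers a value, and a busy channel can postpone the break indefinitely; each abandoned timer also lives until it fires. A single ticker fires on schedule regardless of traffic. Sketch with illustrative names:

```go
package sketch

import "time"

// collect drains ch until a 3-second deadline; ch stands in for
// spy.spyChan. With case <-time.After(3*time.Second) the timeout would
// be re-armed every iteration; the ticker fires no matter how busy the
// loop is.
func collect(ch <-chan int) []int {
	var out []int
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
WAIT:
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		case <-ticker.C:
			break WAIT
		}
	}
	return out
}
```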
......@@ -94,10 +94,11 @@ func (gc *garbageCollector) start() {
// work contains actual looping check logic
func (gc *garbageCollector) work() {
defer gc.wg.Done()
- ticker := time.Tick(gc.option.checkInterval)
+ ticker := time.NewTicker(gc.option.checkInterval)
+ defer ticker.Stop()
for {
select {
- case <-ticker:
+ case <-ticker.C:
gc.clearEtcd()
gc.recycleUnusedIndexes()
gc.recycleUnusedSegIndexes()
......
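`time.Tick` is the outright leak in this commit: it returns only the channel, so the underlying ticker can never be stopped, and in Go versions before 1.23 it is never garbage-collected either. Sketch of the leak-free equivalent (names illustrative):

```go
package sketch

import (
	"context"
	"time"
)

// gcWork replaces time.Tick with a stoppable ticker; gcWork and sweep
// are illustrative names.
func gcWork(ctx context.Context, checkInterval time.Duration, sweep func()) {
	ticker := time.NewTicker(checkInterval) // stoppable, unlike time.Tick
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sweep()
		}
	}
}
```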
......@@ -75,7 +75,6 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
g.Go(func() error {
for err != nil {
select {
case <-gCtx.Done():
log.Warn("ctx done when downloading kvs from blob storage")
return errDownloadFromBlobStorage
......@@ -83,7 +82,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
default:
if err != errStart {
log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths))
- <-time.After(50 * time.Millisecond)
+ time.Sleep(50 * time.Millisecond)
}
vs, err = b.MultiRead(ctx, paths)
}
......@@ -122,7 +121,7 @@ func (b *binlogIO) uploadSegmentFiles(
log.Warn("save binlog failed, retry in 50ms",
zap.Int64("collectionID", CollectionID),
zap.Int64("segmentID", segID))
- <-time.After(50 * time.Millisecond)
+ time.Sleep(50 * time.Millisecond)
}
err = b.MultiWrite(ctx, kvs)
}
......
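When a retry loop merely needs to pause and selects on nothing else, `<-time.After(d)` allocates a timer and a channel for no benefit; `time.Sleep(d)` blocks the same way without the allocation. Sketch (names illustrative):

```go
package sketch

import "time"

// retry pauses 50ms between attempts; attempt is an illustrative
// callback. Sleep is equivalent to <-time.After here because no other
// channel is being waited on.
func retry(maxAttempts int, attempt func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = attempt(); err == nil {
			return nil
		}
		time.Sleep(50 * time.Millisecond)
	}
	return err
}
```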
......@@ -122,8 +122,9 @@ func (t *tickler) watch() {
}
t.closeWg.Add(1)
- ticker := time.NewTicker(t.interval)
go func() {
+ ticker := time.NewTicker(t.interval)
+ defer ticker.Stop()
for {
select {
case <-ticker.C:
......
......@@ -68,7 +68,7 @@ func TestChannelEventManager(t *testing.T) {
select {
case <-ch:
- case <-time.NewTimer(time.Second).C:
+ case <-time.After(time.Second):
t.FailNow()
}
close(em.eventChan)
......
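The inline `time.NewTimer(time.Second).C` keeps no reference to the timer, so it could never be stopped anyway; for a one-shot `select` in a test, `time.After` says the same thing more plainly. (Inside a loop the trade-off reverses, as the ticker hunks above show.) Sketch of the idiom, as a hypothetical helper:

```go
package sketch

import (
	"testing"
	"time"
)

// waitOrFail is an illustrative test helper: time.After is fine because
// this select runs exactly once; in a loop it would arm a fresh timer
// per iteration.
func waitOrFail(t *testing.T, ch <-chan struct{}) {
	select {
	case <-ch:
	case <-time.After(time.Second):
		t.FailNow()
	}
}
```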
......@@ -344,9 +344,8 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fm.startDropping()
delNode.Operate([]flowgraph.Msg{&msg})
})
- timer := time.NewTimer(time.Millisecond)
select {
- case <-timer.C:
+ case <-time.After(time.Millisecond):
t.FailNow()
case <-sig:
}
......
......@@ -44,14 +44,13 @@ func TestMergedTimeTicker_close10000(t *testing.T) {
mt.close()
}(mt)
}
- tm := time.NewTimer(10 * time.Second)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
- case <-tm.C:
+ case <-time.After(10 * time.Second):
t.Fatal("wait all timer close, timeout")
case <-done:
}
......
......@@ -378,9 +378,8 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
})
}
- timeout := time.NewTimer(time.Second)
select {
- case <-timeout.C:
+ case <-time.After(time.Second):
t.FailNow()
case <-signal:
}
......
......@@ -882,7 +882,6 @@ func withCredential(clientPemPath, clientKeyPath, clientCaPath string) (credenti
// waitForGrpcReady block until service available or panic after times out.
func waitForGrpcReady(opt *WaitOption) {
Params := &paramtable.Get().ProxyGrpcServerCfg
- ticker := time.NewTicker(opt.Duration)
ch := make(chan error, 1)
go func() {
......@@ -910,6 +909,8 @@ func waitForGrpcReady(opt *WaitOption) {
}
}()
+ timer := time.NewTimer(opt.Duration)
select {
case err := <-ch:
if err != nil {
......@@ -918,7 +919,7 @@ func waitForGrpcReady(opt *WaitOption) {
zap.Any("option", opt))
panic(err)
}
- case <-ticker.C:
+ case <-timer.C:
log.Error("grpc service not ready",
zap.Any("option", opt))
panic("grpc service not ready")
......
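Here the fix runs the other way: a single readiness deadline is a one-shot event, so a `Ticker` was the wrong tool and a `Timer` states the intent exactly. A sketch of the corrected shape (names illustrative):

```go
package sketch

import (
	"errors"
	"time"
)

// awaitReady waits once for a result or a deadline. A Ticker would
// imply repetition and keep firing until stopped; a Timer fires once.
func awaitReady(ch <-chan error, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case err := <-ch:
		return err
	case <-timer.C:
		return errors.New("service not ready before deadline")
	}
}
```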
......@@ -115,12 +115,13 @@ func TestIndexNodeSimple(t *testing.T) {
ClusterID: clusterID,
BuildIDs: []int64{buildID},
}
- timeout := time.After(time.Second * 10)
var idxInfo *indexpb.IndexTaskInfo
+ timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
Loop:
for {
select {
- case <-timeout:
+ case <-timeoutCtx.Done():
t.Fatal("timeout for querying jobs")
default:
time.Sleep(1 * time.Millisecond)
......@@ -302,11 +303,12 @@ func TestIndexNodeComplex(t *testing.T) {
}(i)
}
testwg.Wait()
- timeout := time.After(time.Second * 30)
+ timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
Loop:
for {
select {
- case <-timeout:
+ case <-timeoutCtx.Done():
t.Fatal("timeout testing")
default:
jobNumRet, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
......
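For tests that busy-poll with a `default` branch, the commit swaps `time.After` for `context.WithTimeout`. The context's `Done` channel can be checked any number of times, and deriving it from a parent context means external cancellation also ends the loop. Sketch with illustrative names:

```go
package sketch

import (
	"context"
	"errors"
	"time"
)

// pollUntil busy-polls cond until a 10s budget expires. Unlike a bare
// time.After channel, the context composes with its parent, so
// cancelling parent also ends the loop.
func pollUntil(parent context.Context, cond func() bool) error {
	timeoutCtx, cancel := context.WithTimeout(parent, 10*time.Second)
	defer cancel() // releases the context's timer promptly
	for {
		select {
		case <-timeoutCtx.Done():
			return errors.New("timed out waiting for condition")
		default:
			if cond() {
				return nil
			}
			time.Sleep(time.Millisecond)
		}
	}
}
```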
package indexnode
import (
"context"
"time"
"github.com/golang/protobuf/proto"
......@@ -110,15 +111,18 @@ func (i *IndexNode) waitTaskFinish() {
}
gracefulTimeout := Params.IndexNodeCfg.GracefulStopTimeout
- timer := time.NewTimer(gracefulTimeout.GetAsDuration(time.Second))
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ timeoutCtx, cancel := context.WithTimeout(i.loopCtx, gracefulTimeout.GetAsDuration(time.Second))
+ defer cancel()
for {
select {
- case <-time.Tick(time.Second):
+ case <-ticker.C:
if !i.hasInProgressTask() {
return
}
- case <-timer.C:
+ case <-timeoutCtx.Done():
log.Warn("timeout, the index node has some progress task")
for _, info := range i.tasks {
if info.state == commonpb.IndexState_InProgress {
......
......@@ -185,12 +185,14 @@ func TestClient_SeekLatest(t *testing.T) {
msgChan = consumer2.Chan()
loop := true
+ ticker := time.NewTicker(2 * time.Second)
+ defer ticker.Stop()
for loop {
select {
case msg := <-msgChan:
assert.Equal(t, len(msg.Payload), 8)
loop = false
- case <-time.After(2 * time.Second):
+ case <-ticker.C:
msg := &ProducerMessage{
Payload: make([]byte, 8),
}
......
......@@ -203,11 +203,11 @@ func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) {
func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup, vchannel string) {
defer wg.Done()
var lastTs typeutil.Timestamp
+ timeoutCtx, cancel := context.WithTimeout(ctx, 5000*time.Millisecond)
+ defer cancel()
for {
select {
- case <-ctx.Done():
- return
- case <-time.After(5000 * time.Millisecond): // no message to consume
+ case <-timeoutCtx.Done():
return
case pack := <-suite.vchannels[vchannel].output:
assert.Greater(suite.T(), pack.EndTs, lastTs)
......@@ -231,11 +231,13 @@ func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup
func (suite *SimulationSuite) produceTimeTickOnly(ctx context.Context) {
var tt = 1
+ ticker := time.NewTicker(10 * time.Millisecond)
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
- case <-time.After(10 * time.Millisecond):
+ case <-ticker.C:
ts := uint64(tt * 1000)
err := suite.producer.Produce(&msgstream.MsgPack{
Msgs: []msgstream.TsMsg{genTimeTickMsg(ts)},
......
......@@ -58,9 +58,11 @@ func getSeekPositions(factory msgstream.Factory, pchannel string, maxNum int) ([
defer stream.Close()
stream.AsConsumer([]string{pchannel}, fmt.Sprintf("%d", rand.Int()), mqwrapper.SubscriptionPositionEarliest)
positions := make([]*msgstream.MsgPosition, 0)
+ timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
for {
select {
- case <-time.After(100 * time.Millisecond): // no message to consume
+ case <-timeoutCtx.Done(): // no message to consume
return positions, nil
case pack := <-stream.Chan():
positions = append(positions, pack.EndPositions[0])
......
......@@ -526,7 +526,8 @@ func TestPulsarClient_SeekLatest(t *testing.T) {
defer consumer.Close()
msgChan := consumer.Chan()
+ ticker := time.NewTicker(2 * time.Second)
+ defer ticker.Stop()
loop := true
for loop {
select {
......@@ -536,7 +537,7 @@ func TestPulsarClient_SeekLatest(t *testing.T) {
log.Info("RECV", zap.Any("v", v))
assert.Equal(t, v, 4)
loop = false
- case <-time.After(2 * time.Second):
+ case <-ticker.C:
log.Info("after 2 seconds")
msg := &mqwrapper.ProducerMessage{
Payload: IntToBytes(4),
......
......@@ -107,8 +107,8 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
- ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
- err := ticker.start()
+ channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
+ err := channelTicker.start()
assert.Equal(t, nil, err)
var wg sync.WaitGroup
......@@ -116,14 +116,15 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
b := make(chan struct{}, 1)
go func() {
defer wg.Done()
- timer := time.NewTicker(interval * 40)
+ ticker := time.NewTicker(interval * 40)
+ defer ticker.Stop()
for {
select {
case <-b:
return
- case <-timer.C:
+ case <-ticker.C:
for _, pchan := range pchans {
- ts, err := ticker.getLastTick(pchan)
+ ts, err := channelTicker.getLastTick(pchan)
assert.Equal(t, nil, err)
log.Debug("TestChannelsTimeTickerImpl_getLastTick",
zap.Any("pchan", pchan),
......@@ -137,7 +138,7 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
wg.Wait()
defer func() {
- err := ticker.close()
+ err := channelTicker.close()
assert.Equal(t, nil, err)
}()
......@@ -154,8 +155,8 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
- ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
- err := ticker.start()
+ channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
+ err := channelTicker.start()
assert.Equal(t, nil, err)
var wg sync.WaitGroup
......@@ -163,13 +164,14 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
b := make(chan struct{}, 1)
go func() {
defer wg.Done()
- timer := time.NewTicker(interval * 40)
+ ticker := time.NewTicker(interval * 40)
+ defer ticker.Stop()
for {
select {
case <-b:
return
- case <-timer.C:
- stats, _, err := ticker.getMinTsStatistics()
+ case <-ticker.C:
+ stats, _, err := channelTicker.getMinTsStatistics()
assert.Equal(t, nil, err)
for pchan, ts := range stats {
log.Debug("TestChannelsTimeTickerImpl_getLastTick",
......@@ -184,7 +186,7 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
wg.Wait()
defer func() {
- err := ticker.close()
+ err := channelTicker.close()
assert.Equal(t, nil, err)
}()
......@@ -201,8 +203,8 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
- ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
- err := ticker.start()
+ channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
+ err := channelTicker.start()
assert.Equal(t, nil, err)
var wg sync.WaitGroup
......@@ -211,13 +213,14 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) {
ts := typeutil.ZeroTimestamp
go func() {
defer wg.Done()
- timer := time.NewTicker(interval * 40)
+ ticker := time.NewTicker(interval * 40)
+ defer ticker.Stop()
for {
select {
case <-b:
return
- case <-timer.C:
- minTs := ticker.getMinTick()
+ case <-ticker.C:
+ minTs := channelTicker.getMinTick()
assert.GreaterOrEqual(t, minTs, ts)
}
}
......@@ -227,7 +230,7 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) {
wg.Wait()
defer func() {
- err := ticker.close()
+ err := channelTicker.close()
assert.Equal(t, nil, err)
}()
......
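These test hunks rename the component under test from `ticker` to `channelTicker` so the short name stays free for the real `*time.Ticker` each goroutine now creates; otherwise the inner declaration would shadow the outer one. A toy demonstration (all names invented):

```go
package main

import (
	"fmt"
	"time"
)

// fakeChannelsTicker stands in for the proxy's channelsTimeTicker type.
type fakeChannelsTicker struct{}

func (fakeChannelsTicker) getMinTick() uint64 { return 42 }

func main() {
	channelTicker := fakeChannelsTicker{} // was "ticker"; renamed to avoid shadowing
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(time.Millisecond) // name now free for the real *time.Ticker
		defer ticker.Stop()
		<-ticker.C
		fmt.Println(channelTicker.getMinTick()) // still reachable under its new name
	}()
	<-done
}
```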
......@@ -268,14 +268,14 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
go func() {
defer node.wg.Done()
- timer := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
+ ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
+ defer ticker.Stop()
for {
select {
case <-node.ctx.Done():
log.Info("send channels time tick loop exit")
return
- case <-timer.C:
+ case <-ticker.C:
stats, ts, err := node.chTicker.getMinTsStatistics()
if err != nil {
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err))
......
......@@ -58,6 +58,7 @@ func (dh *distHandler) start(ctx context.Context) {
logger := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60)
logger.Info("start dist handler")
ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond))
+ defer ticker.Stop()
failures := 0
for {
select {
......
......@@ -71,6 +71,7 @@ func (scheduler *Scheduler) schedule(ctx context.Context) {
go func() {
defer scheduler.wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
......
......@@ -50,6 +50,7 @@ func (o *LeaderObserver) Start(ctx context.Context) {
go func() {
defer o.wg.Done()
ticker := time.NewTicker(interval)
+ defer ticker.Stop()
for {
select {
case <-o.closeCh:
......
......@@ -63,6 +63,7 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
log.Info("Start check replica loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second))
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
......
......@@ -61,6 +61,7 @@ func (ob *ResourceObserver) schedule(ctx context.Context) {
log.Info("Start check resource group loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckResourceGroupInterval.GetAsDuration(time.Second))
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
......
......@@ -89,6 +89,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
log.Info("Start update next target loop")
ticker := time.NewTicker(params.Params.QueryCoordCfg.UpdateNextTargetInterval.GetAsDuration(time.Second))
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
......
......@@ -100,6 +100,7 @@ func (c *QueryCluster) Stop() {
func (c *QueryCluster) updateLoop() {
defer c.wg.Done()
ticker := time.NewTicker(updateTickerDuration)
+ defer ticker.Stop()
for {
select {
case <-c.ch:
......
......@@ -215,6 +215,7 @@ func (c *Core) sendMinDdlTsAsTt() {
func (c *Core) startTimeTickLoop() {
defer c.wg.Done()
ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond))
+ defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
......
......@@ -44,8 +44,10 @@ import (
// CheckGrpcReady wait for context timeout, or wait 100ms then send nil to targetCh
func CheckGrpcReady(ctx context.Context, targetCh chan error) {
+ timer := time.NewTimer(100 * time.Millisecond)
+ defer timer.Stop()
select {
- case <-time.After(100 * time.Millisecond):
+ case <-timer.C:
targetCh <- nil
case <-ctx.Done():
return
......
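`CheckGrpcReady` gets the inverse of the test-side changes: because the function can return early on `ctx.Done()`, an explicit timer with a deferred `Stop` lets the runtime reclaim it immediately, where `time.After`'s timer would linger until the full 100ms elapses. Sketch (names illustrative):

```go
package sketch

import (
	"context"
	"time"
)

// notifyAfter mirrors the corrected CheckGrpcReady shape: send nil after
// 100ms, or return early on cancellation with the timer stopped.
func notifyAfter(ctx context.Context, targetCh chan<- error) {
	timer := time.NewTimer(100 * time.Millisecond)
	defer timer.Stop()
	select {
	case <-timer.C:
		targetCh <- nil
	case <-ctx.Done():
	}
}
```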
......@@ -597,12 +597,10 @@ func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() {
}
}()
- t := time.NewTimer(time.Second)
- defer t.Stop()
select {
case evt := <-ch:
suite.Equal(suite.sessions[1].ServerID, evt.Session.ServerID)
- case <-t.C:
+ case <-time.After(time.Second):
suite.Fail("no event received, failing")
}
})
......