From 53691ae261fc78c9cd17ea29cf19a4f08eff67bc Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 27 Aug 2018 18:12:11 +0300 Subject: [PATCH] Simplify SD update throttling (#4523) * simplfied SD updates throtling Signed-off-by: Krasi Georgiev * add default to catch cases when we don't have new updates. Signed-off-by: Krasi Georgiev --- discovery/manager.go | 54 ++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 97468a549..94b8caf98 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -82,10 +82,6 @@ type Manager struct { targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group - // True if updates were received in the last 5 seconds. - recentlyUpdated bool - // Protects recentlyUpdated. - recentlyUpdatedMtx sync.Mutex } // Run starts the background processing @@ -132,43 +128,47 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis go worker.Run(ctx, updates) go m.runProvider(ctx, poolKey, updates) - go m.runUpdater(ctx) } func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + updateReceived := make(chan struct{}, 1) + for { select { case <-ctx.Done(): return case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. + // Handle the case that a target provider(E.g. StaticProvider) exits and + // closes the channel before the context is done. + // This will prevent sending the updates to the receiver so we send them before exiting. if !ok { + select { + case m.syncCh <- m.allGroups(): + default: + level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + } return } m.updateGroup(poolKey, tgs) - m.recentlyUpdatedMtx.Lock() - m.recentlyUpdated = true - m.recentlyUpdatedMtx.Unlock() - } - } -} -func (m *Manager) runUpdater(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - m.recentlyUpdatedMtx.Lock() - if m.recentlyUpdated { - m.syncCh <- m.allGroups() - m.recentlyUpdated = false + // Signal that there was an update. + select { + case updateReceived <- struct{}{}: + default: + } + case <-ticker.C: // Some discoverers send updates too often so we send these to the receiver once every 5 seconds. + select { + case <-updateReceived: // Send only when there is a new update. + select { + case m.syncCh <- m.allGroups(): + default: + level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + } + default: } - m.recentlyUpdatedMtx.Unlock() } } } -- GitLab