diff --git a/storage/remote/intern.go b/storage/remote/intern.go index 3dffaeacbdf4c064d40c64149845e6625729ddc4..98eec34141a69ba52242704050162783dcaca5f4 100644 --- a/storage/remote/intern.go +++ b/storage/remote/intern.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var interner = newPool() var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, diff --git a/storage/remote/intern_test.go b/storage/remote/intern_test.go index 1124ef0db2fbd01c8d3ba62e15226c2ece62dce5..56a908811489c1a3cf599ac83b698460e5d1388b 100644 --- a/storage/remote/intern_test.go +++ b/storage/remote/intern_test.go @@ -27,6 +27,7 @@ import ( ) func TestIntern(t *testing.T) { + interner := newPool() testString := "TestIntern" interner.intern(testString) interned, ok := interner.pool[testString] @@ -36,6 +37,7 @@ func TestIntern(t *testing.T) { } func TestIntern_MultiRef(t *testing.T) { + interner := newPool() testString := "TestIntern_MultiRef" interner.intern(testString) @@ -52,6 +54,7 @@ func TestIntern_MultiRef(t *testing.T) { } func TestIntern_DeleteRef(t *testing.T) { + interner := newPool() testString := "TestIntern_DeleteRef" interner.intern(testString) @@ -66,6 +69,7 @@ func TestIntern_DeleteRef(t *testing.T) { } func TestIntern_MultiRef_Concurrent(t *testing.T) { + interner := newPool() testString := "TestIntern_MultiRef_Concurrent" interner.intern(testString) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index cc6994eff39768dfb422601cca6d248a051c3f17..1a79d6c0ce96c96b0515c6400014ff260c6cbdcc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -260,7 +260,9 @@ type QueueManager struct { samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate - metrics *queueManagerMetrics + metrics *queueManagerMetrics + interner *pool + highestRecvTimestamp *maxGauge } // NewQueueManager builds a new QueueManager. @@ -276,6 +278,8 @@ func NewQueueManager( relabelConfigs []*relabel.Config, client WriteClient, flushDeadline time.Duration, + interner *pool, + highestRecvTimestamp *maxGauge, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -303,7 +307,9 @@ func NewQueueManager( samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), - metrics: metrics, + metrics: metrics, + interner: interner, + highestRecvTimestamp: highestRecvTimestamp, } t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) @@ -392,7 +398,7 @@ func (t *QueueManager) Stop() { // On shutdown, release the strings in the labels from the intern pool. t.seriesMtx.Lock() for _, labels := range t.seriesLabels { - releaseLabels(labels) + t.releaseLabels(labels) } t.seriesMtx.Unlock() t.metrics.unregister() @@ -410,13 +416,13 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { continue } t.seriesSegmentIndexes[s.Ref] = index - internLabels(lbls) + t.internLabels(lbls) // We should not ever be replacing a series labels in the map, but just // in case we do we need to ensure we do not leak the replaced interned // strings. if orig, ok := t.seriesLabels[s.Ref]; ok { - releaseLabels(orig) + t.releaseLabels(orig) } t.seriesLabels[s.Ref] = lbls } @@ -433,7 +439,7 @@ func (t *QueueManager) SeriesReset(index int) { for k, v := range t.seriesSegmentIndexes { if v < index { delete(t.seriesSegmentIndexes, k) - releaseLabels(t.seriesLabels[k]) + t.releaseLabels(t.seriesLabels[k]) delete(t.seriesLabels, k) delete(t.droppedSeries, k) } @@ -454,17 +460,17 @@ func (t *QueueManager) client() WriteClient { return t.storeClient } -func internLabels(lbls labels.Labels) { +func (t *QueueManager) internLabels(lbls labels.Labels) { for i, l := range lbls { - lbls[i].Name = interner.intern(l.Name) - lbls[i].Value = interner.intern(l.Value) + lbls[i].Name = t.interner.intern(l.Name) + lbls[i].Value = t.interner.intern(l.Value) } } -func releaseLabels(ls labels.Labels) { +func (t *QueueManager) releaseLabels(ls labels.Labels) { for _, l := range ls { - interner.release(l.Name) - interner.release(l.Value) + t.interner.release(l.Name) + t.interner.release(l.Value) } } @@ -564,7 +570,7 @@ func (t *QueueManager) calculateDesiredShards() int { samplesOutDuration = t.samplesOutDuration.rate() / float64(time.Second) samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate highestSent = t.metrics.highestSentTimestamp.Get() - highestRecv = highestTimestamp.Get() + highestRecv = t.highestRecvTimestamp.Get() delay = highestRecv - highestSent samplesPending = delay * samplesInRate * samplesKeptRatio ) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c7ee934b1976723960fcc358ac5e17b2d83e9cda..d8927d9e46711068a4e94206addbe09098403874 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -47,6 +47,17 @@ import ( const defaultFlushDeadline = 1 * time.Minute +func newHighestTimestampMetric() *maxGauge { + return &maxGauge{ + Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "highest_timestamp_in_seconds", + Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", + }), + } +} + func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. @@ -117,7 +128,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -160,7 +171,7 @@ func TestSampleDeliveryOrder(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), nil) m.StoreSeries(series, 0) m.Start() @@ -182,7 +193,7 @@ func TestShutdown(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric()) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -222,7 +233,7 @@ func TestSeriesReset(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric()) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -254,7 +265,7 @@ func TestReshard(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.StoreSeries(series, 0) m.Start() @@ -287,7 +298,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.Start() h.Unlock() h.Lock() @@ -305,7 +316,7 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.Start() for i := 1; i < 1000; i++ { @@ -353,7 +364,7 @@ func TestShouldReshard(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) @@ -560,7 +571,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) m.StoreSeries(series, 0) // These should be received by the client. @@ -604,7 +615,7 @@ func BenchmarkStartup(b *testing.B) { c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) + config.DefaultQueueConfig, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric()) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -655,7 +666,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -675,7 +686,7 @@ func TestCalculateDesiredShards(t *testing.T) { samplesIn.incr(s) samplesIn.tick() - highestTimestamp.Set(float64(startedAt.Add(ts).Unix())) + m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix())) } // helper function for sending samples. diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 73ac6f08beef29059aa321d45b9f118945ea6a6d..cafb579e74ab11a31e8bd00b6811a914438a9b07 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" @@ -100,6 +101,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { RemoteReadConfigs: tc.cfgs, } err := s.ApplyConfig(conf) + prometheus.Unregister(s.rws.highestTimestamp) gotError := err != nil testutil.Equals(t, tc.err, gotError) testutil.Ok(t, s.Close()) diff --git a/storage/remote/write.go b/storage/remote/write.go index 77a0ab7cedba2d5ed02f9008552eec961673c333..8b0bf7622f3a22c3d9a0d0b0859f216e4b43061a 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -35,14 +35,6 @@ var ( Name: "samples_in_total", Help: "Samples in to remote storage, compare to samples out for queue managers.", }) - highestTimestamp = maxGauge{ - Gauge: promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "highest_timestamp_in_seconds", - Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", - }), - } ) // WriteStorage represents all the remote write storage. @@ -58,6 +50,10 @@ type WriteStorage struct { queues map[string]*QueueManager samplesIn *ewmaRate flushDeadline time.Duration + interner *pool + + // For timestampTracker. + highestTimestamp *maxGauge } // NewWriteStorage creates and runs a WriteStorage. @@ -74,6 +70,18 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string flushDeadline: flushDeadline, samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), walDir: walDir, + interner: newPool(), + highestTimestamp: &maxGauge{ + Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "highest_timestamp_in_seconds", + Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", + }), + }, + } + if reg != nil { + reg.MustRegister(rws.highestTimestamp) } go rws.run() return rws @@ -150,6 +158,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rwConf.WriteRelabelConfigs, c, rws.flushDeadline, + rws.interner, + rws.highestTimestamp, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) @@ -173,7 +183,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // Appender implements storage.Storage. func (rws *WriteStorage) Appender(_ context.Context) storage.Appender { return ×tampTracker{ - writeStorage: rws, + writeStorage: rws, + highestRecvTimestamp: rws.highestTimestamp, } } @@ -188,9 +199,10 @@ func (rws *WriteStorage) Close() error { } type timestampTracker struct { - writeStorage *WriteStorage - samples int64 - highestTimestamp int64 + writeStorage *WriteStorage + samples int64 + highestTimestamp int64 + highestRecvTimestamp *maxGauge } // Add implements storage.Appender. @@ -213,7 +225,7 @@ func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples) samplesIn.Add(float64(t.samples)) - highestTimestamp.Set(float64(t.highestTimestamp / 1000)) + t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000)) return nil }