From dc0f6530cb05b656be46f8146e0af9ae58b9bb3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Tue, 20 Aug 2019 15:36:31 +0800 Subject: [PATCH] Fix TTL issue. (#3292) * Fix TTL issue. * Fix issue. * Rename `getSecondTimeBucket` to `getRecordTimeBucket`. --- .../core/alarm/AlarmStandardPersistence.java | 2 +- .../oap/server/core/analysis/TimeBucket.java | 8 ++++- .../core/storage/ttl/GeneralStorageTTL.java | 6 +++- .../core/storage/ttl/RecordTTLCalculator.java | 34 ++++++++++++++++++ .../server/core/storage/ttl/StorageTTL.java | 4 ++- .../receiver/jaeger/JaegerGRPCHandler.java | 2 +- .../endpoint/MultiScopesSpanListener.java | 2 +- .../listener/segment/SegmentSpanListener.java | 2 +- .../receiver/zipkin/trace/SpanForward.java | 2 +- .../base/HistoryDeleteEsDAO.java | 9 ++++- .../ttl/ElasticsearchStorageTTL.java | 6 +++- .../ttl/EsRecordTTLCalculator.java | 35 +++++++++++++++++++ .../jdbc/h2/dao/H2HistoryDeleteDAO.java | 12 +++++-- 13 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java index 48dcc8a03a..6237bfd2af 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java @@ -45,7 +45,7 @@ public class AlarmStandardPersistence implements AlarmCallback { record.setName(message.getName()); record.setAlarmMessage(message.getAlarmMessage()); record.setStartTime(message.getStartTime()); - record.setTimeBucket(TimeBucket.getSecondTimeBucket(message.getStartTime())); + record.setTimeBucket(TimeBucket.getRecordTimeBucket(message.getStartTime())); RecordStreamProcessor.getInstance().in(record); }); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java index 18ff6f976a..3df3470fb8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java @@ -25,7 +25,13 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; */ public class TimeBucket { - public static long getSecondTimeBucket(long time) { + /** + * Record time bucket format in Second Unit. + * + * @param time Timestamp + * @return time in second format. + */ + public static long getRecordTimeBucket(long time) { return getTimeBucket(time, Downsampling.Second); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java index aefaa53c61..971bcb7641 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java @@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling; */ public class GeneralStorageTTL implements StorageTTL { - @Override public TTLCalculator calculator(Downsampling downsampling) { + @Override public TTLCalculator metricsCalculator(Downsampling downsampling) { switch (downsampling) { case Hour: return new HourTTLCalculator(); @@ -38,4 +38,8 @@ public class GeneralStorageTTL implements StorageTTL { return new MinuteTTLCalculator(); } } + + @Override public TTLCalculator recordCalculator() { + return new RecordTTLCalculator(); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java new file mode 100644 index 0000000000..c06f26f5f9 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.storage.ttl; + +import org.apache.skywalking.oap.server.core.DataTTLConfig; +import org.joda.time.DateTime; + +/** + * Calculate TTL for record. + * + * @author wusheng + */ +public class RecordTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) { + return Long.valueOf(currentTime.plusMinutes(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMddHHmmss")); + } +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java index 0f386597f1..7a0b4e9c80 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java @@ -23,5 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling; * @author peng-yongsheng */ public interface StorageTTL { - TTLCalculator calculator(Downsampling downsampling); + TTLCalculator metricsCalculator(Downsampling downsampling); + + TTLCalculator recordCalculator(); } diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java index cfcb26d926..defcf4ed22 100644 --- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java +++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java @@ -99,7 +99,7 @@ public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImpl long duration = span.getDuration().getNanos() / 1_000_000; jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli()); - long timeBucket = TimeBucket.getSecondTimeBucket(jaegerSpan.getStartTime()); + long timeBucket = TimeBucket.getRecordTimeBucket(jaegerSpan.getStartTime()); jaegerSpan.setTimeBucket(timeBucket); jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration); jaegerSpan.setLatency((int)duration); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java index 20f3b9b7e8..7459abd1fa 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java @@ -157,7 +157,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId()); statement.setDatabaseServiceId(sourceBuilder.getDestServiceId()); statement.setLatency(sourceBuilder.getLatency()); - statement.setTimeBucket(TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime())); + statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime())); statement.setTraceId(traceId); for (KeyStringValuePair tag : spanDecorator.getAllTags()) { if (SpanTags.DB_STATEMENT.equals(tag.getKey())) { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java index 1b637ceb3a..48c2618274 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java @@ -68,7 +68,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener return; } - long timeBucket = TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime()); + long timeBucket = TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime()); segment.setSegmentId(segmentCoreInfo.getSegmentId()); segment.setServiceId(segmentCoreInfo.getServiceId()); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java index a05e005b23..84353e8c6a 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java @@ -93,7 +93,7 @@ public class SpanForward { long startTime = span.timestampAsLong() / 1000; zipkinSpan.setStartTime(startTime); if (startTime != 0) { - long timeBucket = TimeBucket.getSecondTimeBucket(zipkinSpan.getStartTime()); + long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime()); zipkinSpan.setTimeBucket(timeBucket); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java index 1299cc6732..e780e1fe1c 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL; +import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.joda.time.DateTime; @@ -51,7 +52,13 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { ConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class); ElasticSearchClient client = getClient(); - long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig()); + TTLCalculator ttlCalculator; + if (model.isRecord()) { + ttlCalculator = storageTTL.recordCalculator(); + } else { + ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling()); + } + long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig()); if (model.isCapableOfTimeSeries()) { List indexes = client.retrievalIndexByAliases(model.getName()); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java index 5779de42e3..46af52d8dd 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java @@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.ttl.*; */ public class ElasticsearchStorageTTL implements StorageTTL { - @Override public TTLCalculator calculator(Downsampling downsampling) { + @Override public TTLCalculator metricsCalculator(Downsampling downsampling) { switch (downsampling) { case Month: return new MonthTTLCalculator(); @@ -37,4 +37,8 @@ public class ElasticsearchStorageTTL implements StorageTTL { return new DayTTLCalculator(); } } + + @Override public TTLCalculator recordCalculator() { + return new EsRecordTTLCalculator(); + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java new file mode 100644 index 0000000000..19ce59d836 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl; + +import org.apache.skywalking.oap.server.core.DataTTLConfig; +import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator; +import org.joda.time.DateTime; + +/** + * Calculate TTL for record. + * + * @author wusheng + */ +public class EsRecordTTLCalculator implements TTLCalculator { + + @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) { + return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMdd")); + } +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java index 69fcd40f84..f709566dcc 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java @@ -19,12 +19,14 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; -import java.sql.*; +import java.sql.Connection; +import java.sql.SQLException; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL; +import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -53,7 +55,13 @@ public class H2HistoryDeleteDAO implements IHistoryDeleteDAO { SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " + model.getName() + " where ").append(timeBucketColumnName).append("<= ?"); try (Connection connection = client.getConnection()) { - long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig()); + TTLCalculator ttlCalculator; + if (model.isRecord()) { + ttlCalculator = storageTTL.recordCalculator(); + } else { + ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling()); + } + long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig()); client.execute(connection, dataDeleteSQL.toString(), timeBefore); } catch (JDBCClientException | SQLException e) { throw new IOException(e.getMessage(), e); -- GitLab