未验证 提交 dc0f6530 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Fix TTL issue. (#3292)

* Fix TTL issue.

* Fix issue.

* Rename `getSecondTimeBucket` to `getRecordTimeBucket`.
上级 37f367a3
...@@ -45,7 +45,7 @@ public class AlarmStandardPersistence implements AlarmCallback { ...@@ -45,7 +45,7 @@ public class AlarmStandardPersistence implements AlarmCallback {
record.setName(message.getName()); record.setName(message.getName());
record.setAlarmMessage(message.getAlarmMessage()); record.setAlarmMessage(message.getAlarmMessage());
record.setStartTime(message.getStartTime()); record.setStartTime(message.getStartTime());
record.setTimeBucket(TimeBucket.getSecondTimeBucket(message.getStartTime())); record.setTimeBucket(TimeBucket.getRecordTimeBucket(message.getStartTime()));
RecordStreamProcessor.getInstance().in(record); RecordStreamProcessor.getInstance().in(record);
}); });
......
...@@ -25,7 +25,13 @@ import org.apache.skywalking.oap.server.core.UnexpectedException; ...@@ -25,7 +25,13 @@ import org.apache.skywalking.oap.server.core.UnexpectedException;
*/ */
public class TimeBucket { public class TimeBucket {
public static long getSecondTimeBucket(long time) { /**
* Record time bucket format in Second Unit.
*
* @param time Timestamp
* @return time in second format.
*/
public static long getRecordTimeBucket(long time) {
return getTimeBucket(time, Downsampling.Second); return getTimeBucket(time, Downsampling.Second);
} }
......
...@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling; ...@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling;
*/ */
public class GeneralStorageTTL implements StorageTTL { public class GeneralStorageTTL implements StorageTTL {
@Override public TTLCalculator calculator(Downsampling downsampling) { @Override public TTLCalculator metricsCalculator(Downsampling downsampling) {
switch (downsampling) { switch (downsampling) {
case Hour: case Hour:
return new HourTTLCalculator(); return new HourTTLCalculator();
...@@ -38,4 +38,8 @@ public class GeneralStorageTTL implements StorageTTL { ...@@ -38,4 +38,8 @@ public class GeneralStorageTTL implements StorageTTL {
return new MinuteTTLCalculator(); return new MinuteTTLCalculator();
} }
} }
@Override public TTLCalculator recordCalculator() {
return new RecordTTLCalculator();
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.ttl;
import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.joda.time.DateTime;
/**
* Calculate TTL for record.
*
* @author wusheng
*/
public class RecordTTLCalculator implements TTLCalculator {
@Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
return Long.valueOf(currentTime.plusMinutes(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
}
}
\ No newline at end of file
...@@ -23,5 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling; ...@@ -23,5 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.Downsampling;
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface StorageTTL { public interface StorageTTL {
TTLCalculator calculator(Downsampling downsampling); TTLCalculator metricsCalculator(Downsampling downsampling);
TTLCalculator recordCalculator();
} }
...@@ -99,7 +99,7 @@ public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImpl ...@@ -99,7 +99,7 @@ public class JaegerGRPCHandler extends CollectorServiceGrpc.CollectorServiceImpl
long duration = span.getDuration().getNanos() / 1_000_000; long duration = span.getDuration().getNanos() / 1_000_000;
jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli()); jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(), span.getStartTime().getNanos()).toEpochMilli());
long timeBucket = TimeBucket.getSecondTimeBucket(jaegerSpan.getStartTime()); long timeBucket = TimeBucket.getRecordTimeBucket(jaegerSpan.getStartTime());
jaegerSpan.setTimeBucket(timeBucket); jaegerSpan.setTimeBucket(timeBucket);
jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration); jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
jaegerSpan.setLatency((int)duration); jaegerSpan.setLatency((int)duration);
......
...@@ -157,7 +157,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe ...@@ -157,7 +157,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId()); statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId()); statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
statement.setLatency(sourceBuilder.getLatency()); statement.setLatency(sourceBuilder.getLatency());
statement.setTimeBucket(TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime())); statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime()));
statement.setTraceId(traceId); statement.setTraceId(traceId);
for (KeyStringValuePair tag : spanDecorator.getAllTags()) { for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) { if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
......
...@@ -68,7 +68,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener ...@@ -68,7 +68,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
return; return;
} }
long timeBucket = TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime()); long timeBucket = TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime());
segment.setSegmentId(segmentCoreInfo.getSegmentId()); segment.setSegmentId(segmentCoreInfo.getSegmentId());
segment.setServiceId(segmentCoreInfo.getServiceId()); segment.setServiceId(segmentCoreInfo.getServiceId());
......
...@@ -93,7 +93,7 @@ public class SpanForward { ...@@ -93,7 +93,7 @@ public class SpanForward {
long startTime = span.timestampAsLong() / 1000; long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime); zipkinSpan.setStartTime(startTime);
if (startTime != 0) { if (startTime != 0) {
long timeBucket = TimeBucket.getSecondTimeBucket(zipkinSpan.getStartTime()); long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket); zipkinSpan.setTimeBucket(timeBucket);
} }
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.config.ConfigService; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL; import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.joda.time.DateTime; import org.joda.time.DateTime;
...@@ -51,7 +52,13 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { ...@@ -51,7 +52,13 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
ConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class); ConfigService configService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class);
ElasticSearchClient client = getClient(); ElasticSearchClient client = getClient();
long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig()); TTLCalculator ttlCalculator;
if (model.isRecord()) {
ttlCalculator = storageTTL.recordCalculator();
} else {
ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling());
}
long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig());
if (model.isCapableOfTimeSeries()) { if (model.isCapableOfTimeSeries()) {
List<String> indexes = client.retrievalIndexByAliases(model.getName()); List<String> indexes = client.retrievalIndexByAliases(model.getName());
......
...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.ttl.*; ...@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.ttl.*;
*/ */
public class ElasticsearchStorageTTL implements StorageTTL { public class ElasticsearchStorageTTL implements StorageTTL {
@Override public TTLCalculator calculator(Downsampling downsampling) { @Override public TTLCalculator metricsCalculator(Downsampling downsampling) {
switch (downsampling) { switch (downsampling) {
case Month: case Month:
return new MonthTTLCalculator(); return new MonthTTLCalculator();
...@@ -37,4 +37,8 @@ public class ElasticsearchStorageTTL implements StorageTTL { ...@@ -37,4 +37,8 @@ public class ElasticsearchStorageTTL implements StorageTTL {
return new DayTTLCalculator(); return new DayTTLCalculator();
} }
} }
@Override public TTLCalculator recordCalculator() {
return new EsRecordTTLCalculator();
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl;
import org.apache.skywalking.oap.server.core.DataTTLConfig;
import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import org.joda.time.DateTime;
/**
* Calculate TTL for record.
*
* @author wusheng
*/
public class EsRecordTTLCalculator implements TTLCalculator {
@Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMdd"));
}
}
\ No newline at end of file
...@@ -19,12 +19,14 @@ ...@@ -19,12 +19,14 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException; import java.io.IOException;
import java.sql.*; import java.sql.Connection;
import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL; import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -53,7 +55,13 @@ public class H2HistoryDeleteDAO implements IHistoryDeleteDAO { ...@@ -53,7 +55,13 @@ public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " + model.getName() + " where ").append(timeBucketColumnName).append("<= ?"); SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " + model.getName() + " where ").append(timeBucketColumnName).append("<= ?");
try (Connection connection = client.getConnection()) { try (Connection connection = client.getConnection()) {
long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig()); TTLCalculator ttlCalculator;
if (model.isRecord()) {
ttlCalculator = storageTTL.recordCalculator();
} else {
ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling());
}
long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig());
client.execute(connection, dataDeleteSQL.toString(), timeBefore); client.execute(connection, dataDeleteSQL.toString(), timeBefore);
} catch (JDBCClientException | SQLException e) { } catch (JDBCClientException | SQLException e) {
throw new IOException(e.getMessage(), e); throw new IOException(e.getMessage(), e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册