From 368a5f4b090e1b325ff070ddbacf189ac8e4e545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Mon, 23 Apr 2018 22:41:22 +0800 Subject: [PATCH] Add missing tables which need to clean data when timeout. (#1110) * Delete aggregate with second time bucket. * Add missing tables which need to clean data when timeout. * Fixed check style error. --- .../storage/es/DataTTLKeeperTimer.java | 209 ++++++++++++++---- .../es/base/dao/AbstractPersistenceEsDAO.java | 5 +- ...nstanceReferenceAlarmEsPersistenceDAO.java | 4 - ...nceReferenceDayMetricEsPersistenceDAO.java | 4 - ...ceReferenceHourMetricEsPersistenceDAO.java | 4 - ...eReferenceMonthMetricEsPersistenceDAO.java | 4 - .../es/DataTTLKeeperTimerTestCase.java | 68 ++++++ 7 files changed, 233 insertions(+), 65 deletions(-) create mode 100644 apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java index aacb4d707..a43534e25 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimer.java @@ -19,30 +19,33 @@ package org.apache.skywalking.apm.collector.storage.es; import java.util.Calendar; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.skywalking.apm.collector.core.module.ModuleManager; +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; import org.apache.skywalking.apm.collector.storage.StorageModule; -import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingMinutePersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMinuteMetricPersistenceDAO; -import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO; +import org.apache.skywalking.apm.collector.storage.dao.*; +import org.apache.skywalking.apm.collector.storage.dao.acp.*; +import org.apache.skywalking.apm.collector.storage.dao.alarm.*; +import org.apache.skywalking.apm.collector.storage.dao.amp.*; +import org.apache.skywalking.apm.collector.storage.dao.ampp.*; +import org.apache.skywalking.apm.collector.storage.dao.armp.*; +import org.apache.skywalking.apm.collector.storage.dao.cpu.*; +import org.apache.skywalking.apm.collector.storage.dao.gc.*; +import org.apache.skywalking.apm.collector.storage.dao.imp.*; +import org.apache.skywalking.apm.collector.storage.dao.impp.*; +import org.apache.skywalking.apm.collector.storage.dao.irmp.*; +import org.apache.skywalking.apm.collector.storage.dao.memory.*; +import org.apache.skywalking.apm.collector.storage.dao.mpool.*; +import org.apache.skywalking.apm.collector.storage.dao.rtd.*; +import org.apache.skywalking.apm.collector.storage.dao.smp.*; +import org.apache.skywalking.apm.collector.storage.dao.srmp.*; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng */ -public class DataTTLKeeperTimer { +class DataTTLKeeperTimer { private static final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class); private final ModuleManager moduleManager; @@ -50,7 +53,7 @@ public class DataTTLKeeperTimer { private final String selfAddress; private final int daysBefore; - public DataTTLKeeperTimer(ModuleManager moduleManager, + DataTTLKeeperTimer(ModuleManager moduleManager, StorageModuleEsNamingListener namingListener, String selfAddress, int daysBefore) { this.moduleManager = moduleManager; this.namingListener = namingListener; @@ -58,13 +61,25 @@ public class DataTTLKeeperTimer { this.daysBefore = daysBefore; } - public void start() { + void start() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( new RunnableWithExceptionProtection(this::delete, t -> logger.error("Remove data in background failure.", t)), 1, 8, TimeUnit.HOURS); } private void delete() { + if (!namingListener.getAddresses().iterator().next().equals(selfAddress)) { + return; + } + + TimeBuckets timeBuckets = convertTimeBucket(); + + deleteJVMRelatedData(timeBuckets); + deleteTraceRelatedData(timeBuckets); + deleteAlarmRelatedData(timeBuckets); + } + + TimeBuckets convertTimeBucket() { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(System.currentTimeMillis()); calendar.add(Calendar.DAY_OF_MONTH, -daysBefore); @@ -77,45 +92,149 @@ public class DataTTLKeeperTimer { calendar.set(Calendar.HOUR_OF_DAY, 23); calendar.set(Calendar.MINUTE, 59); calendar.set(Calendar.SECOND, 59); + long endTimestamp = calendar.getTimeInMillis(); - deleteJVMRelatedData(startTimestamp, endTimestamp); - deleteTraceRelatedData(startTimestamp, endTimestamp); - } + TimeBuckets timeBuckets = new TimeBuckets(); + timeBuckets.startSecondTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp); + timeBuckets.endSecondTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp); - private void deleteJVMRelatedData(long startTimestamp, long endTimestamp) { - moduleManager.find(StorageModule.NAME).getService(ICpuMinuteMetricPersistenceDAO.class).deleteHistory(startTimestamp, endTimestamp); + timeBuckets.startMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp); + timeBuckets.endMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp); - moduleManager.find(StorageModule.NAME).getService(IGCMinuteMetricPersistenceDAO.class).deleteHistory(startTimestamp, endTimestamp); + timeBuckets.startHourTimeBucket = TimeBucketUtils.INSTANCE.minuteToHour(timeBuckets.startMinuteTimeBucket); + timeBuckets.endHourTimeBucket = TimeBucketUtils.INSTANCE.minuteToHour(timeBuckets.endMinuteTimeBucket); - moduleManager.find(StorageModule.NAME).getService(IMemoryMinuteMetricPersistenceDAO.class).deleteHistory(startTimestamp, endTimestamp); + timeBuckets.startDayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(timeBuckets.startMinuteTimeBucket); + timeBuckets.endDayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(timeBuckets.endMinuteTimeBucket); - moduleManager.find(StorageModule.NAME).getService(IMemoryPoolMinuteMetricPersistenceDAO.class).deleteHistory(startTimestamp, endTimestamp); + timeBuckets.startMonthTimeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(timeBuckets.startMinuteTimeBucket); + timeBuckets.endMonthTimeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(timeBuckets.endMinuteTimeBucket); + + return timeBuckets; } - private void deleteTraceRelatedData(long startTimestamp, long endTimestamp) { - IGlobalTracePersistenceDAO globalTracePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class); - globalTracePersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + private void deleteAlarmRelatedData(TimeBuckets timeBuckets) { + moduleManager.find(StorageModule.NAME).getService(IApplicationAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationAlarmListMinutePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationAlarmListHourPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationAlarmListDayPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationAlarmListMonthPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceAlarmListPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IInstanceAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); - IInstanceMinuteMetricPersistenceDAO instanceMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceMinuteMetricPersistenceDAO.class); - instanceMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + moduleManager.find(StorageModule.NAME).getService(IInstanceAlarmListPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); - IApplicationComponentMinutePersistenceDAO applicationComponentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationComponentMinutePersistenceDAO.class); - applicationComponentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceAlarmListPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceAlarmListPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceAlarmPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceAlarmListPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + } + + private void deleteJVMRelatedData(TimeBuckets timeBuckets) { + moduleManager.find(StorageModule.NAME).getService(ICpuMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(ICpuHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(ICpuDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(ICpuMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IGCMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IGCHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IGCDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IGCMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IMemoryMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IMemoryPoolMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryPoolHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryPoolDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IMemoryPoolMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + } + + private void deleteTraceRelatedData(TimeBuckets timeBuckets) { + moduleManager.find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(ISegmentDurationPersistenceDAO.class).deleteHistory(timeBuckets.startSecondTimeBucket, timeBuckets.endSecondTimeBucket); + moduleManager.find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationComponentMinutePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationComponentHourPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationComponentDayPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationComponentMonthPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationMappingMinutePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationMappingHourPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationMappingDayPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationMappingMonthPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IInstanceMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IInstanceMappingMinutePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceMappingHourPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceMappingDayPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceMappingMonthPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IInstanceReferenceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IResponseTimeDistributionMinutePersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IResponseTimeDistributionHourPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IResponseTimeDistributionDayPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IResponseTimeDistributionMonthPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMinuteMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMinuteTimeBucket, timeBuckets.endMinuteTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceHourMetricPersistenceDAO.class).deleteHistory(timeBuckets.startHourTimeBucket, timeBuckets.endHourTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceDayMetricPersistenceDAO.class).deleteHistory(timeBuckets.startDayTimeBucket, timeBuckets.endDayTimeBucket); + moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.startMonthTimeBucket, timeBuckets.endMonthTimeBucket); + } - IApplicationMappingMinutePersistenceDAO applicationMappingPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationMappingMinutePersistenceDAO.class); - applicationMappingPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + class TimeBuckets { + private long startSecondTimeBucket; + private long endSecondTimeBucket; - IApplicationReferenceMinuteMetricPersistenceDAO applicationReferenceMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationReferenceMinuteMetricPersistenceDAO.class); - applicationReferenceMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + private long startMinuteTimeBucket; + private long endMinuteTimeBucket; - ISegmentDurationPersistenceDAO segmentDurationPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentDurationPersistenceDAO.class); - segmentDurationPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + private long startHourTimeBucket; + private long endHourTimeBucket; - ISegmentPersistenceDAO segmentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class); - segmentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + private long startDayTimeBucket; + private long endDayTimeBucket; - IServiceReferenceMinuteMetricPersistenceDAO serviceReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMinuteMetricPersistenceDAO.class); - serviceReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp); + private long startMonthTimeBucket; + private long endMonthTimeBucket; } -} +} \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java index 9647b97b6..c32d6138f 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java @@ -21,7 +21,6 @@ package org.apache.skywalking.apm.collector.storage.es.base.dao; import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.data.StreamData; -import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -75,9 +74,7 @@ public abstract class AbstractPersistenceEsDAO e protected abstract String timeBucketColumnNameForDelete(); @Override - public final void deleteHistory(Long startTimestamp, Long endTimestamp) { - long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp); - long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp); + public final void deleteHistory(Long startTimeBucket, Long endTimeBucket) { BulkByScrollResponse response = getClient().prepareDelete( QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket), tableName()) diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java index ae9752e50..fdda6eaaa 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java @@ -28,16 +28,12 @@ import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReference import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmTable; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class InstanceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO implements IInstanceReferenceAlarmPersistenceDAO { - private final Logger logger = LoggerFactory.getLogger(InstanceReferenceAlarmEsPersistenceDAO.class); - public InstanceReferenceAlarmEsPersistenceDAO(ElasticSearchClient client) { super(client); } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java index c9eefd6e3..f7a2fdcd5 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceDayMetricEsPersistenceDAO.java @@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class InstanceReferenceDayMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceDayMetricPersistenceDAO { - private final Logger logger = LoggerFactory.getLogger(InstanceReferenceDayMetricEsPersistenceDAO.class); - public InstanceReferenceDayMetricEsPersistenceDAO(ElasticSearchClient client) { super(client); } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java index 8408c0b2f..030cae5dc 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceHourMetricEsPersistenceDAO.java @@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class InstanceReferenceHourMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceHourMetricPersistenceDAO { - private static final Logger logger = LoggerFactory.getLogger(InstanceReferenceHourMetricEsPersistenceDAO.class); - public InstanceReferenceHourMetricEsPersistenceDAO(ElasticSearchClient client) { super(client); } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java index b9d763770..e6dd62c0c 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/InstanceReferenceMonthMetricEsPersistenceDAO.java @@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class InstanceReferenceMonthMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceMonthMetricPersistenceDAO { - private final Logger logger = LoggerFactory.getLogger(InstanceReferenceMonthMetricEsPersistenceDAO.class); - public InstanceReferenceMonthMetricEsPersistenceDAO(ElasticSearchClient client) { super(client); } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java new file mode 100644 index 000000000..e6426bea7 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/test/java/org/apache/skywalking/apm/collector/storage/es/DataTTLKeeperTimerTestCase.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.storage.es; + +import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.junit.*; +import org.powermock.reflect.Whitebox; + +/** + * @author peng-yongsheng + */ +public class DataTTLKeeperTimerTestCase { + + @Test + public void testConvertTimeBucket() { + DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null, 8); + DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket(); + + long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(System.currentTimeMillis()); + long dayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(minuteTimeBucket); + + long startSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "startSecondTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 1000000, startSecondTimeBucket); + + long endSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "endSecondTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 1000000 + 235959, endSecondTimeBucket); + + long startMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "startMinuteTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 10000, startMinuteTimeBucket); + + long endMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "endMinuteTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 10000 + 2359, endMinuteTimeBucket); + + long startHourTimeBucket = Whitebox.getInternalState(timeBuckets, "startHourTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 100, startHourTimeBucket); + + long endHourTimeBucket = Whitebox.getInternalState(timeBuckets, "endHourTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) * 100 + 23, endHourTimeBucket); + + long startDayTimeBucket = Whitebox.getInternalState(timeBuckets, "startDayTimeBucket"); + Assert.assertEquals(dayTimeBucket - 8, startDayTimeBucket); + + long endDayTimeBucket = Whitebox.getInternalState(timeBuckets, "endDayTimeBucket"); + Assert.assertEquals(dayTimeBucket - 8, endDayTimeBucket); + + long startMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "startMonthTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) / 100, startMonthTimeBucket); + + long endMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "endMonthTimeBucket"); + Assert.assertEquals((dayTimeBucket - 8) / 100, endMonthTimeBucket); + } +} -- GitLab