未验证 提交 368a5f4b 编写于 作者: 彭勇升 pengys 提交者: GitHub

Add missing tables which need to clean data when timeout. (#1110)

* Delete aggregate with second time bucket.

* Add missing tables which need to clean data when timeout.

* Fixed check style error.
上级 1d0992ee
......@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.collector.storage.es.base.dao;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
......@@ -75,9 +74,7 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract String timeBucketColumnNameForDelete();
@Override
public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
public final void deleteHistory(Long startTimeBucket, Long endTimeBucket) {
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
tableName())
......
......@@ -28,16 +28,12 @@ import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReference
import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<InstanceReferenceAlarm> implements IInstanceReferenceAlarmPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceReferenceAlarm> {
private final Logger logger = LoggerFactory.getLogger(InstanceReferenceAlarmEsPersistenceDAO.class);
public InstanceReferenceAlarmEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......
......@@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceDayMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceDayMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(InstanceReferenceDayMetricEsPersistenceDAO.class);
public InstanceReferenceDayMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......
......@@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceHourMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceHourMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceReferenceMetric> {
private static final Logger logger = LoggerFactory.getLogger(InstanceReferenceHourMetricEsPersistenceDAO.class);
public InstanceReferenceHourMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......
......@@ -26,16 +26,12 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceMonthMetricEsPersistenceDAO extends AbstractInstanceReferenceMetricEsPersistenceDAO implements IInstanceReferenceMonthMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(InstanceReferenceMonthMetricEsPersistenceDAO.class);
public InstanceReferenceMonthMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.junit.*;
import org.powermock.reflect.Whitebox;
/**
* @author peng-yongsheng
*/
public class DataTTLKeeperTimerTestCase {
@Test
public void testConvertTimeBucket() {
DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null, 8);
DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket();
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(System.currentTimeMillis());
long dayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(minuteTimeBucket);
long startSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "startSecondTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 1000000, startSecondTimeBucket);
long endSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "endSecondTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 1000000 + 235959, endSecondTimeBucket);
long startMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "startMinuteTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 10000, startMinuteTimeBucket);
long endMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "endMinuteTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 10000 + 2359, endMinuteTimeBucket);
long startHourTimeBucket = Whitebox.getInternalState(timeBuckets, "startHourTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 100, startHourTimeBucket);
long endHourTimeBucket = Whitebox.getInternalState(timeBuckets, "endHourTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) * 100 + 23, endHourTimeBucket);
long startDayTimeBucket = Whitebox.getInternalState(timeBuckets, "startDayTimeBucket");
Assert.assertEquals(dayTimeBucket - 8, startDayTimeBucket);
long endDayTimeBucket = Whitebox.getInternalState(timeBuckets, "endDayTimeBucket");
Assert.assertEquals(dayTimeBucket - 8, endDayTimeBucket);
long startMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "startMonthTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) / 100, startMonthTimeBucket);
long endMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "endMonthTimeBucket");
Assert.assertEquals((dayTimeBucket - 8) / 100, endMonthTimeBucket);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册