未验证 提交 21cc39c6 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #616 from OpenSkywalking/feature/history-data-delete

Delete history data
......@@ -37,3 +37,4 @@ storage:
cluster_nodes: {ES_ADDRESSES}
index_shards_number: 2
index_replicas_number: 0
ttl: 7
......@@ -37,3 +37,4 @@ ui:
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
# ttl: 7
\ No newline at end of file
......@@ -37,6 +37,8 @@ import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.ClientException;
......@@ -146,6 +148,10 @@ public class ElasticSearchClient implements Client {
return client.prepareGet(indexName, "type", id);
}
public DeleteByQueryRequestBuilder prepareDelete() {
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
}
public MultiGetRequestBuilder prepareMultiGet() {
return client.prepareMultiGet();
}
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.core.util;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author peng-yongsheng
......@@ -34,10 +35,18 @@ public class CollectionUtils {
return list == null || list.size() == 0;
}
public static boolean isEmpty(Set set) {
return set == null || set.size() == 0;
}
public static boolean isNotEmpty(List list) {
return !isEmpty(list);
}
public static boolean isNotEmpty(Set set) {
return !isEmpty(set);
}
public static boolean isNotEmpty(Map map) {
return !isEmpty(map);
}
......
......@@ -29,4 +29,6 @@ public interface IPersistenceDAO<Insert, Update, DataImpl extends Data> extends
Insert prepareBatchInsert(DataImpl data);
Update prepareBatchUpdate(DataImpl data);
void deleteHistory(Long startTimestamp, Long endTimestamp);
}
......@@ -36,5 +36,10 @@
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import java.util.Calendar;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
/**
* @author peng-yongsheng
*/
public class DataTTLKeeperTimer {
private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener;
private final String selfAddress;
private final int daysBefore;
public DataTTLKeeperTimer(ModuleManager moduleManager,
StorageModuleEsNamingListener namingListener, String selfAddress, int daysBefore) {
this.moduleManager = moduleManager;
this.namingListener = namingListener;
this.selfAddress = selfAddress;
this.daysBefore = daysBefore;
}
public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::delete, 1, 8, TimeUnit.HOURS);
}
private void delete() {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(System.currentTimeMillis());
calendar.set(Calendar.DAY_OF_MONTH, -daysBefore);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
long startTimestamp = calendar.getTimeInMillis();
calendar.set(Calendar.MINUTE, 59);
calendar.set(Calendar.SECOND, 59);
long endTimestamp = calendar.getTimeInMillis();
deleteJVMRelatedData(startTimestamp, endTimestamp);
deleteTraceRelatedData(startTimestamp, endTimestamp);
}
private void deleteJVMRelatedData(long startTimestamp, long endTimestamp) {
ICpuMetricPersistenceDAO cpuMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ICpuMetricPersistenceDAO.class);
cpuMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IGCMetricPersistenceDAO gcMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGCMetricPersistenceDAO.class);
gcMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IMemoryMetricPersistenceDAO memoryMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IMemoryMetricPersistenceDAO.class);
memoryMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IMemoryPoolMetricPersistenceDAO memoryPoolMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IMemoryPoolMetricPersistenceDAO.class);
memoryPoolMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
}
private void deleteTraceRelatedData(long startTimestamp, long endTimestamp) {
IGlobalTracePersistenceDAO globalTracePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class);
globalTracePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IInstPerformancePersistenceDAO instPerformancePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IInstPerformancePersistenceDAO.class);
instPerformancePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeComponentPersistenceDAO nodeComponentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeComponentPersistenceDAO.class);
nodeComponentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeMappingPersistenceDAO nodeMappingPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeMappingPersistenceDAO.class);
nodeMappingPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeReferencePersistenceDAO nodeReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeReferencePersistenceDAO.class);
nodeReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
ISegmentCostPersistenceDAO segmentCostPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentCostPersistenceDAO.class);
segmentCostPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
ISegmentPersistenceDAO segmentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class);
segmentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IServiceReferencePersistenceDAO serviceReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferencePersistenceDAO.class);
serviceReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + StorageModule.NAME + "/" + StorageModuleEsProvider.NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -19,8 +19,12 @@
package org.skywalking.apm.collector.storage.es;
import java.util.Properties;
import java.util.UUID;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -107,16 +111,19 @@ public class StorageModuleEsProvider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
public static final String NAME = "elasticsearch";
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
private static final String TIME_TO_LIVE_OF_DATA = "ttl";
private ElasticSearchClient elasticSearchClient;
private DataTTLKeeperTimer deleteTimer;
@Override public String name() {
return "elasticsearch";
return NAME;
}
@Override public Class<? extends Module> module() {
......@@ -147,14 +154,25 @@ public class StorageModuleEsProvider extends ModuleProvider {
} catch (ClientException | StorageException e) {
logger.error(e.getMessage(), e);
}
String uuId = UUID.randomUUID().toString();
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(StorageModule.NAME, this.name(), new StorageModuleEsRegistration(uuId, 0));
StorageModuleEsNamingListener namingListener = new StorageModuleEsNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
Integer beforeDay = (Integer)config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, uuId + 0, beforeDay);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
deleteTimer.start();
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {ClusterModule.NAME};
}
private void registerCacheDAO() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.util.Const;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsRegistration extends ModuleRegistration {
private final String virtualHost;
private final int virtualPort;
StorageModuleEsRegistration(String virtualHost, int virtualPort) {
this.virtualHost = virtualHost;
this.virtualPort = virtualPort;
}
@Override public Value buildValue() {
return new Value(this.virtualHost, virtualPort, Const.EMPTY_STRING);
}
}
......@@ -22,7 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
......@@ -58,4 +61,16 @@ public class CpuMetricEsPersistenceDAO extends EsDAO implements ICpuMetricPersis
@Override public UpdateRequestBuilder prepareBatchUpdate(CpuMetric cpuMetric) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(CpuMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(CpuMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, CpuMetricTable.TABLE);
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GCMetricEsPersistenceDAO extends EsDAO implements IGCMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GCMetric> {
private final Logger logger = LoggerFactory.getLogger(GCMetricEsPersistenceDAO.class);
public GCMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -55,4 +62,16 @@ public class GCMetricEsPersistenceDAO extends EsDAO implements IGCMetricPersiste
@Override public UpdateRequestBuilder prepareBatchUpdate(GCMetric gcMetric) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(GCMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(GCMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GCMetricTable.TABLE);
}
}
......@@ -22,8 +22,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
......@@ -58,4 +61,16 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
logger.debug("global trace source: {}", source.toString());
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(GlobalTraceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
}
}
......@@ -23,7 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
......@@ -80,4 +83,16 @@ public class InstPerformanceEsPersistenceDAO extends EsDAO implements IInstPerfo
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstPerformanceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstPerformanceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstPerformanceTable.TABLE);
}
}
......@@ -67,4 +67,7 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getHeartBeatTime());
return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class MemoryMetricEsPersistenceDAO extends EsDAO implements IMemoryMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryMetric> {
private final Logger logger = LoggerFactory.getLogger(MemoryMetricEsPersistenceDAO.class);
public MemoryMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -57,4 +64,16 @@ public class MemoryMetricEsPersistenceDAO extends EsDAO implements IMemoryMetric
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(MemoryMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(MemoryMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, MemoryMetricTable.TABLE);
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class MemoryPoolMetricEsPersistenceDAO extends EsDAO implements IMemoryPoolMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryPoolMetric> {
private final Logger logger = LoggerFactory.getLogger(MemoryPoolMetricEsPersistenceDAO.class);
public MemoryPoolMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -57,4 +64,16 @@ public class MemoryPoolMetricEsPersistenceDAO extends EsDAO implements IMemoryPo
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryPoolMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(MemoryPoolMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(MemoryPoolMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, MemoryPoolMetricTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeComponentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
private final Logger logger = LoggerFactory.getLogger(NodeComponentEsPersistenceDAO.class);
public NodeComponentEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -69,4 +76,16 @@ public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeCompone
return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeComponentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeComponentTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeComponentTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeMapping> {
private final Logger logger = LoggerFactory.getLogger(NodeMappingEsPersistenceDAO.class);
public NodeMappingEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -68,4 +75,16 @@ public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPe
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(NodeMappingTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeMappingTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeMappingTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeMappingTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferencePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceEsPersistenceDAO.class);
public NodeReferenceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -87,4 +94,16 @@ public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferen
return getClient().prepareUpdate(NodeReferenceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeReferenceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeReferenceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeReferenceTable.TABLE);
}
}
......@@ -22,7 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
......@@ -63,4 +66,16 @@ public class SegmentCostEsPersistenceDAO extends EsDAO implements ISegmentCostPe
logger.debug("segment cost source: {}", source.toString());
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentCostTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentCostTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentCostTable.TABLE);
}
}
......@@ -23,7 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.Segment;
......@@ -57,4 +60,16 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
logger.debug("segment source: {}", source.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
}
}
......@@ -74,4 +74,7 @@ public class ServiceEntryEsPersistenceDAO extends EsDAO implements IServiceEntry
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -23,7 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
......@@ -97,4 +100,16 @@ public class ServiceReferenceEsPersistenceDAO extends EsDAO implements IServiceR
return getClient().prepareUpdate(ServiceReferenceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceReferenceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceTable.TABLE);
}
}
......@@ -63,4 +63,7 @@ public class CpuMetricH2PersistenceDAO extends H2DAO implements ICpuMetricPersis
@Override public H2SqlEntity prepareBatchUpdate(CpuMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -60,4 +60,7 @@ public class GCMetricH2PersistenceDAO extends H2DAO implements IGCMetricPersiste
@Override public H2SqlEntity prepareBatchUpdate(GCMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -64,4 +64,7 @@ public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePe
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -97,4 +97,7 @@ public class InstPerformanceH2PersistenceDAO extends H2DAO implements IInstPerfo
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -81,4 +81,7 @@ public class InstanceHeartBeatH2PersistenceDAO extends H2DAO implements IInstanc
entity.setParams(params.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class MemoryMetricH2PersistenceDAO extends H2DAO implements IMemoryMetric
@Override public H2SqlEntity prepareBatchUpdate(MemoryMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class MemoryPoolMetricH2PersistenceDAO extends H2DAO implements IMemoryPo
@Override public H2SqlEntity prepareBatchUpdate(MemoryPoolMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -94,4 +94,7 @@ public class NodeComponentH2PersistenceDAO extends H2DAO implements INodeCompone
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -92,4 +92,7 @@ public class NodeMappingH2PersistenceDAO extends H2DAO implements INodeMappingPe
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -110,4 +110,7 @@ public class NodeReferenceH2PersistenceDAO extends H2DAO implements INodeReferen
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -69,4 +69,7 @@ public class SegmentCostH2PersistenceDAO extends H2DAO implements ISegmentCostPe
@Override public H2SqlEntity prepareBatchUpdate(SegmentCost data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class SegmentH2PersistenceDAO extends H2DAO implements ISegmentPersistenc
@Override public H2SqlEntity prepareBatchUpdate(Segment data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -97,4 +97,7 @@ public class ServiceEntryH2PersistenceDAO extends H2DAO implements IServiceEntry
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -120,4 +120,7 @@ public class ServiceReferenceH2PersistenceDAO extends H2DAO implements IServiceR
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册