提交 a9eaa2ee 编写于 作者: P peng-yongsheng

1. The collectors provider vote about leader which collector execute the delete operation.

2. Delete the history data before the setting that you can find the property name of history_delete_before_days in the application.yml file.
上级 b4b9835a
......@@ -37,3 +37,4 @@ ui:
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
# history_delete_before_days: 3
\ No newline at end of file
......@@ -37,6 +37,8 @@ import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.ClientException;
......@@ -146,6 +148,10 @@ public class ElasticSearchClient implements Client {
return client.prepareGet(indexName, "type", id);
}
public DeleteByQueryRequestBuilder prepareDelete() {
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
}
public MultiGetRequestBuilder prepareMultiGet() {
return client.prepareMultiGet();
}
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.core.util;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author peng-yongsheng
......@@ -34,10 +35,18 @@ public class CollectionUtils {
return list == null || list.size() == 0;
}
public static boolean isEmpty(Set set) {
return set == null || set.size() == 0;
}
public static boolean isNotEmpty(List list) {
return !isEmpty(list);
}
public static boolean isNotEmpty(Set set) {
return !isEmpty(set);
}
public static boolean isNotEmpty(Map map) {
return !isEmpty(map);
}
......
......@@ -29,4 +29,6 @@ public interface IPersistenceDAO<Insert, Update, DataImpl extends Data> extends
Insert prepareBatchInsert(DataImpl data);
Update prepareBatchUpdate(DataImpl data);
void deleteHistory(Long startTimestamp, Long endTimestamp);
}
......@@ -36,5 +36,10 @@
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import java.util.Calendar;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
/**
* @author peng-yongsheng
*/
public class HistoryDataDeleteTimer {
private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener;
private final String selfAddress;
private final int daysBefore;
public HistoryDataDeleteTimer(ModuleManager moduleManager,
StorageModuleEsNamingListener namingListener, String selfAddress, int daysBefore) {
this.moduleManager = moduleManager;
this.namingListener = namingListener;
this.selfAddress = selfAddress;
this.daysBefore = daysBefore;
}
public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::delete, 1, 8, TimeUnit.HOURS);
}
private void tryDelete() {
if (CollectionUtils.isNotEmpty(namingListener.getAddresses())) {
String firstAddress = namingListener.getAddresses().iterator().next();
if (firstAddress.equals(selfAddress)) {
delete();
}
}
}
private void delete() {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(System.currentTimeMillis());
calendar.set(Calendar.DAY_OF_MONTH, -daysBefore);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
long startTimestamp = calendar.getTimeInMillis();
calendar.set(Calendar.MINUTE, 59);
calendar.set(Calendar.SECOND, 59);
long endTimestamp = calendar.getTimeInMillis();
deleteJVMMetricData(startTimestamp, endTimestamp);
deleteTraceMetricData(startTimestamp, endTimestamp);
}
private void deleteJVMMetricData(long startTimestamp, long endTimestamp) {
ICpuMetricPersistenceDAO cpuMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ICpuMetricPersistenceDAO.class);
cpuMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IGCMetricPersistenceDAO gcMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGCMetricPersistenceDAO.class);
gcMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IMemoryMetricPersistenceDAO memoryMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IMemoryMetricPersistenceDAO.class);
memoryMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IMemoryPoolMetricPersistenceDAO memoryPoolMetricPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IMemoryPoolMetricPersistenceDAO.class);
memoryPoolMetricPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
}
private void deleteTraceMetricData(long startTimestamp, long endTimestamp) {
IGlobalTracePersistenceDAO globalTracePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IGlobalTracePersistenceDAO.class);
globalTracePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IInstPerformancePersistenceDAO instPerformancePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IInstPerformancePersistenceDAO.class);
instPerformancePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeComponentPersistenceDAO nodeComponentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeComponentPersistenceDAO.class);
nodeComponentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeMappingPersistenceDAO nodeMappingPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeMappingPersistenceDAO.class);
nodeMappingPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
INodeReferencePersistenceDAO nodeReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(INodeReferencePersistenceDAO.class);
nodeReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
ISegmentCostPersistenceDAO segmentCostPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentCostPersistenceDAO.class);
segmentCostPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
ISegmentPersistenceDAO segmentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class);
segmentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IServiceReferencePersistenceDAO serviceReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferencePersistenceDAO.class);
serviceReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + StorageModule.NAME + "/" + StorageModuleEsProvider.NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -19,8 +19,12 @@
package org.skywalking.apm.collector.storage.es;
import java.util.Properties;
import java.util.UUID;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -107,16 +111,19 @@ public class StorageModuleEsProvider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
public static final String NAME = "elasticsearch";
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
private static final String HISTORY_DELETE_BEFORE_DAYS = "history_delete_before_days";
private ElasticSearchClient elasticSearchClient;
private HistoryDataDeleteTimer deleteTimer;
@Override public String name() {
return "elasticsearch";
return NAME;
}
@Override public Class<? extends Module> module() {
......@@ -147,14 +154,25 @@ public class StorageModuleEsProvider extends ModuleProvider {
} catch (ClientException | StorageException e) {
logger.error(e.getMessage(), e);
}
String uuId = UUID.randomUUID().toString();
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(StorageModule.NAME, this.name(), new StorageModuleEsRegistration(uuId, 0));
StorageModuleEsNamingListener namingListener = new StorageModuleEsNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
Integer beforeDay = (Integer)config.getOrDefault(HISTORY_DELETE_BEFORE_DAYS, 3);
deleteTimer = new HistoryDataDeleteTimer(getManager(), namingListener, uuId + 0, beforeDay);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
deleteTimer.start();
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {ClusterModule.NAME};
}
private void registerCacheDAO() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.util.Const;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsRegistration extends ModuleRegistration {
private final String virtualHost;
private final int virtualPort;
StorageModuleEsRegistration(String virtualHost, int virtualPort) {
this.virtualHost = virtualHost;
this.virtualPort = virtualPort;
}
@Override public Value buildValue() {
return new Value(this.virtualHost, virtualPort, Const.EMPTY_STRING);
}
}
......@@ -22,7 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
......@@ -58,4 +61,16 @@ public class CpuMetricEsPersistenceDAO extends EsDAO implements ICpuMetricPersis
@Override public UpdateRequestBuilder prepareBatchUpdate(CpuMetric cpuMetric) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(CpuMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(CpuMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, CpuMetricTable.TABLE);
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GCMetricEsPersistenceDAO extends EsDAO implements IGCMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GCMetric> {
private final Logger logger = LoggerFactory.getLogger(GCMetricEsPersistenceDAO.class);
public GCMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -55,4 +62,16 @@ public class GCMetricEsPersistenceDAO extends EsDAO implements IGCMetricPersiste
@Override public UpdateRequestBuilder prepareBatchUpdate(GCMetric gcMetric) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(GCMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(GCMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GCMetricTable.TABLE);
}
}
......@@ -22,8 +22,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
......@@ -58,4 +61,16 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
logger.debug("global trace source: {}", source.toString());
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(GlobalTraceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
}
}
......@@ -23,7 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
......@@ -80,4 +83,16 @@ public class InstPerformanceEsPersistenceDAO extends EsDAO implements IInstPerfo
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstPerformanceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstPerformanceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstPerformanceTable.TABLE);
}
}
......@@ -67,4 +67,7 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getHeartBeatTime());
return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class MemoryMetricEsPersistenceDAO extends EsDAO implements IMemoryMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryMetric> {
private final Logger logger = LoggerFactory.getLogger(MemoryMetricEsPersistenceDAO.class);
public MemoryMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -57,4 +64,16 @@ public class MemoryMetricEsPersistenceDAO extends EsDAO implements IMemoryMetric
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(MemoryMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(MemoryMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, MemoryMetricTable.TABLE);
}
}
......@@ -22,17 +22,24 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class MemoryPoolMetricEsPersistenceDAO extends EsDAO implements IMemoryPoolMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryPoolMetric> {
private final Logger logger = LoggerFactory.getLogger(MemoryPoolMetricEsPersistenceDAO.class);
public MemoryPoolMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -57,4 +64,16 @@ public class MemoryPoolMetricEsPersistenceDAO extends EsDAO implements IMemoryPo
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryPoolMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(MemoryPoolMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(MemoryPoolMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, MemoryPoolMetricTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeComponentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
private final Logger logger = LoggerFactory.getLogger(NodeComponentEsPersistenceDAO.class);
public NodeComponentEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -69,4 +76,16 @@ public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeCompone
return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeComponentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeComponentTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeComponentTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeMapping> {
private final Logger logger = LoggerFactory.getLogger(NodeMappingEsPersistenceDAO.class);
public NodeMappingEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -68,4 +75,16 @@ public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPe
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(NodeMappingTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeMappingTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeMappingTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeMappingTable.TABLE);
}
}
......@@ -23,17 +23,24 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferencePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceEsPersistenceDAO.class);
public NodeReferenceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
......@@ -87,4 +94,16 @@ public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferen
return getClient().prepareUpdate(NodeReferenceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(NodeReferenceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(NodeReferenceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, NodeReferenceTable.TABLE);
}
}
......@@ -22,7 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
......@@ -63,4 +66,16 @@ public class SegmentCostEsPersistenceDAO extends EsDAO implements ISegmentCostPe
logger.debug("segment cost source: {}", source.toString());
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentCostTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentCostTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentCostTable.TABLE);
}
}
......@@ -23,7 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.Segment;
......@@ -57,4 +60,16 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
logger.debug("segment source: {}", source.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
}
}
......@@ -74,4 +74,7 @@ public class ServiceEntryEsPersistenceDAO extends EsDAO implements IServiceEntry
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -23,7 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
......@@ -97,4 +100,16 @@ public class ServiceReferenceEsPersistenceDAO extends EsDAO implements IServiceR
return getClient().prepareUpdate(ServiceReferenceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceReferenceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceTable.TABLE);
}
}
......@@ -63,4 +63,7 @@ public class CpuMetricH2PersistenceDAO extends H2DAO implements ICpuMetricPersis
@Override public H2SqlEntity prepareBatchUpdate(CpuMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -60,4 +60,7 @@ public class GCMetricH2PersistenceDAO extends H2DAO implements IGCMetricPersiste
@Override public H2SqlEntity prepareBatchUpdate(GCMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -64,4 +64,7 @@ public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePe
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -97,4 +97,7 @@ public class InstPerformanceH2PersistenceDAO extends H2DAO implements IInstPerfo
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -81,4 +81,7 @@ public class InstanceHeartBeatH2PersistenceDAO extends H2DAO implements IInstanc
entity.setParams(params.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class MemoryMetricH2PersistenceDAO extends H2DAO implements IMemoryMetric
@Override public H2SqlEntity prepareBatchUpdate(MemoryMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class MemoryPoolMetricH2PersistenceDAO extends H2DAO implements IMemoryPo
@Override public H2SqlEntity prepareBatchUpdate(MemoryPoolMetric data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -94,4 +94,7 @@ public class NodeComponentH2PersistenceDAO extends H2DAO implements INodeCompone
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -92,4 +92,7 @@ public class NodeMappingH2PersistenceDAO extends H2DAO implements INodeMappingPe
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -110,4 +110,7 @@ public class NodeReferenceH2PersistenceDAO extends H2DAO implements INodeReferen
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -69,4 +69,7 @@ public class SegmentCostH2PersistenceDAO extends H2DAO implements ISegmentCostPe
@Override public H2SqlEntity prepareBatchUpdate(SegmentCost data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -62,4 +62,7 @@ public class SegmentH2PersistenceDAO extends H2DAO implements ISegmentPersistenc
@Override public H2SqlEntity prepareBatchUpdate(Segment data) {
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -97,4 +97,7 @@ public class ServiceEntryH2PersistenceDAO extends H2DAO implements IServiceEntry
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
......@@ -120,4 +120,7 @@ public class ServiceReferenceH2PersistenceDAO extends H2DAO implements IServiceR
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册