未验证 提交 0e60d40d 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support ES type/table namespace feature (#956)

* Proposal the code structure for table name prefix. Related to #932

* Fix style check.

* Keep ESDao table name in control.

* Finish all codes about ES namespace feature.
上级 11ea4be8
......@@ -19,12 +19,9 @@
package org.apache.skywalking.apm.collector.client.elasticsearch;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
......@@ -33,19 +30,25 @@ import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.apache.skywalking.apm.collector.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
/**
* @author peng-yongsheng
*/
......@@ -55,37 +58,42 @@ public class ElasticSearchClient implements Client {
private org.elasticsearch.client.Client client;
private final String namespace;
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final String clusterNodes;
public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
this.namespace = namespace;
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
}
@Override public void initialize() throws ClientException {
@Override
public void initialize() throws ClientException {
Settings settings = Settings.builder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", clusterTransportSniffer)
.build();
.put("cluster.name", clusterName)
.put("client.transport.sniff", clusterTransportSniffer)
.build();
client = new PreBuiltTransportClient(settings);
List<AddressPairs> pairsList = parseClusterNodes(clusterNodes);
for (AddressPairs pairs : pairsList) {
try {
((PreBuiltTransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
((PreBuiltTransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
} catch (UnknownHostException e) {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
}
@Override public void shutdown() {
@Override
public void shutdown() {
}
......@@ -114,12 +122,14 @@ public class ElasticSearchClient implements Client {
public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
IndicesAdminClient adminClient = client.admin().indices();
indexName = formatIndexName(indexName);
CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
logger.info("create {} index with type of {} finished, isAcknowledged: {}", indexName, indexType, response.isAcknowledged());
return response.isShardsAcked();
}
public boolean deleteIndex(String indexName) {
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
......@@ -127,44 +137,84 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
return response.isExists();
}
public SearchRequestBuilder prepareSearch(String indexName) {
indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
}
public DeleteByQueryRequestBuilder prepareDelete() {
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
indexName = formatIndexName(indexName);
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
}
public MultiGetRequestBuilder prepareMultiGet() {
return client.prepareMultiGet();
public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
rowHandler.setPrepareMultiGet(prepareMultiGet);
rowHandler.setNamespace(namespace);
rows.forEach(row -> {
rowHandler.accept(row);
});
return rowHandler.getPrepareMultiGet();
}
public abstract static class MultiGetRowHandler<T> implements Consumer<T> {
private MultiGetRequestBuilder prepareMultiGet;
private String namespace;
public void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet) {
this.prepareMultiGet = prepareMultiGet;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public void add(String indexName, @Nullable String type, String id) {
indexName = formatIndexName(namespace, indexName);
prepareMultiGet = prepareMultiGet.add(indexName, type, id);
}
private MultiGetRequestBuilder getPrepareMultiGet() {
return prepareMultiGet;
}
}
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
public void update(UpdateRequest updateRequest) {
try {
client.update(updateRequest).get();
} catch (InterruptedException | ExecutionException e) {
logger.error(e.getMessage(), e);
private String formatIndexName(String indexName) {
return formatIndexName(this.namespace, indexName);
}
private static String formatIndexName(String namespace, String indexName) {
if (StringUtils.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
}
return indexName;
}
}
......@@ -39,5 +39,10 @@
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-configuration-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.base.dao;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -31,6 +30,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author peng-yongsheng
*/
......@@ -46,7 +47,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract String tableName();
@Override public final STREAM_DATA get(String id) {
@Override
public final STREAM_DATA get(String id) {
GetResponse getResponse = getClient().prepareGet(tableName(), id).get();
if (getResponse.isExists()) {
STREAM_DATA streamData = esDataToStreamData(getResponse.getSource());
......@@ -59,25 +61,28 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
@Override public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
@Override
public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
}
@Override public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
@Override
public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
}
protected abstract String timeBucketColumnNameForDelete();
@Override public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket))
.source(tableName())
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
tableName())
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, tableName());
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -45,15 +46,18 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
super(client);
}
@Override public GlobalTrace get(String id) {
@Override
public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
@Override
public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getSegmentId());
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getGlobalTraceId());
......@@ -62,13 +66,14 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(GlobalTraceTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
GlobalTraceTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
......@@ -44,15 +45,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
super(client);
}
@Override public SegmentDuration get(String id) {
@Override
public SegmentDuration get(String id) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
@Override
public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
logger.debug("segment cost prepareBatchInsert, getApplicationId: {}", data.getId());
Map<String, Object> source = new HashMap<>();
source.put(SegmentDurationTable.COLUMN_SEGMENT_ID, data.getSegmentId());
......@@ -67,13 +71,14 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentDurationTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
SegmentDurationTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentDurationTable.TABLE);
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
......@@ -45,15 +46,18 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
super(client);
}
@Override public Segment get(String id) {
@Override
public Segment get(String id) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Segment data) {
@Override
public IndexRequestBuilder prepareBatchInsert(Segment data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBinary())));
source.put(SegmentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
......@@ -61,13 +65,14 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(SegmentTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
SegmentTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
......
......@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
......@@ -34,6 +32,9 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
*/
......@@ -45,27 +46,29 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
super(client);
}
@Override public ApplicationAlarm get(String id) {
@Override
public ApplicationAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationAlarm instanceAlarm = new ApplicationAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
instanceAlarm.setApplicationId(((Number)source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarm.setSourceValue(((Number)source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarm.setApplicationId(((Number) source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarm.setSourceValue(((Number) source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarm.setAlarmType(((Number)source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarm.setAlarmContent((String)source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
instanceAlarm.setAlarmType(((Number) source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarm.setAlarmContent((String) source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
instanceAlarm.setLastTimeBucket(((Number)source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
instanceAlarm.setLastTimeBucket(((Number) source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
......@@ -78,7 +81,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
return getClient().prepareIndex(ApplicationAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
......@@ -91,13 +95,14 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
return getClient().prepareUpdate(ApplicationAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ApplicationAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ApplicationAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
......@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
super(client);
}
@Override public ApplicationReferenceAlarm get(String id) {
@Override
public ApplicationReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarm applicationReferenceAlarm = new ApplicationReferenceAlarm();
applicationReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
applicationReferenceAlarm.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
applicationReferenceAlarm.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
applicationReferenceAlarm.setSourceValue(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
applicationReferenceAlarm.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
applicationReferenceAlarm.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
applicationReferenceAlarm.setSourceValue(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
applicationReferenceAlarm.setAlarmType(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
applicationReferenceAlarm.setAlarmContent((String)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
applicationReferenceAlarm.setAlarmType(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
applicationReferenceAlarm.setAlarmContent((String) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
applicationReferenceAlarm.setLastTimeBucket(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
applicationReferenceAlarm.setLastTimeBucket(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return applicationReferenceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(ApplicationReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(ApplicationReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ApplicationReferenceAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ApplicationReferenceAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
......@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
super(client);
}
@Override public ApplicationReferenceAlarmList get(String id) {
@Override
public ApplicationReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarmList applicationReferenceAlarmList = new ApplicationReferenceAlarmList();
applicationReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
applicationReferenceAlarmList.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
applicationReferenceAlarmList.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
applicationReferenceAlarmList.setSourceValue(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
applicationReferenceAlarmList.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
applicationReferenceAlarmList.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
applicationReferenceAlarmList.setSourceValue(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
applicationReferenceAlarmList.setAlarmType(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
applicationReferenceAlarmList.setAlarmContent((String)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
applicationReferenceAlarmList.setAlarmType(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
applicationReferenceAlarmList.setAlarmContent((String) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
applicationReferenceAlarmList.setTimeBucket(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
applicationReferenceAlarmList.setTimeBucket(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return applicationReferenceAlarmList;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
return getClient().prepareIndex(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
return getClient().prepareUpdate(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ApplicationReferenceAlarmListTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ApplicationReferenceAlarmListTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmListTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
......@@ -45,27 +46,29 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
super(client);
}
@Override public InstanceAlarm get(String id) {
@Override
public InstanceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarm instanceAlarm = new InstanceAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
instanceAlarm.setApplicationId(((Number)source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarm.setInstanceId(((Number)source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
instanceAlarm.setSourceValue(((Number)source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarm.setApplicationId(((Number) source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarm.setInstanceId(((Number) source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
instanceAlarm.setSourceValue(((Number) source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarm.setAlarmType(((Number)source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarm.setAlarmContent((String)source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
instanceAlarm.setAlarmType(((Number) source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarm.setAlarmContent((String) source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
instanceAlarm.setLastTimeBucket(((Number)source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
instanceAlarm.setLastTimeBucket(((Number) source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -79,7 +82,8 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
return getClient().prepareIndex(InstanceAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -93,13 +97,14 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
return getClient().prepareUpdate(InstanceAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
InstanceAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
......@@ -45,27 +46,29 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
super(client);
}
@Override public InstanceAlarmList get(String id) {
@Override
public InstanceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarmList instanceAlarmList = new InstanceAlarmList();
instanceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
instanceAlarmList.setApplicationId(((Number)source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarmList.setInstanceId(((Number)source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
instanceAlarmList.setSourceValue(((Number)source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarmList.setApplicationId(((Number) source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
instanceAlarmList.setInstanceId(((Number) source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
instanceAlarmList.setSourceValue(((Number) source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
instanceAlarmList.setAlarmType(((Number)source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarmList.setAlarmContent((String)source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
instanceAlarmList.setAlarmType(((Number) source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
instanceAlarmList.setAlarmContent((String) source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
instanceAlarmList.setTimeBucket(((Number)source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
instanceAlarmList.setTimeBucket(((Number) source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return instanceAlarmList;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
@Override
public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -79,7 +82,8 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
return getClient().prepareIndex(InstanceAlarmListTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -93,13 +97,14 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
return getClient().prepareUpdate(InstanceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceAlarmListTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
InstanceAlarmListTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmListTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
......@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
super(client);
}
@Override public InstanceReferenceAlarm get(String id) {
@Override
public InstanceReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarm instanceReferenceAlarm = new InstanceReferenceAlarm();
instanceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
instanceReferenceAlarm.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
instanceReferenceAlarm.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
instanceReferenceAlarm.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
instanceReferenceAlarm.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
instanceReferenceAlarm.setSourceValue(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceReferenceAlarm.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
instanceReferenceAlarm.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
instanceReferenceAlarm.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
instanceReferenceAlarm.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
instanceReferenceAlarm.setSourceValue(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
instanceReferenceAlarm.setAlarmType(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceReferenceAlarm.setAlarmContent((String)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
instanceReferenceAlarm.setAlarmType(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
instanceReferenceAlarm.setAlarmContent((String) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
instanceReferenceAlarm.setLastTimeBucket(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
instanceReferenceAlarm.setLastTimeBucket(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceReferenceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
return getClient().prepareIndex(InstanceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
return getClient().prepareUpdate(InstanceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceReferenceAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
InstanceReferenceAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
......@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
super(client);
}
@Override public InstanceReferenceAlarmList get(String id) {
@Override
public InstanceReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarmList serviceReferenceAlarmList = new InstanceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setSourceValue(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setSourceValue(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarmList.setAlarmType(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarmList.setAlarmContent((String)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarmList.setAlarmType(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarmList.setAlarmContent((String) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarmList.setTimeBucket(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
serviceReferenceAlarmList.setTimeBucket(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
@Override
public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(InstanceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(InstanceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceReferenceAlarmListTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
InstanceReferenceAlarmListTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmListTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
......@@ -45,28 +46,30 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
super(client);
}
@Override public ServiceAlarm get(String id) {
@Override
public ServiceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarm serviceAlarm = new ServiceAlarm();
serviceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
serviceAlarm.setApplicationId(((Number)source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
serviceAlarm.setInstanceId(((Number)source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
serviceAlarm.setServiceId(((Number)source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
serviceAlarm.setSourceValue(((Number)source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
serviceAlarm.setApplicationId(((Number) source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
serviceAlarm.setInstanceId(((Number) source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
serviceAlarm.setServiceId(((Number) source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
serviceAlarm.setSourceValue(((Number) source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
serviceAlarm.setAlarmType(((Number)source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
serviceAlarm.setAlarmContent((String)source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
serviceAlarm.setAlarmType(((Number) source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
serviceAlarm.setAlarmContent((String) source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
serviceAlarm.setLastTimeBucket(((Number)source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
serviceAlarm.setLastTimeBucket(((Number) source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -81,7 +84,8 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
return getClient().prepareIndex(ServiceAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -96,13 +100,14 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
return getClient().prepareUpdate(ServiceAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ServiceAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
......@@ -45,28 +46,30 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
super(client);
}
@Override public ServiceAlarmList get(String id) {
@Override
public ServiceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarmList serviceAlarmList = new ServiceAlarmList();
serviceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
serviceAlarmList.setApplicationId(((Number)source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
serviceAlarmList.setInstanceId(((Number)source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
serviceAlarmList.setServiceId(((Number)source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
serviceAlarmList.setSourceValue(((Number)source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceAlarmList.setApplicationId(((Number) source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
serviceAlarmList.setInstanceId(((Number) source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
serviceAlarmList.setServiceId(((Number) source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
serviceAlarmList.setSourceValue(((Number) source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceAlarmList.setAlarmType(((Number)source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceAlarmList.setAlarmContent((String)source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceAlarmList.setAlarmType(((Number) source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceAlarmList.setAlarmContent((String) source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceAlarmList.setTimeBucket(((Number)source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
serviceAlarmList.setTimeBucket(((Number) source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceAlarmList;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -81,7 +84,8 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
return getClient().prepareIndex(ServiceAlarmListTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
......@@ -96,13 +100,14 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
return getClient().prepareUpdate(ServiceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceAlarmListTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ServiceAlarmListTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmListTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
......@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
super(client);
}
@Override public ServiceReferenceAlarm get(String id) {
@Override
public ServiceReferenceAlarm get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarm serviceReferenceAlarm = new ServiceReferenceAlarm();
serviceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
serviceReferenceAlarm.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarm.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarm.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarm.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarm.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceAlarm.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceAlarm.setSourceValue(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarm.setAlarmType(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarm.setAlarmContent((String)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarm.setLastTimeBucket(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
serviceReferenceAlarm.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarm.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarm.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarm.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarm.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceAlarm.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceAlarm.setSourceValue(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarm.setAlarmType(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarm.setAlarmContent((String) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarm.setLastTimeBucket(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceReferenceAlarm;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
return getClient().prepareIndex(ServiceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
return getClient().prepareUpdate(ServiceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceReferenceAlarmTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ServiceReferenceAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmTable.TABLE);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
......@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
super(client);
}
@Override public ServiceReferenceAlarmList get(String id) {
@Override
public ServiceReferenceAlarmList get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarmList serviceReferenceAlarmList = new ServiceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceAlarmList.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceAlarmList.setSourceValue(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarmList.setAlarmType(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarmList.setAlarmContent((String)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarmList.setTimeBucket(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceAlarmList.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceAlarmList.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceAlarmList.setSourceValue(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceAlarmList.setAlarmType(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
serviceReferenceAlarmList.setAlarmContent((String) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
serviceReferenceAlarmList.setTimeBucket(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
@Override
public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareIndex(ServiceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
@Override
public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
......@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
return getClient().prepareUpdate(ServiceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceReferenceAlarmListTable.TABLE)
.get();
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
ServiceReferenceAlarmListTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmListTable.TABLE);
......
......@@ -18,13 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
*/
......@@ -34,28 +35,31 @@ public abstract class AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
super(client);
}
@Override protected final String timeBucketColumnNameForDelete() {
@Override
protected final String timeBucketColumnNameForDelete() {
return MemoryPoolMetricTable.COLUMN_TIME_BUCKET;
}
@Override protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
@Override
protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric();
memoryPoolMetric.setMetricId((String)source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
memoryPoolMetric.setMetricId((String) source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
memoryPoolMetric.setInstanceId(((Number)source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
memoryPoolMetric.setPoolType(((Number)source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
memoryPoolMetric.setInstanceId(((Number) source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
memoryPoolMetric.setPoolType(((Number) source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
memoryPoolMetric.setInit(((Number)source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
memoryPoolMetric.setMax(((Number)source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
memoryPoolMetric.setUsed(((Number)source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
memoryPoolMetric.setCommitted(((Number)source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
memoryPoolMetric.setTimes(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
memoryPoolMetric.setInit(((Number) source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
memoryPoolMetric.setMax(((Number) source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
memoryPoolMetric.setUsed(((Number) source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
memoryPoolMetric.setCommitted(((Number) source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
memoryPoolMetric.setTimes(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
memoryPoolMetric.setTimeBucket(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
memoryPoolMetric.setTimeBucket(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
return memoryPoolMetric;
}
@Override protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
@Override
protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_METRIC_ID, streamData.getMetricId());
......
......@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.register;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
......@@ -28,10 +26,13 @@ import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
*/
......@@ -43,15 +44,18 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
super(client);
}
@Override public int getMaxInstanceId() {
@Override
public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public int getMinInstanceId() {
@Override
public int getMinInstanceId() {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public void save(Instance instance) {
@Override
public void save(Instance instance) {
logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
......@@ -69,18 +73,14 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
ElasticSearchClient client = getClient();
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(InstanceTable.TABLE);
updateRequest.type(InstanceTable.TABLE_TYPE);
updateRequest.id(String.valueOf(instanceId));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@Override
public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
UpdateRequestBuilder updateRequestBuilder = getClient().prepareUpdate(InstanceTable.TABLE, String.valueOf(instanceId));
updateRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartbeatTime));
updateRequestBuilder.setDoc(source);
updateRequest.doc(source);
client.update(updateRequest);
updateRequestBuilder.get();
}
}
......@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
......@@ -32,6 +30,9 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import java.util.LinkedList;
import java.util.List;
/**
* @author peng-yongsheng
*/
......@@ -41,22 +42,26 @@ public class CpuMetricEsUIDAO extends EsDAO implements ICpuMetricUIDAO {
super(client);
}
@Override public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
@Override
public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, CpuMetricTable.TABLE);
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
prepareMultiGet.add(tableName, CpuMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
this.add(tableName, CpuMetricTable.TABLE_TYPE, id);
}
});
List<Integer> cpuTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
double cpuUsed = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
long times = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
cpuTrends.add((int)((cpuUsed / times) * 100));
double cpuUsed = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
long times = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
cpuTrends.add((int) ((cpuUsed / times) * 100));
} else {
cpuTrends.add(0);
}
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
......@@ -42,30 +43,34 @@ public class GCMetricEsUIDAO extends EsDAO implements IGCMetricUIDAO {
super(client);
}
@Override public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
@Override
public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints, GCPhrase.NEW_VALUE);
}
@Override public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
@Override
public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints, GCPhrase.OLD_VALUE);
}
private List<Integer> getGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints, int gcPhrase) {
String tableName = TimePyramidTableNameBuilder.build(step, GCMetricTable.TABLE);
MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet();
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
youngPrepareMultiGet.add(tableName, GCMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
add(tableName, GCMetricTable.TABLE_TYPE, id);
}
});
List<Integer> gcTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = youngPrepareMultiGet.get();
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
if (itemResponse.getResponse().isExists()) {
long count = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
long times = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
gcTrends.add((int)(count / times));
long count = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
long times = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
gcTrends.add((int) (count / times));
} else {
gcTrends.add(0);
}
......
......@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
......@@ -44,6 +42,9 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import java.util.LinkedList;
import java.util.List;
/**
* @author peng-yongsheng
*/
......@@ -53,8 +54,9 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
super(client);
}
@Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
@Override
public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
......@@ -82,8 +84,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
long calls = (long)callSum.getValue();
int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween);
long calls = (long) callSum.getValue();
int callsPerSec = (int) (secondBetween == 0 ? 0 : calls / secondBetween);
AppServerInfo appServerInfo = new AppServerInfo();
appServerInfo.setId(instanceId);
......@@ -103,13 +105,15 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
}
}
@Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
@Override
public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
add(tableName, InstanceMetricTable.TABLE_TYPE, id);
}
});
List<Integer> throughputTrend = new LinkedList<>();
......@@ -118,8 +122,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
for (int i = 0; i < multiGetResponse.getResponses().length; i++) {
MultiGetItemResponse response = multiGetResponse.getResponses()[i];
if (response.getResponse().isExists()) {
long callTimes = ((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
throughputTrend.add((int)(callTimes / durationPoints.get(i).getSecondsBetween()));
long callTimes = ((Number) response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
throughputTrend.add((int) (callTimes / durationPoints.get(i).getSecondsBetween()));
} else {
throughputTrend.add(0);
}
......@@ -127,15 +131,19 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
return throughputTrend;
}
@Override public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
@Override
public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
add(tableName, InstanceMetricTable.TABLE_TYPE, id);
}
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
});
List<Integer> responseTimeTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
......@@ -32,6 +31,8 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import java.util.List;
/**
* @author peng-yongsheng
*/
......@@ -41,38 +42,42 @@ public class MemoryMetricEsUIDAO extends EsDAO implements IMemoryMetricUIDAO {
super(client);
}
@Override public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
@Override
public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, true);
}
@Override public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
@Override
public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, false);
}
private Trend getMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints,
boolean isHeap) {
boolean isHeap) {
String tableName = TimePyramidTableNameBuilder.build(step, MemoryMetricTable.TABLE);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
add(tableName, MemoryMetricTable.TABLE_TYPE, id);
}
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
prepareMultiGet.add(tableName, MemoryMetricTable.TABLE_TYPE, id);
});
Trend trend = new Trend();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
long max = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
long used = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
long times = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
long max = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
long used = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
long times = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
trend.getMetrics().add((int)(used / times));
trend.getMetrics().add((int) (used / times));
if (max < 0) {
trend.getMaxMetrics().add((int)(used / times));
trend.getMaxMetrics().add((int) (used / times));
} else {
trend.getMaxMetrics().add((int)(max / times));
trend.getMaxMetrics().add((int) (max / times));
}
} else {
trend.getMetrics().add(0);
......
......@@ -18,17 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
/**
* @author peng-yongsheng
......@@ -39,15 +36,16 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
super(client);
}
@Override public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
@Override
public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
GetResponse getResponse = getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get();
JsonObject metric = new JsonObject();
if (getResponse.isExists()) {
metric.addProperty("max", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
metric.addProperty("init", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
metric.addProperty("used", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
metric.addProperty("max", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
metric.addProperty("init", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
metric.addProperty("used", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
} else {
metric.addProperty("max", 0);
metric.addProperty("init", 0);
......@@ -56,32 +54,8 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
return metric;
}
@Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
long timeBucket = startTimeBucket;
do {
// timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND, timeBucket, 1);
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
prepareMultiGet.add(MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.TABLE_TYPE, id);
}
while (timeBucket <= endTimeBucket);
JsonObject metric = new JsonObject();
JsonArray usedMetric = new JsonArray();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
metric.addProperty("max", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
metric.addProperty("init", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
usedMetric.add(((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
} else {
metric.addProperty("max", 0);
metric.addProperty("init", 0);
usedMetric.add(0);
}
}
metric.add("used", usedMetric);
return metric;
@Override
public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
throw new UnexpectedException("Not implement methodø");
}
}
......@@ -18,11 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
......@@ -51,6 +46,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import java.util.*;
/**
* @author peng-yongsheng
*/
......@@ -62,23 +59,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<Integer> getServiceResponseTimeTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
add(tableName, ServiceMetricTable.TABLE_TYPE, id);
}
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
long durationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
long errorDurationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
trends.add((int)((durationSum - errorDurationSum) / (calls - errorCalls)));
long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
long durationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
long errorDurationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
trends.add((int) ((durationSum - errorDurationSum) / (calls - errorCalls)));
} else {
trends.add(0);
}
......@@ -86,13 +84,15 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
return trends;
}
@Override public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
@Override
public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
add(tableName, ServiceMetricTable.TABLE_TYPE, id);
}
});
List<Integer> trends = new LinkedList<>();
......@@ -101,9 +101,9 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
int index = 0;
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long secondBetween = durationPoints.get(index).getSecondsBetween();
trends.add((int)(calls / secondBetween));
trends.add((int) (calls / secondBetween));
} else {
trends.add(0);
}
......@@ -112,22 +112,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
return trends;
}
@Override public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
@Override
public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
durationPoints.forEach(durationPoint -> {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
@Override
public void accept(DurationPoint durationPoint) {
String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
add(tableName, ServiceMetricTable.TABLE_TYPE, id);
}
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
trends.add((int)(((calls - errorCalls) / calls)) * 10000);
long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
trends.add((int) (((calls - errorCalls) / calls)) * 10000);
} else {
trends.add(10000);
}
......@@ -137,7 +139,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<Node> getServicesMetric(Step step, long startTime, long endTime, MetricSource metricSource,
Collection<Integer> serviceIds) {
Collection<Integer> serviceIds) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
......@@ -169,8 +171,8 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
ServiceNode serviceNode = new ServiceNode();
serviceNode.setId(serviceId);
serviceNode.setCalls((long)callsSum.getValue());
serviceNode.setSla((int)(((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
serviceNode.setCalls((long) callsSum.getValue());
serviceNode.setSla((int) (((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
nodes.add(serviceNode);
});
return nodes;
......@@ -178,7 +180,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
@Override
public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
Integer topN, MetricSource metricSource) {
Integer topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
......@@ -202,12 +204,12 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
Set<Integer> serviceIds = new HashSet<>();
List<ServiceMetric> serviceMetrics = new LinkedList<>();
for (SearchHit searchHit : searchHits) {
int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
int serviceId = ((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
if (!serviceIds.contains(serviceId)) {
ServiceMetric serviceMetric = new ServiceMetric();
serviceMetric.setId(serviceId);
serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
serviceMetric.setCalls(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceMetric.setAvgResponseTime(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
serviceMetrics.add(serviceMetric);
serviceIds.add(serviceId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册