提交 c4387a3e 编写于 作者: P peng-yongsheng

refactor es persistence DAO.

上级 506c00b4
......@@ -16,20 +16,19 @@
*
*/
package org.apache.skywalking.apm.collector.storage.base.dao;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
/**
* @author peng-yongsheng
*/
public interface IPersistenceDAO<Insert, Update, DataImpl extends AbstractData> extends DAO {
DataImpl get(String id);
public interface IPersistenceDAO<Insert, Update, STREAM_DATA extends StreamData> extends DAO {
STREAM_DATA get(String id);
Insert prepareBatchInsert(DataImpl data);
Insert prepareBatchInsert(STREAM_DATA data);
Update prepareBatchUpdate(DataImpl data);
Update prepareBatchUpdate(STREAM_DATA data);
void deleteHistory(Long startTimestamp, Long endTimestamp);
}
......@@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric
/**
* @author peng-yongsheng
*/
public interface IInstanceMetricPersistenceDAO<Insert, Update, DataImpl extends InstanceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
public interface IInstanceMetricPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends InstanceMetric> extends IPersistenceDAO<INSERT, UPDATE, STREAM_DATA> {
}
......@@ -16,18 +16,17 @@
*
*/
package org.apache.skywalking.apm.collector.storage.table.application;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
public class ApplicationComponent extends AbstractData {
public class ApplicationComponent extends StreamData {
private static final Column[] STRING_COLUMNS = {
new Column(ApplicationComponentTable.COLUMN_ID, new NonOperation()),
......@@ -45,8 +44,24 @@ public class ApplicationComponent extends AbstractData {
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public ApplicationComponent(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
public ApplicationComponent() {
super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override public String getId() {
return null;
}
@Override public void setId(String id) {
}
@Override public String getMetricId() {
return null;
}
@Override public void setMetricId(String metricId) {
}
public Long getTimeBucket() {
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.apm.collector.storage.table.instance;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
......@@ -28,10 +28,11 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
/**
* @author peng-yongsheng
*/
public class InstanceMetric extends AbstractData implements Metric {
public class InstanceMetric extends StreamData implements Metric {
private static final Column[] STRING_COLUMNS = {
new Column(InstanceMetricTable.COLUMN_ID, new NonOperation()),
new Column(InstanceMetricTable.COLUMN_METRIC_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
......@@ -62,8 +63,24 @@ public class InstanceMetric extends AbstractData implements Metric {
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public InstanceMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
public InstanceMetric() {
super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override public String getId() {
return getDataString(0);
}
@Override public void setId(String id) {
setDataString(0, id);
}
@Override public String getMetricId() {
return getDataString(1);
}
@Override public void setMetricId(String metricId) {
setDataString(1, metricId);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.base.dao;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> extends EsDAO implements IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, STREAM_DATA> {
private final Logger logger = LoggerFactory.getLogger(AbstractPersistenceEsDAO.class);
protected AbstractPersistenceEsDAO(ElasticSearchClient client) {
super(client);
}
protected abstract String tableName();
protected abstract STREAM_DATA esDataToStreamData(Map<String, Object> source);
@Override public final STREAM_DATA get(String id) {
GetResponse getResponse = getClient().prepareGet(tableName(), id).get();
if (getResponse.isExists()) {
return esDataToStreamData(getResponse.getSource());
} else {
return null;
}
}
protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
@Override public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
}
@Override public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
}
protected abstract String timeBucketColumnNameForDelete();
@Override public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket))
.source(tableName())
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, tableName());
}
}
......@@ -16,17 +16,16 @@
*
*/
package org.apache.skywalking.apm.collector.storage.es.base.dao;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,15 +45,14 @@ public class BatchEsDAO extends EsDAO implements IBatchDAO {
logger.debug("bulk data size: {}", batchCollection.size());
if (CollectionUtils.isNotEmpty(batchCollection)) {
for (int i = 0; i < batchCollection.size(); i++) {
Object builder = batchCollection.get(i);
batchCollection.forEach(builder -> {
if (builder instanceof IndexRequestBuilder) {
bulkRequest.add((IndexRequestBuilder)builder);
}
if (builder instanceof UpdateRequestBuilder) {
bulkRequest.add((UpdateRequestBuilder)builder);
}
}
});
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
......
......@@ -16,9 +16,10 @@
*
*/
package org.apache.skywalking.apm.collector.storage.es.base.dao;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.base.dao.AbstractDAO;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
......@@ -26,8 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.base.dao.AbstractDAO;
/**
* @author peng-yongsheng
......@@ -38,7 +37,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
super(client);
}
public final int getMaxId(String indexName, String columnName) {
protected final int getMaxId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
......@@ -57,7 +56,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
}
}
public final int getMinId(String indexName, String columnName) {
protected final int getMinId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
......
......@@ -21,124 +21,82 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceMetricEsPersistenceDAO extends EsDAO implements IInstanceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceMetric> {
private final Logger logger = LoggerFactory.getLogger(InstanceMetricEsPersistenceDAO.class);
public class InstanceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO<InstanceMetric> implements IInstanceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceMetric> {
public InstanceMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public InstanceMetric get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceMetricTable.TABLE, id).get();
if (getResponse.isExists()) {
logger.debug("getId: {} is exist", id);
InstanceMetric instanceMetric = new InstanceMetric(id);
Map<String, Object> source = getResponse.getSource();
instanceMetric.setApplicationId((Integer)source.get(InstanceMetricTable.COLUMN_APPLICATION_ID));
instanceMetric.setInstanceId((Integer)source.get(InstanceMetricTable.COLUMN_INSTANCE_ID));
instanceMetric.setSourceValue((Integer)source.get(InstanceMetricTable.COLUMN_SOURCE_VALUE));
instanceMetric.setTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
instanceMetric.setTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setBusinessTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue());
instanceMetric.setBusinessTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setBusinessTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setMqTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue());
instanceMetric.setMqTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setMqTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setMqTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setTimeBucket(((Number)source.get(InstanceMetricTable.COLUMN_TIME_BUCKET)).longValue());
return instanceMetric;
} else {
return null;
}
@Override protected String tableName() {
return InstanceMetricTable.TABLE;
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceMetricTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceMetricTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstanceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
@Override protected String timeBucketColumnNameForDelete() {
return InstanceMetricTable.COLUMN_TIME_BUCKET;
}
return getClient().prepareIndex(InstanceMetricTable.TABLE, data.getId()).setSource(source);
@Override protected InstanceMetric esDataToStreamData(Map<String, Object> source) {
InstanceMetric instanceMetric = new InstanceMetric();
instanceMetric.setId((String)source.get(InstanceMetricTable.COLUMN_ID));
instanceMetric.setMetricId((String)source.get(InstanceMetricTable.COLUMN_METRIC_ID));
instanceMetric.setApplicationId((Integer)source.get(InstanceMetricTable.COLUMN_APPLICATION_ID));
instanceMetric.setInstanceId((Integer)source.get(InstanceMetricTable.COLUMN_INSTANCE_ID));
instanceMetric.setSourceValue((Integer)source.get(InstanceMetricTable.COLUMN_SOURCE_VALUE));
instanceMetric.setTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
instanceMetric.setTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setBusinessTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue());
instanceMetric.setBusinessTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setBusinessTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setMqTransactionCalls(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue());
instanceMetric.setMqTransactionErrorCalls(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue());
instanceMetric.setMqTransactionDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue());
instanceMetric.setMqTransactionErrorDurationSum(((Number)source.get(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue());
instanceMetric.setTimeBucket(((Number)source.get(InstanceMetricTable.COLUMN_TIME_BUCKET)).longValue());
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceMetric data) {
@Override protected Map<String, Object> esStreamDataToEsData(InstanceMetric streamData) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceMetricTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceMetricTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstanceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(InstanceMetricTable.COLUMN_METRIC_ID, streamData.getMetricId());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_APPLICATION_ID, streamData.getApplicationId());
source.put(InstanceMetricTable.COLUMN_INSTANCE_ID, streamData.getInstanceId());
source.put(InstanceMetricTable.COLUMN_SOURCE_VALUE, streamData.getSourceValue());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, streamData.getTransactionCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, streamData.getTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, streamData.getTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, streamData.getTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, streamData.getBusinessTransactionCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, streamData.getBusinessTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, streamData.getBusinessTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, streamData.getBusinessTransactionErrorDurationSum());
source.put(InstanceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(InstanceMetricTable.TABLE, data.getId()).setDoc(source);
}
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, streamData.getMqTransactionCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, streamData.getMqTransactionErrorCalls());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, streamData.getMqTransactionDurationSum());
source.put(InstanceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, streamData.getMqTransactionErrorDurationSum());
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceMetricTable.TABLE)
.get();
source.put(InstanceMetricTable.COLUMN_TIME_BUCKET, streamData.getTimeBucket());
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceMetricTable.TABLE);
return source;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册