提交 6338de39 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Improve OAP server performance. (#3127)

* Improve OAP server performance.
上级 b1dea4c5
......@@ -31,8 +31,6 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
......@@ -42,7 +40,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private final Model model;
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final IMetricsDAO<?, ?> metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
......@@ -99,40 +97,64 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
long start = System.currentTimeMillis();
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(data -> {
Collection<Metrics> collection = cache.getLast().collection();
int i = 0;
Metrics[] metrics = null;
for (Metrics data : collection) {
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
Metrics dbData = null;
try {
dbData = metricsDAO.get(model, data);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (nonNull(dbData)) {
data.combine(dbData);
data.calculate();
batchCollection.add(metricsDAO.prepareBatchUpdate(model, data));
int batchGetSize = 2000;
int mod = i % batchGetSize;
if (mod == 0) {
int residual = collection.size() - i;
if (residual >= batchGetSize) {
metrics = new Metrics[batchGetSize];
} else {
batchCollection.add(metricsDAO.prepareBatchInsert(model, data));
metrics = new Metrics[residual];
}
if (Objects.nonNull(nextAlarmWorker)) {
nextAlarmWorker.in(data);
}
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL);
nextExportWorker.in(event);
}
metrics[mod] = data;
if (mod == metrics.length - 1) {
try {
Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
for (Metrics metric : metrics) {
if (dbMetricsMap.containsKey(metric.id())) {
metric.combine(dbMetricsMap.get(metric.id()));
metric.calculate();
batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
} else {
batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
}
if (Objects.nonNull(nextAlarmWorker)) {
nextAlarmWorker.in(metric);
}
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
nextExportWorker.in(event);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
i++;
}
if (batchCollection.size() > 0) {
logger.debug("prepareBatch model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
}
return batchCollection;
}
......
......@@ -75,19 +75,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null;
if (configService.shouldToHour()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour));
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToDay()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day));
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToMonth()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month));
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
......
......@@ -65,8 +65,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO);
Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
......
......@@ -61,7 +61,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO);
persistentWorkers.add(persistentWorker);
......
......@@ -56,7 +56,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(inventoryClass);
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -27,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
*/
public interface IMetricsDAO<INSERT, UPDATE> extends DAO {
Metrics get(Model model, Metrics metrics) throws IOException;
Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException;
INSERT prepareBatchInsert(Model model, Metrics metrics) throws IOException;
......
......@@ -61,17 +61,20 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
t -> logger.error("Extract data and save failure.", t)), 1, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
new RunnableWithExceptionProtection(() -> extractDataAndSaveRecord(batchDAO),
t -> logger.error("Extract data and save record failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSaveMetrics(batchDAO),
t -> logger.error("Extract data and save metrics failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
this.isStarted = true;
}
}
@SuppressWarnings("unchecked")
private void extractDataAndSave(IBatchDAO batchDAO) {
private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
logger.debug("Extract data and save");
logger.debug("Extract data and save record");
}
long startTime = System.currentTimeMillis();
......@@ -79,36 +82,12 @@ public enum PersistenceTimer {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List records = new LinkedList();
List metrics = new LinkedList();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
}
if (worker instanceof RecordPersistentWorker) {
records.addAll(batchCollection);
} else if (worker instanceof MetricsPersistentWorker) {
metrics.addAll(batchCollection);
} else if (worker instanceof TopNWorker) {
records.addAll(batchCollection);
} else {
logger.error("Missing the worker {}", worker.getClass().getSimpleName());
}
}
});
buildBatchCollection(persistenceWorkers, records);
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
......@@ -122,6 +101,46 @@ public enum PersistenceTimer {
if (CollectionUtils.isNotEmpty(records)) {
batchDAO.asynchronous(records);
}
} finally {
executeLatencyTimer.finish();
}
} catch (Throwable e) {
errorCounter.inc();
logger.error(e.getMessage(), e);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Persistence data save finish");
}
}
if (debug) {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
logger.debug("Extract data and save metrics");
}
long startTime = System.currentTimeMillis();
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List metrics = new LinkedList();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
buildBatchCollection(persistenceWorkers, metrics);
if (debug) {
logger.info("build metrics batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
}
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(metrics)) {
batchDAO.synchronous(metrics);
}
......@@ -141,4 +160,23 @@ public enum PersistenceTimer {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
@SuppressWarnings("unchecked")
private void buildBatchCollection(List<PersistenceWorker> persistenceWorkers, List collection) {
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
}
collection.addAll(batchCollection);
}
});
}
}
......@@ -26,9 +26,9 @@ import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface StorageDAO extends Service {
public interface StorageDAO<INSERT, UPDATE> extends Service {
IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder);
IMetricsDAO<INSERT, UPDATE> newMetricsDao(StorageBuilder<Metrics> storageBuilder);
IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
......
......@@ -26,5 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface IModelSetter extends Service {
Model putIfAbsent(Class aClass, int scopeId, Storage storage);
Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record);
}
......@@ -34,13 +34,15 @@ public class Model {
private final boolean deleteHistory;
private final List<ModelColumn> columns;
private final int scopeId;
private final boolean record;
public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling) {
public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling, boolean record) {
this.columns = columns;
this.capableOfTimeSeries = capableOfTimeSeries;
this.downsampling = downsampling;
this.deleteHistory = deleteHistory;
this.scopeId = scopeId;
this.name = ModelName.build(downsampling, name);
this.record = record;
}
}
......@@ -37,7 +37,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
this.models = new LinkedList<>();
}
@Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) {
@Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record) {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId);
......@@ -50,7 +50,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, storage.getModelName(), modelColumns);
Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling());
Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling(), record);
models.add(model);
return model;
......
......@@ -40,10 +40,9 @@ import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.common.unit.*;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
......@@ -227,29 +226,13 @@ public class ElasticSearchClient implements Client {
return client.get(request);
}
public SearchResponse idQuery(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(id));
return client.search(searchRequest);
}
public Map<String, Map<String, Object>> ids(String indexName, String... ids) throws IOException {
public SearchResponse ids(String indexName, String[] ids) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
SearchResponse response = client.search(searchRequest);
Map<String, Map<String, Object>> result = new HashMap<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
result.put(hit.getId(), hit.getSourceAsMap());
}
return result;
return client.search(searchRequest);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
......@@ -312,7 +295,7 @@ public class ElasticSearchClient implements Client {
}
}
public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
......@@ -325,7 +308,7 @@ public class ElasticSearchClient implements Client {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size());
}
}
......@@ -337,7 +320,6 @@ public class ElasticSearchClient implements Client {
return BulkProcessor.builder(client::bulkAsync, listener)
.setBulkActions(bulkActions)
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
......
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
......@@ -27,26 +29,13 @@ import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.client.*;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -188,7 +177,7 @@ public class ITElasticSearchClient {
@Test
public void bulk() throws InterruptedException {
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2);
Map<String, String> source = new HashMap<>();
source.put("column1", "value1");
......@@ -246,7 +235,7 @@ public class ITElasticSearchClient {
}
private RestHighLevelClient getRestHighLevelClient() {
return (RestHighLevelClient) Whitebox.getInternalState(client, "client");
return (RestHighLevelClient)Whitebox.getInternalState(client, "client");
}
private JsonObject undoFormatIndexName(JsonObject index) {
......
......@@ -75,8 +75,7 @@ storage:
# otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
# monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
......
......@@ -75,8 +75,7 @@ storage:
otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
......
......@@ -32,7 +32,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter private int indexReplicasNumber = 0;
@Setter private int indexRefreshInterval = 2;
@Setter private int bulkActions = 2000;
@Setter private int bulkSize = 20;
@Setter private int flushInterval = 10;
@Setter private int concurrentRequests = 2;
@Setter private int syncBulkActions = 3;
......
......@@ -68,7 +68,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
}
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
......
......@@ -36,22 +36,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private BulkProcessor bulkProcessor;
private final int bulkActions;
private final int bulkSize;
private final int flushInterval;
private final int concurrentRequests;
public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int flushInterval,
int concurrentRequests) {
super(client);
this.bulkActions = bulkActions;
this.bulkSize = bulkSize;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
}
@Override public void asynchronous(List<?> collection) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
if (logger.isDebugEnabled()) {
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -40,13 +41,20 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO<IndexRequest, Upd
this.storageBuilder = storageBuilder;
}
@Override public Metrics get(Model model, Metrics metrics) throws IOException {
SearchResponse response = getClient().idQuery(model.getName(), metrics.id());
if (response.getHits().totalHits > 0) {
return storageBuilder.map2Data(response.getHits().getAt(0).getSourceAsMap());
} else {
return null;
@Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
Map<String, Metrics> result = new HashMap<>();
String[] ids = new String[metrics.length];
for (int i = 0; i < metrics.length; i++) {
ids[i] = metrics[i].id();
}
SearchResponse response = getClient().ids(model.getName(), ids);
for (int i = 0; i < response.getHits().totalHits; i++) {
Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
result.put(source.id(), source);
}
return result;
}
@Override public IndexRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
......
......@@ -23,17 +23,19 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
/**
* @author peng-yongsheng
*/
public class StorageEsDAO extends EsDAO implements StorageDAO {
public class StorageEsDAO extends EsDAO implements StorageDAO<IndexRequest, UpdateRequest> {
public StorageEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
@Override public IMetricsDAO<IndexRequest, UpdateRequest> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
return new MetricsEsDAO(getClient(), storageBuilder);
}
......
......@@ -64,7 +64,7 @@ public class StorageEsInstaller extends ModelInstaller {
@Override protected void createTable(Client client, Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
JsonObject settings = createSetting();
JsonObject settings = createSetting(model.isRecord());
JsonObject mapping = createMapping(model);
logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString());
......@@ -97,13 +97,12 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
private JsonObject createSetting() {
private JsonObject createSetting(boolean record) {
JsonObject setting = new JsonObject();
setting.addProperty("index.number_of_shards", indexShardsNumber);
setting.addProperty("index.number_of_replicas", indexReplicasNumber);
setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString());
setting.addProperty("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString());
setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
TimeValue.timeValueSeconds(3);
return setting;
}
......
......@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
......@@ -102,15 +103,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
@Override public IntValues getLinearIntValues(String indName, Downsampling downsampling, List<String> ids, String valueCName) throws IOException {
String indexName = ModelName.build(downsampling, indName);
Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
Map<String, Map<String, Object>> idMap = toMap(response);
IntValues intValues = new IntValues();
for (String id : ids) {
KVInt kvInt = new KVInt();
kvInt.setId(id);
kvInt.setValue(0);
if (response.containsKey(id)) {
Map<String, Object> source = response.get(id);
if (idMap.containsKey(id)) {
Map<String, Object> source = idMap.get(id);
kvInt.setValue(((Number)source.getOrDefault(valueCName, 0)).longValue());
}
intValues.getValues().add(kvInt);
......@@ -125,11 +127,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
Thermodynamic thermodynamic = new Thermodynamic();
List<List<Long>> thermodynamicValueMatrix = new ArrayList<>();
Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
Map<String, Map<String, Object>> idMap = toMap(response);
int numOfSteps = 0;
for (String id : ids) {
Map<String, Object> source = response.get(id);
Map<String, Object> source = idMap.get(id);
if (source == null) {
// add empty list to represent no data exist for this time bucket
thermodynamicValueMatrix.add(new ArrayList<>());
......@@ -159,4 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
return thermodynamic;
}
private Map<String, Map<String, Object>> toMap(SearchResponse response) {
Map<String, Map<String, Object>> result = new HashMap<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
result.put(hit.getId(), hit.getSourceAsMap());
}
return result;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -29,6 +30,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
* @author wusheng
*/
public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecutor, SQLExecutor> {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Metrics> storageBuilder;
......@@ -37,8 +39,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecut
this.storageBuilder = storageBuilder;
}
@Override public Metrics get(Model model, Metrics metrics) throws IOException {
return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
@Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
// return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
return null;
}
@Override public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
......
......@@ -19,26 +19,17 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
import org.slf4j.*;
/**
* @author wusheng
......@@ -52,9 +43,7 @@ public class H2SQLExecutor {
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id = ?", id)) {
return toStorageData(rs, modelName, storageBuilder);
}
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
} catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
......@@ -65,9 +54,7 @@ public class H2SQLExecutor {
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE " + columnName + " = ?", value)) {
return toStorageData(rs, modelName, storageBuilder);
}
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
} catch (JDBCClientException e) {
} catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
......@@ -92,9 +79,7 @@ public class H2SQLExecutor {
return rs.getInt(ServiceInstanceInventory.SEQUENCE);
}
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} catch (JDBCClientException e) {
} catch (SQLException | JDBCClientException e) {
logger.error(e.getMessage(), e);
}
return Const.NONE;
......
......@@ -21,24 +21,22 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
/**
* @author wusheng
*/
public class H2StorageDAO implements StorageDAO {
public class H2StorageDAO implements StorageDAO<SQLExecutor, SQLExecutor> {
private JDBCHikariCPClient h2Client;
public H2StorageDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
@Override public IMetricsDAO<SQLExecutor, SQLExecutor> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
return new H2MetricsDAO(h2Client, storageBuilder);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册