diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index d9b27f892fda748355436a3d92b0108ac3ba9a77..4e819b40eaa1a8934f4b668cbc318a78c068a4ca 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -31,8 +31,6 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; -import static java.util.Objects.nonNull; - /** * @author peng-yongsheng */ @@ -42,7 +40,7 @@ public class MetricsPersistentWorker extends PersistenceWorker mergeDataCache; - private final IMetricsDAO metricsDAO; + private final IMetricsDAO metricsDAO; private final AbstractWorker nextAlarmWorker; private final AbstractWorker nextExportWorker; private final DataCarrier dataCarrier; @@ -99,40 +97,64 @@ public class MetricsPersistentWorker extends PersistenceWorker prepareBatch(MergeDataCache cache) { + long start = System.currentTimeMillis(); + List batchCollection = new LinkedList<>(); - cache.getLast().collection().forEach(data -> { + + Collection collection = cache.getLast().collection(); + + int i = 0; + Metrics[] metrics = null; + for (Metrics data : collection) { if (Objects.nonNull(nextExportWorker)) { ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT); nextExportWorker.in(event); } - Metrics dbData = null; - try { - dbData = metricsDAO.get(model, data); - } catch (Throwable t) { - logger.error(t.getMessage(), t); - } - try { - if (nonNull(dbData)) { - data.combine(dbData); - data.calculate(); - - batchCollection.add(metricsDAO.prepareBatchUpdate(model, data)); + int batchGetSize = 2000; + int mod = i % batchGetSize; + if (mod == 0) { + int residual = collection.size() - i; + if (residual >= batchGetSize) { + metrics = new Metrics[batchGetSize]; } else { - batchCollection.add(metricsDAO.prepareBatchInsert(model, data)); + metrics = new Metrics[residual]; } - - if (Objects.nonNull(nextAlarmWorker)) { - nextAlarmWorker.in(data); - } - if (Objects.nonNull(nextExportWorker)) { - ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL); - nextExportWorker.in(event); + } + metrics[mod] = data; + + if (mod == metrics.length - 1) { + try { + Map dbMetricsMap = metricsDAO.get(model, metrics); + + for (Metrics metric : metrics) { + if (dbMetricsMap.containsKey(metric.id())) { + metric.combine(dbMetricsMap.get(metric.id())); + metric.calculate(); + batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric)); + } else { + batchCollection.add(metricsDAO.prepareBatchInsert(model, metric)); + } + + if (Objects.nonNull(nextAlarmWorker)) { + nextAlarmWorker.in(metric); + } + if (Objects.nonNull(nextExportWorker)) { + ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL); + nextExportWorker.in(event); + } + } + } catch (Throwable t) { + logger.error(t.getMessage(), t); } - } catch (Throwable t) { - logger.error(t.getMessage(), t); } - }); + + i++; + } + + if (batchCollection.size() > 0) { + logger.debug("prepareBatch model {}, took time: {}", model.getName(), System.currentTimeMillis() - start); + } return batchCollection; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 20d09bf7f972bac3b79588f30f750b904fca163b..dfd4b20be8aefb3ee087cccbca40650a5e3a79d3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -75,19 +75,19 @@ public class MetricsStreamProcessor implements StreamProcessor { MetricsPersistentWorker monthPersistentWorker = null; if (configService.shouldToHour()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour)); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false); hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } if (configService.shouldToDay()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day)); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false); dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } if (configService.shouldToMonth()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month)); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false); monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } - Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute)); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false); MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model); MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java index 6483a118f739b0a29e29fd716e692389e9b8b3e6..b84d99b63b7b728a9da87c8a637dd9d37151cf63 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java @@ -65,8 +65,8 @@ public class RecordStreamProcessor implements StreamProcessor { } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second)); - RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO); + Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); + RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO); persistentWorkers.add(persistentWorker); workers.put(recordClass, persistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java index 03ae693ef5cf3401bd726eb7ba031f77853f79ac..fcb3a68808abcbeda3d833d121973004c48342df 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java @@ -61,7 +61,7 @@ public class TopNStreamProcessor implements StreamProcessor { } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second)); + Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO); persistentWorkers.add(persistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java index 596ab817192d2650aff94dfd4bee8d6fe78745b8..4f9f3f7af8c3c37c3486e74e16f42499e0db9cb6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java @@ -56,7 +56,7 @@ public class InventoryStreamProcessor implements StreamProcessor } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None)); + Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false); StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class); streamDataMappingSetter.putIfAbsent(inventoryClass); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java index 29ec61718fd8ca79b14d2df9dca83ad8cfb153cb..5c2e2462f6d84b7a284d8115e50fe419aa161912 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.storage; import java.io.IOException; +import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -27,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model; */ public interface IMetricsDAO extends DAO { - Metrics get(Model model, Metrics metrics) throws IOException; + Map get(Model model, Metrics[] metrics) throws IOException; INSERT prepareBatchInsert(Model model, Metrics metrics) throws IOException; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index 08cced4518f6e1b2af32faa5e92ad3d5d2f5fb96..ffe3480ca9f37aaa2ce558736707bb55478279b4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -61,17 +61,20 @@ public enum PersistenceTimer { if (!isStarted) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( - new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), - t -> logger.error("Extract data and save failure.", t)), 1, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS); + new RunnableWithExceptionProtection(() -> extractDataAndSaveRecord(batchDAO), + t -> logger.error("Extract data and save record failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS); + + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + new RunnableWithExceptionProtection(() -> extractDataAndSaveMetrics(batchDAO), + t -> logger.error("Extract data and save metrics failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS); this.isStarted = true; } } - @SuppressWarnings("unchecked") - private void extractDataAndSave(IBatchDAO batchDAO) { + private void extractDataAndSaveRecord(IBatchDAO batchDAO) { if (logger.isDebugEnabled()) { - logger.debug("Extract data and save"); + logger.debug("Extract data and save record"); } long startTime = System.currentTimeMillis(); @@ -79,36 +82,12 @@ public enum PersistenceTimer { HistogramMetrics.Timer timer = prepareLatency.createTimer(); List records = new LinkedList(); - List metrics = new LinkedList(); try { List persistenceWorkers = new ArrayList<>(); - persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers()); persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); - persistenceWorkers.forEach(worker -> { - if (logger.isDebugEnabled()) { - logger.debug("extract {} worker data and save", worker.getClass().getName()); - } - - if (worker.flushAndSwitch()) { - List batchCollection = worker.buildBatchCollection(); - - if (logger.isDebugEnabled()) { - logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size()); - } - - if (worker instanceof RecordPersistentWorker) { - records.addAll(batchCollection); - } else if (worker instanceof MetricsPersistentWorker) { - metrics.addAll(batchCollection); - } else if (worker instanceof TopNWorker) { - records.addAll(batchCollection); - } else { - logger.error("Missing the worker {}", worker.getClass().getSimpleName()); - } - } - }); + buildBatchCollection(persistenceWorkers, records); if (debug) { logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime); @@ -122,6 +101,46 @@ public enum PersistenceTimer { if (CollectionUtils.isNotEmpty(records)) { batchDAO.asynchronous(records); } + } finally { + executeLatencyTimer.finish(); + } + } catch (Throwable e) { + errorCounter.inc(); + logger.error(e.getMessage(), e); + } finally { + if (logger.isDebugEnabled()) { + logger.debug("Persistence data save finish"); + } + } + + if (debug) { + logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + } + } + + private void extractDataAndSaveMetrics(IBatchDAO batchDAO) { + if (logger.isDebugEnabled()) { + logger.debug("Extract data and save metrics"); + } + + long startTime = System.currentTimeMillis(); + try { + HistogramMetrics.Timer timer = prepareLatency.createTimer(); + + List metrics = new LinkedList(); + try { + List persistenceWorkers = new ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers()); + buildBatchCollection(persistenceWorkers, metrics); + + if (debug) { + logger.info("build metrics batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + } + } finally { + timer.finish(); + } + + HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); + try { if (CollectionUtils.isNotEmpty(metrics)) { batchDAO.synchronous(metrics); } @@ -141,4 +160,23 @@ public enum PersistenceTimer { logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); } } + + @SuppressWarnings("unchecked") + private void buildBatchCollection(List persistenceWorkers, List collection) { + persistenceWorkers.forEach(worker -> { + if (logger.isDebugEnabled()) { + logger.debug("extract {} worker data and save", worker.getClass().getName()); + } + + if (worker.flushAndSwitch()) { + List batchCollection = worker.buildBatchCollection(); + + if (logger.isDebugEnabled()) { + logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size()); + } + + collection.addAll(batchCollection); + } + }); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java index 4d3f26d60dc24ee01197c66732cb4b58d85f55a2..35d178d1eb3aa05d18dce134541e6512402ecf39 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java @@ -26,9 +26,9 @@ import org.apache.skywalking.oap.server.library.module.Service; /** * @author peng-yongsheng */ -public interface StorageDAO extends Service { +public interface StorageDAO extends Service { - IMetricsDAO newMetricsDao(StorageBuilder storageBuilder); + IMetricsDAO newMetricsDao(StorageBuilder storageBuilder); IRegisterDAO newRegisterDao(StorageBuilder storageBuilder); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java index a4712124ec10729a6b628ecbc928d0feaeb57f59..b30c457ce2b96e8acbfb90bd6ca7b26b768bf182 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java @@ -26,5 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service; */ public interface IModelSetter extends Service { - Model putIfAbsent(Class aClass, int scopeId, Storage storage); + Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java index b2dccc94358e133050b1a4003172f8652ae25a00..9693ef7f5395aa3e9106253d933047f3ea205365 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java @@ -34,13 +34,15 @@ public class Model { private final boolean deleteHistory; private final List columns; private final int scopeId; + private final boolean record; - public Model(String name, List columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling) { + public Model(String name, List columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling, boolean record) { this.columns = columns; this.capableOfTimeSeries = capableOfTimeSeries; this.downsampling = downsampling; this.deleteHistory = deleteHistory; this.scopeId = scopeId; this.name = ModelName.build(downsampling, name); + this.record = record; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index 2642523f1d7e940b5937e91f765849b0c9e71cbb..3ccf594e2290e56a6b32310a1d21874ed929e0cf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -37,7 +37,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride this.models = new LinkedList<>(); } - @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) { + @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record) { // Check this scope id is valid. DefaultScopeDefine.nameOf(scopeId); @@ -50,7 +50,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride List modelColumns = new LinkedList<>(); retrieval(aClass, storage.getModelName(), modelColumns); - Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling()); + Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling(), record); models.add(model); return model; diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index a4a4ec6585dc4e7baff552a0d58bb26b12b9e948..cbee9e2e6d37f0151d1ff3d5aa7d317d1891ab81 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -40,10 +40,9 @@ import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.*; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; -import org.elasticsearch.common.unit.*; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.*; @@ -227,29 +226,13 @@ public class ElasticSearchClient implements Client { return client.get(request); } - public SearchResponse idQuery(String indexName, String id) throws IOException { - indexName = formatIndexName(indexName); - - SearchRequest searchRequest = new SearchRequest(indexName); - searchRequest.types(TYPE); - searchRequest.source().query(QueryBuilders.idsQuery().addIds(id)); - return client.search(searchRequest); - } - - public Map> ids(String indexName, String... ids) throws IOException { + public SearchResponse ids(String indexName, String[] ids) throws IOException { indexName = formatIndexName(indexName); SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length); - SearchResponse response = client.search(searchRequest); - - Map> result = new HashMap<>(); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - result.put(hit.getId(), hit.getSourceAsMap()); - } - return result; + return client.search(searchRequest); } public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { @@ -312,7 +295,7 @@ public class ElasticSearchClient implements Client { } } - public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) { + public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { @@ -325,7 +308,7 @@ public class ElasticSearchClient implements Client { if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { - logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); + logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size()); } } @@ -337,7 +320,6 @@ public class ElasticSearchClient implements Client { return BulkProcessor.builder(client::bulkAsync, listener) .setBulkActions(bulkActions) - .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(flushInterval)) .setConcurrentRequests(concurrentRequests) .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java index c0c085499db2d4da656afdb59a8592f81ab7529a..6beba6b46367708936ac4307fb00ab8ca2530283 100644 --- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java @@ -18,8 +18,10 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; -import com.google.gson.Gson; -import com.google.gson.JsonObject; +import com.google.gson.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.HttpGet; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -27,26 +29,13 @@ import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.client.*; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.powermock.reflect.Whitebox; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import org.slf4j.*; /** * @author peng-yongsheng @@ -188,7 +177,7 @@ public class ITElasticSearchClient { @Test public void bulk() throws InterruptedException { - BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2); + BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2); Map source = new HashMap<>(); source.put("column1", "value1"); @@ -246,7 +235,7 @@ public class ITElasticSearchClient { } private RestHighLevelClient getRestHighLevelClient() { - return (RestHighLevelClient) Whitebox.getInternalState(client, "client"); + return (RestHighLevelClient)Whitebox.getInternalState(client, "client"); } private JsonObject undoFormatIndexName(JsonObject index) { diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index c5b01a363fd0ee5107b516a8ada176e37c32fff0..bc4b74556389421c91ade1aa5fd9c2ed6f1b56c1 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -75,8 +75,7 @@ storage: # otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day # monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month # # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html -# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests -# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb +# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests # flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests # concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests # metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index 665a852915e1e87e193d27ac7c2f3d8535e35269..ec79843f4dcdbc32aded4332e9f4507675457b61 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -75,8 +75,7 @@ storage: otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html - bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests - bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index bb5671ecbb1b15eb95d171a386cc84f3e8569b9f..48f4e0cb5edfcf852a4b20301e02b4fb16838e01 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -32,7 +32,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { @Setter private int indexReplicasNumber = 0; @Setter private int indexRefreshInterval = 2; @Setter private int bulkActions = 2000; - @Setter private int bulkSize = 20; @Setter private int flushInterval = 10; @Setter private int concurrentRequests = 2; @Setter private int syncBulkActions = 3; diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 58e83835e3a0cb53142343db7a01de3a5ad00f82..63d936b14401bbe1895b5882e6a964bb6cf51818 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -68,7 +68,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { } elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword()); - this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests())); + this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests())); this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient)); this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient)); this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL())); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index a3d52e9d9dddccf483876e8db41a80592a2f73bd..d4d118d477095f941abddb4f902909c59f881cfc 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -36,22 +36,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { private BulkProcessor bulkProcessor; private final int bulkActions; - private final int bulkSize; private final int flushInterval; private final int concurrentRequests; - public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval, + public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int flushInterval, int concurrentRequests) { super(client); this.bulkActions = bulkActions; - this.bulkSize = bulkSize; this.flushInterval = flushInterval; this.concurrentRequests = concurrentRequests; } @Override public void asynchronous(List collection) { if (bulkProcessor == null) { - this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests); + this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); } if (logger.isDebugEnabled()) { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index e313c6012a491d678c21aadbb6b242a692802d08..a0c8212c27a532e416496176895940bfaa1d6cf4 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; +import java.util.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -40,13 +41,20 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO 0) { - return storageBuilder.map2Data(response.getHits().getAt(0).getSourceAsMap()); - } else { - return null; + @Override public Map get(Model model, Metrics[] metrics) throws IOException { + Map result = new HashMap<>(); + + String[] ids = new String[metrics.length]; + for (int i = 0; i < metrics.length; i++) { + ids[i] = metrics[i].id(); + } + + SearchResponse response = getClient().ids(model.getName(), ids); + for (int i = 0; i < response.getHits().totalHits; i++) { + Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap()); + result.put(source.id(), source); } + return result; } @Override public IndexRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java index 52fe3fb97aeca46349ede49ac734e3b5d83c73a3..318b9a8f466fbf996c1282942b6d31c49752f772 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java @@ -23,17 +23,19 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; /** * @author peng-yongsheng */ -public class StorageEsDAO extends EsDAO implements StorageDAO { +public class StorageEsDAO extends EsDAO implements StorageDAO { public StorageEsDAO(ElasticSearchClient client) { super(client); } - @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { + @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { return new MetricsEsDAO(getClient(), storageBuilder); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index 7883049f2e8b28a873c3a1e2a75c8547182320ba..0246cede6664ef00571cbc7c7d52f0ca923c7416 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -64,7 +64,7 @@ public class StorageEsInstaller extends ModelInstaller { @Override protected void createTable(Client client, Model model) throws StorageException { ElasticSearchClient esClient = (ElasticSearchClient)client; - JsonObject settings = createSetting(); + JsonObject settings = createSetting(model.isRecord()); JsonObject mapping = createMapping(model); logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString()); @@ -97,13 +97,12 @@ public class StorageEsInstaller extends ModelInstaller { } } - private JsonObject createSetting() { + private JsonObject createSetting(boolean record) { JsonObject setting = new JsonObject(); setting.addProperty("index.number_of_shards", indexShardsNumber); setting.addProperty("index.number_of_replicas", indexReplicasNumber); - setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString()); + setting.addProperty("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString()); setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop"); - TimeValue.timeValueSeconds(3); return setting; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java index 7df8ba6cc619676de7faeb31eecfd6ad76212667..dd36e41b15620ed4a8913d1b5d226b71c9c74409 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java @@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.*; import org.elasticsearch.search.aggregations.metrics.avg.Avg; @@ -102,15 +103,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { @Override public IntValues getLinearIntValues(String indName, Downsampling downsampling, List ids, String valueCName) throws IOException { String indexName = ModelName.build(downsampling, indName); - Map> response = getClient().ids(indexName, ids.toArray(new String[0])); + SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0])); + Map> idMap = toMap(response); IntValues intValues = new IntValues(); for (String id : ids) { KVInt kvInt = new KVInt(); kvInt.setId(id); kvInt.setValue(0); - if (response.containsKey(id)) { - Map source = response.get(id); + if (idMap.containsKey(id)) { + Map source = idMap.get(id); kvInt.setValue(((Number)source.getOrDefault(valueCName, 0)).longValue()); } intValues.getValues().add(kvInt); @@ -125,11 +127,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { Thermodynamic thermodynamic = new Thermodynamic(); List> thermodynamicValueMatrix = new ArrayList<>(); - Map> response = getClient().ids(indexName, ids.toArray(new String[0])); + SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0])); + Map> idMap = toMap(response); int numOfSteps = 0; for (String id : ids) { - Map source = response.get(id); + Map source = idMap.get(id); if (source == null) { // add empty list to represent no data exist for this time bucket thermodynamicValueMatrix.add(new ArrayList<>()); @@ -159,4 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO { return thermodynamic; } + + private Map> toMap(SearchResponse response) { + Map> result = new HashMap<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + result.put(hit.getId(), hit.getSourceAsMap()); + } + return result; + } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java index 70318cc7d0c76eb41b14743082e0f36eac6074dd..1943c94284c3a5369bc1cd91dc037bb9a8680cc1 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; +import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -29,6 +30,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor; * @author wusheng */ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO { + private JDBCHikariCPClient h2Client; private StorageBuilder storageBuilder; @@ -37,8 +39,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO get(Model model, Metrics[] metrics) throws IOException { + // return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder); + return null; } @Override public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException { diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java index 21d3d3c26f933b31c8b181f375b098931147404a..3ab7d291d21a6cb605bbf3a7e74702fcd2c39090 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java @@ -19,26 +19,17 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.sql.*; +import java.util.*; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; -import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.StorageData; +import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.type.StorageDataType; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; -import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder; -import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor; -import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.skywalking.oap.server.storage.plugin.jdbc.*; +import org.slf4j.*; /** * @author wusheng @@ -52,9 +43,7 @@ public class H2SQLExecutor { try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id = ?", id)) { return toStorageData(rs, modelName, storageBuilder); } - } catch (SQLException e) { - throw new IOException(e.getMessage(), e); - } catch (JDBCClientException e) { + } catch (SQLException | JDBCClientException e) { throw new IOException(e.getMessage(), e); } } @@ -65,9 +54,7 @@ public class H2SQLExecutor { try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE " + columnName + " = ?", value)) { return toStorageData(rs, modelName, storageBuilder); } - } catch (SQLException e) { - throw new IOException(e.getMessage(), e); - } catch (JDBCClientException e) { + } catch (SQLException | JDBCClientException e) { throw new IOException(e.getMessage(), e); } } @@ -92,9 +79,7 @@ public class H2SQLExecutor { return rs.getInt(ServiceInstanceInventory.SEQUENCE); } } - } catch (SQLException e) { - logger.error(e.getMessage(), e); - } catch (JDBCClientException e) { + } catch (SQLException | JDBCClientException e) { logger.error(e.getMessage(), e); } return Const.NONE; diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java index d23001dee075cf4c27e1eeecb349667f251be552..d8ef8398a19866d9338efca93280aed4db0f237d 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java @@ -21,24 +21,22 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.register.RegisterSource; -import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; -import org.apache.skywalking.oap.server.core.storage.IRecordDAO; -import org.apache.skywalking.oap.server.core.storage.IRegisterDAO; -import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; +import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor; /** * @author wusheng */ -public class H2StorageDAO implements StorageDAO { +public class H2StorageDAO implements StorageDAO { + private JDBCHikariCPClient h2Client; public H2StorageDAO(JDBCHikariCPClient h2Client) { this.h2Client = h2Client; } - @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { + @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { return new H2MetricsDAO(h2Client, storageBuilder); }