diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index b4b4fd1283f182f30627fe4b02536b74cdb87b21..d9b27f892fda748355436a3d92b0108ac3ba9a77 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -74,7 +74,7 @@ public class MetricsPersistentWorker extends PersistenceWorker collection = buildBatchCollection(); - batchDAO.batchPersistence(collection); + batchDAO.asynchronous(collection); } } finally { getCache().trySwitchPointerFinally(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java index 5c7a88a342075974aa251efe9a5841b369ed8b39..2bd3a5655cd27e481c53f80a7ffd9b7990819b7c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java @@ -25,5 +25,7 @@ import java.util.List; */ public interface IBatchDAO extends DAO { - void batchPersistence(List batchCollection); + void asynchronous(List collection); + + void synchronous(List collection); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index c26e11bef9d7254f9f06a48789e2e2a291fc6aa5..08cced4518f6e1b2af32faa5e92ad3d5d2f5fb96 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.worker.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.slf4j.*; @@ -77,7 +78,8 @@ public enum PersistenceTimer { try { HistogramMetrics.Timer timer = prepareLatency.createTimer(); - List batchAllCollection = new LinkedList(); + List records = new LinkedList(); + List metrics = new LinkedList(); try { List persistenceWorkers = new ArrayList<>(); persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); @@ -95,7 +97,16 @@ public enum PersistenceTimer { if (logger.isDebugEnabled()) { logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size()); } - batchAllCollection.addAll(batchCollection); + + if (worker instanceof RecordPersistentWorker) { + records.addAll(batchCollection); + } else if (worker instanceof MetricsPersistentWorker) { + metrics.addAll(batchCollection); + } else if (worker instanceof TopNWorker) { + records.addAll(batchCollection); + } else { + logger.error("Missing the worker {}", worker.getClass().getSimpleName()); + } } }); @@ -108,7 +119,12 @@ public enum PersistenceTimer { HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); try { - batchDAO.batchPersistence(batchAllCollection); + if (CollectionUtils.isNotEmpty(records)) { + batchDAO.asynchronous(records); + } + if (CollectionUtils.isNotEmpty(metrics)) { + batchDAO.synchronous(metrics); + } } finally { executeLatencyTimer.finish(); } @@ -117,12 +133,12 @@ public enum PersistenceTimer { logger.error(e.getMessage(), e); } finally { if (logger.isDebugEnabled()) { - logger.debug("persistence data save finish"); + logger.debug("Persistence data save finish"); } } if (debug) { - logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); } } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 337b96b5e16e396f247f7ec8a0b660d2669405a3..a4a4ec6585dc4e7baff552a0d58bb26b12b9e948 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; import com.google.gson.*; import java.io.*; import java.util.*; - import org.apache.commons.lang3.StringUtils; import org.apache.http.*; import org.apache.http.auth.*; @@ -38,7 +37,7 @@ import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.*; -import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.*; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; import org.elasticsearch.common.unit.*; @@ -60,7 +59,7 @@ public class ElasticSearchClient implements Client { private final String namespace; private final String user; private final String password; - private RestHighLevelClient client; + protected RestHighLevelClient client; public ElasticSearchClient(String clusterNodes, String namespace, String user, String password) { this.clusterNodes = clusterNodes; @@ -130,26 +129,24 @@ public class ElasticSearchClient implements Client { logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson); return new ArrayList<>(responseJson.keySet()); } - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /** - * If your indexName is retrieved from elasticsearch through {@link #retrievalIndexByAliases(String)} or some other method and it already contains namespace. - * Then you should delete the index by this method, this method will no longer concatenate namespace. + * If your indexName is retrieved from elasticsearch through {@link #retrievalIndexByAliases(String)} or some other method and it already contains namespace. + * Then you should delete the index by this method, this method will no longer concatenate namespace. * * https://github.com/apache/skywalking/pull/3017 - * */ public boolean deleteByIndexName(String indexName) throws IOException { return deleteIndex(indexName, false); } /** - * If your indexName is obtained from metadata or configuration and without namespace. - * Then you should delete the index by this method, this method automatically concatenates namespace. - * - * https://github.com/apache/skywalking/pull/3017 + * If your indexName is obtained from metadata or configuration and without namespace. + * Then you should delete the index by this method, this method automatically concatenates namespace. * + * https://github.com/apache/skywalking/pull/3017 */ public boolean deleteByModelName(String modelName) throws IOException { return deleteIndex(modelName, true); @@ -302,11 +299,17 @@ public class ElasticSearchClient implements Client { return response.getStatusLine().getStatusCode(); } - public String formatIndexName(String indexName) { - if (StringUtils.isNotEmpty(namespace)) { - return namespace + "_" + indexName; + public void synchronousBulk(BulkRequest request) { + request.timeout(TimeValue.timeValueMinutes(2)); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + request.waitForActiveShards(ActiveShardCount.ONE); + try { + int size = request.requests().size(); + BulkResponse responses = client.bulk(request); + logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size); + } catch (IOException e) { + logger.error(e.getMessage(), e); } - return indexName; } public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) { @@ -340,4 +343,11 @@ public class ElasticSearchClient implements Client { .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); } + + public String formatIndexName(String indexName) { + if (StringUtils.isNotEmpty(namespace)) { + return namespace + "_" + indexName; + } + return indexName; + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index c1596981bc943ea818aa75e1d2498650077f21b8..bb5671ecbb1b15eb95d171a386cc84f3e8569b9f 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -28,13 +28,14 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; public class StorageModuleElasticsearchConfig extends ModuleConfig { @Setter private String nameSpace; @Setter private String clusterNodes; - @Setter private int indexShardsNumber; - @Setter private int indexReplicasNumber; - @Setter private boolean highPerformanceMode; + @Setter private int indexShardsNumber = 2; + @Setter private int indexReplicasNumber = 0; + @Setter private int indexRefreshInterval = 2; @Setter private int bulkActions = 2000; @Setter private int bulkSize = 20; @Setter private int flushInterval = 10; @Setter private int concurrentRequests = 2; + @Setter private int syncBulkActions = 3; @Setter private String user; @Setter private String password; @Setter private int metadataQueryMaxSize = 5000; diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 6b90910160a3289ba0971d6eaae93e22b1e92ed2..58e83835e3a0cb53142343db7a01de3a5ad00f82 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -95,7 +95,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { try { elasticSearchClient.connect(); - StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber()); + StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.getIndexRefreshInterval()); installer.install(elasticSearchClient); RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index c00a7755d3588caba5120cce6f9b70be1dd27427..a3d52e9d9dddccf483876e8db41a80592a2f73bd 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.util.CollectionUtils; -import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.*; @@ -49,17 +49,17 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { this.concurrentRequests = concurrentRequests; } - @Override public void batchPersistence(List batchCollection) { + @Override public void asynchronous(List collection) { if (bulkProcessor == null) { this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests); } if (logger.isDebugEnabled()) { - logger.debug("bulk data size: {}", batchCollection.size()); + logger.debug("Asynchronous batch persistent data collection size: {}", collection.size()); } - if (CollectionUtils.isNotEmpty(batchCollection)) { - batchCollection.forEach(builder -> { + if (CollectionUtils.isNotEmpty(collection)) { + collection.forEach(builder -> { if (builder instanceof IndexRequest) { this.bulkProcessor.add((IndexRequest)builder); } @@ -67,8 +67,23 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { this.bulkProcessor.add((UpdateRequest)builder); } }); + this.bulkProcessor.flush(); } + } + + @Override public void synchronous(List collection) { + if (CollectionUtils.isNotEmpty(collection)) { + BulkRequest request = new BulkRequest(); - this.bulkProcessor.flush(); + for (Object builder : collection) { + if (builder instanceof IndexRequest) { + request.add((IndexRequest)builder); + } + if (builder instanceof UpdateRequest) { + request.add((UpdateRequest)builder); + } + } + getClient().synchronousBulk(request); + } } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java index bc22597f033e7e8ea676fe341b551af952245506..38225eee503915106bfd6b3b1586ed3b1edb082b 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java @@ -58,7 +58,7 @@ public abstract class EsDAO extends AbstractDAO { sourceBuilder.size(0); } - XContentBuilder map2builder(Map objectMap) throws IOException { + protected XContentBuilder map2builder(Map objectMap) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); for (String key : objectMap.keySet()) { Object value = objectMap.get(key); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index 2661c8f18fc657fe4c1958f658b51fa2dbf6e89c..7883049f2e8b28a873c3a1e2a75c8547182320ba 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.elasticsearch.common.unit.TimeValue; import org.slf4j.*; /** @@ -36,12 +37,14 @@ public class StorageEsInstaller extends ModelInstaller { private final int indexShardsNumber; private final int indexReplicasNumber; + private final int indexRefreshInterval; private final ColumnTypeEsMapping columnTypeEsMapping; - public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) { + public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber, int indexRefreshInterval) { super(moduleManager); this.indexShardsNumber = indexShardsNumber; this.indexReplicasNumber = indexReplicasNumber; + this.indexRefreshInterval = indexRefreshInterval; this.columnTypeEsMapping = new ColumnTypeEsMapping(); } @@ -98,8 +101,9 @@ public class StorageEsInstaller extends ModelInstaller { JsonObject setting = new JsonObject(); setting.addProperty("index.number_of_shards", indexShardsNumber); setting.addProperty("index.number_of_replicas", indexReplicasNumber); - setting.addProperty("index.refresh_interval", "3s"); + setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString()); setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop"); + TimeValue.timeValueSeconds(3); return setting; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java index 3c4a6a3a0655764eedf68d0a3c844ad3c3b63f36..a227c7d729e2c75f52793773cdd68f4f84dcb823 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java @@ -18,20 +18,20 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; -import java.sql.Connection; -import java.sql.SQLException; +import java.sql.*; import java.util.List; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** - * @author wusheng + * @author wusheng, peng-yongsheng */ public class H2BatchDAO implements IBatchDAO { + private static final Logger logger = LoggerFactory.getLogger(H2BatchDAO.class); private JDBCHikariCPClient h2Client; @@ -40,24 +40,26 @@ public class H2BatchDAO implements IBatchDAO { this.h2Client = h2Client; } - @Override public void batchPersistence(List batchCollection) { - if (batchCollection.size() == 0) { + @Override public void synchronous(List collection) { + if (CollectionUtils.isEmpty(collection)) { return; } if (logger.isDebugEnabled()) { - logger.debug("batch sql statements execute, data size: {}", batchCollection.size()); + logger.debug("batch sql statements execute, data size: {}", collection.size()); } try (Connection connection = h2Client.getConnection()) { - for (Object exe : batchCollection) { + for (Object exe : collection) { SQLExecutor sqlExecutor = (SQLExecutor)exe; sqlExecutor.invoke(connection); } - } catch (SQLException e) { - logger.error(e.getMessage(), e); - } catch (JDBCClientException e) { + } catch (SQLException | JDBCClientException e) { logger.error(e.getMessage(), e); } } + + @Override public void asynchronous(List collection) { + synchronous(collection); + } }