From de975c7e2bb3c58fa1813f16c73bf61ef3455d7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Wed, 14 Jul 2021 17:19:49 +0800 Subject: [PATCH] Remove synchronous persistence mechanism in API level and ElasticSearch implementation (#7293) * Performance: remove the synchronous persistence mechanism from batch ElasticSearch DAO. Because the current enhanced persistent session mechanism, don't require the data queryable immediately after the insert and update anymore. * Performance: share `flushInterval` setting for both metrics and record data, due to `synchronous persistence mechanism` removed. Record flush interval used to be hardcoded as 10s. * Remove `syncBulkActions` in ElasticSearch storage option. * Increase the default bulkActions(env, SW_STORAGE_ES_BULK_ACTIONS) to 5000(from 1000). * Increase the flush interval of ElasticSearch indices to 15s(from 10s) Add these 2 references. According to these, **(same _index, _type and _id) in same bulk will be in order** 1. https://github.com/elastic/elasticsearch/issues/50199 2. https://discuss.elastic.co/t/order-of--bulk-request-operations/98124 Notice, the order of different bulks is not guaranteed by the ElasticSearch cluster. We are going to have the risk of dirty write. But consider we set over 20s period between flush, and index flush period is 10, we should be safe. Recommend 5000 bulk size and 15s flush interval only. The persistent period has been set to 25s. --- CHANGES.md | 13 +++++-- docs/en/FAQ/README.md | 1 + docs/en/FAQ/es-version-conflict.md | 35 +++++++++++++++++++ .../setup/backend/configuration-vocabulary.md | 8 ++--- .../src/main/resources/application.yml | 13 ++++--- .../oap/server/core/CoreModuleConfig.java | 2 +- .../worker/RecordPersistentWorker.java | 2 +- .../oap/server/core/storage/IBatchDAO.java | 14 ++++---- .../server/core/storage/PersistenceTimer.java | 2 +- .../core/storage/PersistenceTimerTest.java | 4 +-- .../elasticsearch/ElasticSearchClient.java | 4 +++ .../StorageModuleElasticsearchConfig.java | 14 ++++++-- .../elasticsearch/base/BatchProcessEsDAO.java | 22 ++++++------ .../base/StorageEsInstaller.java | 4 +-- .../client/ElasticSearch7Client.java | 4 +++ .../plugin/influxdb/base/BatchDAO.java | 4 +-- .../plugin/jdbc/h2/dao/H2BatchDAO.java | 6 ++-- .../skywalking/e2e/metrics/MetricsQuery.java | 4 +-- 18 files changed, 103 insertions(+), 53 deletions(-) create mode 100644 docs/en/FAQ/es-version-conflict.md diff --git a/CHANGES.md b/CHANGES.md index 0c536f4c19..fc932f6205 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,18 +72,24 @@ Release Notes. * Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages. * Re-implement storage session mechanism, cached metrics are removed only according to their last access timestamp, rather than first time. This makes sure hot data never gets removed unexpectedly. -* Support session expired threshold configurable. +* Support session expired threshold configurable. * Fix InfluxDB storage-plugin Metrics#multiGet issue. * Replace zuul proxy with spring cloud gateway 2.x. in webapp module. * Upgrade etcd cluster coordinator and dynamic configuration to v3.x. -* Configuration: Allow to configure server maximum request header size. +* Configuration: Allow configuring server maximum request header size. * Add thread state metric and class loaded info metric to JVMMetric. * Performance: compile LAL DSL statically and run with type checked. * Add pagination to event query protocol. * Performance: optimize Envoy error logs persistence performance. +* Performance: remove the synchronous persistence mechanism from batch ElasticSearch DAO. Because the current enhanced + persistent session mechanism, don't require the data queryable immediately after the insert and update anymore. +* Performance: share `flushInterval` setting for both metrics and record data, due + to `synchronous persistence mechanism` removed. Record flush interval used to be hardcoded as 10s. +* Remove `syncBulkActions` in ElasticSearch storage option. +* Increase the default bulkActions(env, SW_STORAGE_ES_BULK_ACTIONS) to 5000(from 1000). +* Increase the flush interval of ElasticSearch indices to 15s(from 10s) #### UI - * Fix the date component for log conditions. * Fix selector keys for duplicate options. * Add Python celery plugin. @@ -93,6 +99,7 @@ Release Notes. * Fix chart types for setting metrics configure. #### Documentation +* Add FAQ about `Elasticsearch exception type=version_conflict_engine_exception since 8.7.0` All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/90?closed=1) diff --git a/docs/en/FAQ/README.md b/docs/en/FAQ/README.md index 806b965557..49ec708662 100644 --- a/docs/en/FAQ/README.md +++ b/docs/en/FAQ/README.md @@ -11,6 +11,7 @@ These are known and frequently asked questions about SkyWalking. We welcome you * [Compiling issues on Mac's M1 chip](How-to-build-with-mac-m1.md) ## Runtime +* [Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0](es-version-conflict.md) * [Version 8.x+ upgrade](v8-version-upgrade.md) * [Why do metrics indexes with Hour and Day precisions stop updating after upgrade to 7.x?](Hour-Day-Metrics-Stopping.md) * [Version 6.x upgrade](v6-version-upgrade.md) diff --git a/docs/en/FAQ/es-version-conflict.md b/docs/en/FAQ/es-version-conflict.md new file mode 100644 index 0000000000..2cde770824 --- /dev/null +++ b/docs/en/FAQ/es-version-conflict.md @@ -0,0 +1,35 @@ +# Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0 + +Since 8.7.0, we did the following optimization to reduce Elasticsearch load. + +```markdown +Performance: remove the synchronous persistence mechanism from batch ElasticSearch DAO. Because the current enhanced +persistent session mechanism, don't require the data queryable immediately after the insert and update anymore. +``` + +Due to this, we flush the metrics into Elasticsearch without using `WriteRequest.RefreshPolicy.WAIT_UNTIL`. This reduces +the load of persistent works in OAP server and load of Elasticsearch CPU dramatically. + +Meanwhile, there is little chance you could see following **warn**s in your logs. + +``` +{ + "timeMillis": 1626247722647, + "thread": "I/O dispatcher 4", + "level": "WARN", + "loggerName": "org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient", + "message": "Bulk [70] executed with failures:[failure in bulk execution:\n[18875]: index [sw8_service_relation_client_side-20210714], type [_doc], id [20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1], message [[sw8_service_relation_client_side-20210714/D7qzncbeRq6qh2QF5MogTw][[sw8_service_relation_client_side-20210714][0]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1]: version conflict, required seqNo [14012594], primary term [1]. current document has seqNo [14207928] and primary term [1]]]]]", + "endOfBatch": false, + "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger", + "threadId": 44, + "threadPriority": 5, + "timestamp": "2021-07-14 15:28:42.647" +} +``` + +This would not affect the system much, just a possibility of inaccurate of metrics. If this wouldn't show up in high +frequency, you could ignore this directly. + +In case you could see many logs like this. Then it is a signal, that the flush period of your ElasticSearch template can't +catch up your setting. Or you set the `persistentPeriod` less than the flush period. + diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index cf8e0c402d..fb79c15beb 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -20,6 +20,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | gRPCSslCertChainPath| The file path of gRPC SSL cert chain| SW_CORE_GRPC_SSL_CERT_CHAIN_PATH| - | | - | - | gRPCSslTrustedCAPath| The file path of gRPC trusted CA| SW_CORE_GRPC_SSL_TRUSTED_CA_PATH| - | | - | - | downsampling| The activated level of down sampling aggregation | | Hour,Day| +| - | - | persistentPeriod| The execution period of the persistent timer. Unit is second. | | 25 | | - | - | enableDataKeeperExecutor|Controller of TTL scheduler. Once disabled, TTL wouldn't work.|SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR|true| | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit is minute. Execution doesn't mean deleting data. The storage provider could override this, such as ElasticSearch storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5| | - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3| @@ -97,8 +98,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 | | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 | | - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 | -| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000| -| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10| +| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000| +| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 15| | - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 | | - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000| | - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_ES_QUERY_MAX_SIZE | 5000 | @@ -122,8 +123,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 | | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 | | - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 | -| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000| -| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. | SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000| +| - | - | bulkActions| Async bulk size of data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000| | - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10| | - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 | | - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000| diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index bf5f33f05f..06ba88c1f4 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -141,8 +141,8 @@ storage: superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0 superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces. superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0. - bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 10 seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} @@ -169,8 +169,8 @@ storage: user: ${SW_ES_USER:""} password: ${SW_ES_PASSWORD:""} secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. - bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 10 seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} @@ -250,9 +250,8 @@ storage: user: ${SW_ES_USER:""} password: ${SW_ES_PASSWORD:""} secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. - bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every 10 seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 6fe780bfd7..2ac56eb5a9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -64,7 +64,7 @@ public class CoreModuleConfig extends ModuleConfig { * The period of doing data persistence. Unit is second. */ @Setter - private long persistentPeriod = 3; + private long persistentPeriod = 25; private boolean enableDataKeeperExecutor = true; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java index 9c5f8beb35..0ef237f0c1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java @@ -49,7 +49,7 @@ public class RecordPersistentWorker extends AbstractWorker { public void in(Record record) { try { InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record); - batchDAO.asynchronous(insertRequest); + batchDAO.insert(insertRequest); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java index c6bb7361ed..38c7563fcb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java @@ -23,25 +23,23 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; /** - * IBatchDAO provides two modes of data persistence supported by most databases, including synchronous and - * asynchronous. + * IBatchDAO provides two modes of data persistence supported by most databases, including pure insert and + * batch hybrid insert/update. */ public interface IBatchDAO extends DAO { /** * Push data into the database in async mode. This method is driven by streaming process. This method doesn't * request the data queryable immediately after the method finished. * - * All data are in the additional mode, no modification. - * * @param insertRequest data to insert. */ - void asynchronous(InsertRequest insertRequest); + void insert(InsertRequest insertRequest); /** - * Make all given PrepareRequest efficient in the sync mode. All requests could be confirmed by the database. All - * changes are required queryable after method returns. + * Push data collection into the database in async mode. This method is driven by streaming process. This method + * doesn't request the data queryable immediately after the method finished. * * @param prepareRequests data to insert or update. No delete happens in streaming mode. */ - void synchronous(List prepareRequests); + void flush(List prepareRequests); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index a7e1e89ce4..6c92af0bc7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -161,7 +161,7 @@ public enum PersistenceTimer { HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); try { if (CollectionUtils.isNotEmpty(partition)) { - batchDAO.synchronous(partition); + batchDAO.flush(partition); } } catch (Throwable e) { log.error(e.getMessage(), e); diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java index 1fc73557ff..da0b8e6f41 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java @@ -57,12 +57,12 @@ public class PersistenceTimerTest { moduleConfig.setPersistentPeriod(Integer.MAX_VALUE); IBatchDAO iBatchDAO = new IBatchDAO() { @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { } @Override - public void synchronous(List prepareRequests) { + public void flush(final List prepareRequests) { synchronized (result) { result.addAll(prepareRequests); } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 3727fd8916..4bc5bab483 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -547,6 +547,10 @@ public class ElasticSearchClient implements Client, HealthCheckable { return response.getStatusLine().getStatusCode(); } + /** + * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future. + */ + @Deprecated public void synchronousBulk(BulkRequest request) { request.timeout(TimeValue.timeValueMinutes(2)); request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 3db7cf772a..8ca9e0e56b 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -42,7 +42,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { */ private int socketTimeout = 30000; /** - * Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES + * @since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES * storage creates new indexes in every day. * * @since 7.0.0 dayStep represents how many days a single one index represents. Default is 1, meaning no difference @@ -64,8 +64,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { private int superDatasetIndexReplicasNumber = 0; private int superDatasetIndexShardsFactor = 5; private int indexRefreshInterval = 2; - private int bulkActions = 2000; - private int flushInterval = 10; + /** + * @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy. + */ + private int bulkActions = 5000; + /** + * Period of flush, no matter `bulkActions` reached or not. Unit is second. + * + * @since 8.7.0 increase to 15s from 10s + */ + private int flushInterval = 15; private int concurrentRequests = 2; /** * @since 7.0.0 This could be managed inside {@link #secretsManagementFile} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index bcbe21b824..fb8df0201a 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -19,22 +19,18 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { - - private static final Logger LOGGER = LoggerFactory.getLogger(BatchProcessEsDAO.class); - private BulkProcessor bulkProcessor; private final int bulkActions; private final int flushInterval; @@ -51,7 +47,7 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { if (bulkProcessor == null) { this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); } @@ -60,17 +56,19 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { + if (bulkProcessor == null) { + this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); + } + if (CollectionUtils.isNotEmpty(prepareRequests)) { - BulkRequest request = new BulkRequest(); for (PrepareRequest prepareRequest : prepareRequests) { if (prepareRequest instanceof InsertRequest) { - request.add((IndexRequest) prepareRequest); + this.bulkProcessor.add((IndexRequest) prepareRequest); } else { - request.add((UpdateRequest) prepareRequest); + this.bulkProcessor.add((UpdateRequest) prepareRequest); } } - getClient().synchronousBulk(request); } } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index 7d7e6a9ea1..d8eb8cfd50 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -162,9 +162,7 @@ public class StorageEsInstaller extends ModelInstaller { setting.put("index.number_of_shards", model.isSuperDataset() ? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor() : config.getIndexShardsNumber()); - setting.put("index.refresh_interval", model.isRecord() - ? TimeValue.timeValueSeconds(10).toString() - : TimeValue.timeValueSeconds(config.getFlushInterval()).toString()); + setting.put("index.refresh_interval", TimeValue.timeValueSeconds(config.getFlushInterval()).toString()); setting.put("analysis", getAnalyzerSetting(model.getColumns())); if (!StringUtil.isEmpty(config.getAdvanced())) { Map advancedSettings = gson.fromJson(config.getAdvanced(), Map.class); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java index 2d720f0f4c..4129a8815f 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java @@ -392,6 +392,10 @@ public class ElasticSearch7Client extends ElasticSearchClient { return HttpStatus.SC_OK; } + /** + * @since 8.7.0 SkyWalking don't use sync bulk anymore. This method is just kept for unexpected case in the future. + */ + @Deprecated @Override public void synchronousBulk(BulkRequest request) { request.timeout(TimeValue.timeValueMinutes(2)); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java index 713a0b10cb..105383d422 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java @@ -36,12 +36,12 @@ public class BatchDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { client.write(((InfluxInsertRequest) insertRequest).getPoint()); } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { return; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java index 3539fe7556..3a799f9398 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java @@ -56,7 +56,7 @@ public class H2BatchDAO implements IBatchDAO { } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { return; } @@ -81,7 +81,7 @@ public class H2BatchDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { this.dataCarrier.produce(insertRequest); } @@ -100,7 +100,7 @@ public class H2BatchDAO implements IBatchDAO { @Override public void consume(List prepareRequests) { - h2BatchDAO.synchronous(prepareRequests); + h2BatchDAO.flush(prepareRequests); } @Override diff --git a/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java b/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java index 110a8ce28b..f200ed8ae1 100644 --- a/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java +++ b/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java @@ -158,10 +158,8 @@ public class MetricsQuery extends AbstractQuery { "envoy_worker_threads_max" }; - public static String METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE = "meter_oap_instance_persistence_execute_percentile"; - public static String[] ALL_SO11Y_LABELED_METRICS = { - METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE + // Nothing to check for now. }; private String id; private String metricsName; -- GitLab