diff --git a/CHANGES.md b/CHANGES.md index 0c536f4c19fbde1f03b735920be5bec5c9210c65..fc932f620557d19370a2ad6f11ba8aa5efe27923 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,18 +72,24 @@ Release Notes. * Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages. * Re-implement storage session mechanism, cached metrics are removed only according to their last access timestamp, rather than first time. This makes sure hot data never gets removed unexpectedly. -* Support session expired threshold configurable. +* Support session expired threshold configurable. * Fix InfluxDB storage-plugin Metrics#multiGet issue. * Replace zuul proxy with spring cloud gateway 2.x. in webapp module. * Upgrade etcd cluster coordinator and dynamic configuration to v3.x. -* Configuration: Allow to configure server maximum request header size. +* Configuration: Allow configuring server maximum request header size. * Add thread state metric and class loaded info metric to JVMMetric. * Performance: compile LAL DSL statically and run with type checked. * Add pagination to event query protocol. * Performance: optimize Envoy error logs persistence performance. +* Performance: remove the synchronous persistence mechanism from batch ElasticSearch DAO, because the current enhanced + persistent session mechanism doesn't require the data to be queryable immediately after the insert and update anymore. +* Performance: share the `flushInterval` setting for both metrics and record data, because the + `synchronous persistence mechanism` is removed. Record flush interval used to be hardcoded as 10s. +* Remove `syncBulkActions` from the ElasticSearch storage options. +* Increase the default `bulkActions` (env, `SW_STORAGE_ES_BULK_ACTIONS`) to 5000 (from 1000). +* Increase the flush interval of ElasticSearch indices to 15s (from 10s). #### UI - * Fix the date component for log conditions. * Fix selector keys for duplicate options. * Add Python celery plugin. @@ -93,6 +99,7 @@ Release Notes. 
* Fix chart types for setting metrics configure. #### Documentation +* Add FAQ about `Elasticsearch exception type=version_conflict_engine_exception since 8.7.0` All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/90?closed=1) diff --git a/docs/en/FAQ/README.md b/docs/en/FAQ/README.md index 806b96555733999fb68db73ac23285da8c67bd99..49ec708662a579af90403b4ee4bd775c810ecb22 100644 --- a/docs/en/FAQ/README.md +++ b/docs/en/FAQ/README.md @@ -11,6 +11,7 @@ These are known and frequently asked questions about SkyWalking. We welcome you * [Compiling issues on Mac's M1 chip](How-to-build-with-mac-m1.md) ## Runtime +* [Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0](es-version-conflict.md) * [Version 8.x+ upgrade](v8-version-upgrade.md) * [Why do metrics indexes with Hour and Day precisions stop updating after upgrade to 7.x?](Hour-Day-Metrics-Stopping.md) * [Version 6.x upgrade](v6-version-upgrade.md) diff --git a/docs/en/FAQ/es-version-conflict.md b/docs/en/FAQ/es-version-conflict.md new file mode 100644 index 0000000000000000000000000000000000000000..2cde77082474d1e1e1c900e7b0ca49d2c0c595ce --- /dev/null +++ b/docs/en/FAQ/es-version-conflict.md @@ -0,0 +1,35 @@ +# Elasticsearch exception `type=version_conflict_engine_exception` since 8.7.0 + +Since 8.7.0, we have made the following optimization to reduce the Elasticsearch load. + +```markdown +Performance: remove the synchronous persistence mechanism from batch ElasticSearch DAO, because the current enhanced +persistent session mechanism doesn't require the data to be queryable immediately after the insert and update anymore. +``` + +Due to this, we flush the metrics into Elasticsearch without using `WriteRequest.RefreshPolicy.WAIT_UNTIL`. This dramatically reduces +the persistence workload of the OAP server and the CPU load of Elasticsearch. + +Meanwhile, there is a small chance that you will see the following **warn** logs. 
+ +``` +{ + "timeMillis": 1626247722647, + "thread": "I/O dispatcher 4", + "level": "WARN", + "loggerName": "org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient", + "message": "Bulk [70] executed with failures:[failure in bulk execution:\n[18875]: index [sw8_service_relation_client_side-20210714], type [_doc], id [20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1], message [[sw8_service_relation_client_side-20210714/D7qzncbeRq6qh2QF5MogTw][[sw8_service_relation_client_side-20210714][0]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[20210714_b3BlcmF0aW9uLXJ1bGUtc2VydmVyQDExNDgx.1-bWFya2V0LXJlZmVycmFsLXNlcnZlckAxMDI1MQ==.1]: version conflict, required seqNo [14012594], primary term [1]. current document has seqNo [14207928] and primary term [1]]]]]", + "endOfBatch": false, + "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger", + "threadId": 44, + "threadPriority": 5, + "timestamp": "2021-07-14 15:28:42.647" +} +``` + +This does not affect the system much; it only means that some metrics may be inaccurate. If it doesn't show up at a high +frequency, you can safely ignore it. + +If you see many logs like this, it is a signal that the flush period of your ElasticSearch template can't +catch up with your setting, or that you have set `persistentPeriod` to less than the flush period. + diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index cf8e0c402d80f97d393b3a57f195cb2cae009e4e..fb79c15bebe049d2d31e8b524c268fb59dfaf3bb 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -20,6 +20,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. 
**Receiver** mode | - | - | gRPCSslCertChainPath| The file path of gRPC SSL cert chain| SW_CORE_GRPC_SSL_CERT_CHAIN_PATH| - | | - | - | gRPCSslTrustedCAPath| The file path of gRPC trusted CA| SW_CORE_GRPC_SSL_TRUSTED_CA_PATH| - | | - | - | downsampling| The activated level of down sampling aggregation | | Hour,Day| +| - | - | persistentPeriod| The execution period of the persistent timer. Unit is second. | | 25 | | - | - | enableDataKeeperExecutor|Controller of TTL scheduler. Once disabled, TTL wouldn't work.|SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR|true| | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit is minute. Execution doesn't mean deleting data. The storage provider could override this, such as ElasticSearch storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5| | - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3| @@ -97,8 +98,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 | | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 | | - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 | -| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000| -| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. 
Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10| +| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000| +| - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 15| | - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 | | - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000| | - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_ES_QUERY_MAX_SIZE | 5000 | @@ -122,8 +123,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | superDatasetDayStep | Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0.|SW_SUPERDATASET_STORAGE_DAY_STEP|-1 | | - | - | superDatasetIndexShardsFactor | Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.|SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR|5 | | - | - | superDatasetIndexReplicasNumber | Represent the replicas number in the super size dataset record index.|SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER|0 | -| - | - | bulkActions| Async bulk size of the record data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 1000| -| - | - | syncBulkActions| Sync bulk size of the metrics data batch execution. | SW_STORAGE_ES_SYNC_BULK_ACTIONS| 50000| +| - | - | bulkActions| Async bulk size of data batch execution. | SW_STORAGE_ES_BULK_ACTIONS| 5000| | - | - | flushInterval| Period of flush, no matter `bulkActions` reached or not. 
Unit is second.| SW_STORAGE_ES_FLUSH_INTERVAL | 10| | - | - | concurrentRequests| The number of concurrent requests allowed to be executed. | SW_STORAGE_ES_CONCURRENT_REQUESTS| 2 | | - | - | resultWindowMaxSize | The max size of dataset when OAP loading cache, such as network alias. | SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE | 10000| diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index bf5f33f05f5ba356cc9e703fdc517818253045e9..06ba88c1f4f2d499bf33b7b3bada80cb85a8f674 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -141,8 +141,8 @@ storage: superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than 0 superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces. superDatasetIndexReplicasNumber: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER:0} # Represent the replicas number in the super size dataset record index, the default value is 0. 
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every ${SW_STORAGE_ES_FLUSH_INTERVAL} seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} @@ -169,8 +169,8 @@ storage: user: ${SW_ES_USER:""} password: ${SW_ES_PASSWORD:""} secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. - bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every ${SW_STORAGE_ES_FLUSH_INTERVAL} seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} @@ -250,9 +250,8 @@ storage: user: ${SW_ES_USER:""} password: ${SW_ES_PASSWORD:""} secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool. 
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests - syncBulkActions: ${SW_STORAGE_ES_SYNC_BULK_ACTIONS:50000} # Execute the sync bulk metrics data every ${SW_STORAGE_ES_SYNC_BULK_ACTIONS} requests - flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests + bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:5000} # Execute the async bulk record data every ${SW_STORAGE_ES_BULK_ACTIONS} requests + flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:15} # flush the bulk every ${SW_STORAGE_ES_FLUSH_INTERVAL} seconds whatever the number of requests concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000} metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 6fe780bfd7c6e267619e79ee6212a8bc7a079593..2ac56eb5a9a7d164ccf980c916d5f4d47da7a9d7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -64,7 +64,7 @@ public class CoreModuleConfig extends ModuleConfig { * The period of doing data persistence. Unit is second. 
*/ @Setter - private long persistentPeriod = 3; + private long persistentPeriod = 25; private boolean enableDataKeeperExecutor = true; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java index 9c5f8beb35b5e1efd09c4ba1e540da53596e905b..0ef237f0c19901af6b4f37d66207f118fe7da5f9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java @@ -49,7 +49,7 @@ public class RecordPersistentWorker extends AbstractWorker { public void in(Record record) { try { InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record); - batchDAO.asynchronous(insertRequest); + batchDAO.insert(insertRequest); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java index c6bb7361ed508fd1c5814cff466d7dcfced26c91..38c7563fcbd578ec124f9789e33dabd86d47865a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java @@ -23,25 +23,23 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; /** - * IBatchDAO provides two modes of data persistence supported by most databases, including synchronous and - * asynchronous. 
+ * IBatchDAO provides two modes of data persistence supported by most databases, including pure insert and + * batch hybrid insert/update. */ public interface IBatchDAO extends DAO { /** * Push data into the database in async mode. This method is driven by streaming process. This method doesn't * request the data queryable immediately after the method finished. * - * All data are in the additional mode, no modification. - * * @param insertRequest data to insert. */ - void asynchronous(InsertRequest insertRequest); + void insert(InsertRequest insertRequest); /** - * Make all given PrepareRequest efficient in the sync mode. All requests could be confirmed by the database. All - * changes are required queryable after method returns. + * Push a data collection into the database in async mode. This method is driven by the streaming process. This method + * doesn't require the data to be queryable immediately after the method finishes. * * @param prepareRequests data to insert or update. No delete happens in streaming mode. 
*/ - void synchronous(List prepareRequests); + void flush(List prepareRequests); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index a7e1e89ce4fc26a6fa669552544ac2e3f87eb05d..6c92af0bc7336f1fd36908d7230c887730945ce0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -161,7 +161,7 @@ public enum PersistenceTimer { HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); try { if (CollectionUtils.isNotEmpty(partition)) { - batchDAO.synchronous(partition); + batchDAO.flush(partition); } } catch (Throwable e) { log.error(e.getMessage(), e); diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java index 1fc73557ffb4cdd5c4b78094907b490b02c5fbee..da0b8e6f414d1b04680585c631135f1cf019ecd1 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java @@ -57,12 +57,12 @@ public class PersistenceTimerTest { moduleConfig.setPersistentPeriod(Integer.MAX_VALUE); IBatchDAO iBatchDAO = new IBatchDAO() { @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { } @Override - public void synchronous(List prepareRequests) { + public void flush(final List prepareRequests) { synchronized (result) { result.addAll(prepareRequests); } diff --git 
a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 3727fd89166394bba888931d6cf4ebf19c2872cb..4bc5bab483ad9a73e2568df576cac423705e8b36 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -547,6 +547,10 @@ public class ElasticSearchClient implements Client, HealthCheckable { return response.getStatusLine().getStatusCode(); } + /** + * @since 8.7.0 SkyWalking doesn't use sync bulk anymore. This method is kept only for unexpected cases in the future. + */ + @Deprecated public void synchronousBulk(BulkRequest request) { request.timeout(TimeValue.timeValueMinutes(2)); request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 3db7cf772ad5bfe9624cc6865b93afc316411d66..8ca9e0e56bcb505f35707d6fe1f27b1e13680a84 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -42,7 +42,7 @@ public class 
StorageModuleElasticsearchConfig extends ModuleConfig { */ private int socketTimeout = 30000; /** - * Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES + * @since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES * storage creates new indexes in every day. * * @since 7.0.0 dayStep represents how many days a single one index represents. Default is 1, meaning no difference @@ -64,8 +64,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { private int superDatasetIndexReplicasNumber = 0; private int superDatasetIndexShardsFactor = 5; private int indexRefreshInterval = 2; - private int bulkActions = 2000; - private int flushInterval = 10; + /** + * @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy. + */ + private int bulkActions = 5000; + /** + * Period of flush, no matter `bulkActions` reached or not. Unit is second. + * + * @since 8.7.0 increase to 15s from 10s + */ + private int flushInterval = 15; private int concurrentRequests = 2; /** * @since 7.0.0 This could be managed inside {@link #secretsManagementFile} diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index bcbe21b82492952d2be617c18ff4db9cc4c20067..fb8df0201a17e0628d3c72c1d1d1b4639fc4357e 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -19,22 +19,18 @@ package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { - - private static final Logger LOGGER = LoggerFactory.getLogger(BatchProcessEsDAO.class); - private BulkProcessor bulkProcessor; private final int bulkActions; private final int flushInterval; @@ -51,7 +47,7 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { if (bulkProcessor == null) { this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); } @@ -60,17 +56,19 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { + if (bulkProcessor == null) { + this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); + } + if (CollectionUtils.isNotEmpty(prepareRequests)) { - BulkRequest request = new BulkRequest(); for (PrepareRequest prepareRequest : prepareRequests) { if (prepareRequest instanceof InsertRequest) { - request.add((IndexRequest) prepareRequest); + 
this.bulkProcessor.add((IndexRequest) prepareRequest); } else { - request.add((UpdateRequest) prepareRequest); + this.bulkProcessor.add((UpdateRequest) prepareRequest); } } - getClient().synchronousBulk(request); } } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index 7d7e6a9ea1090036dd981028b8b36388728010de..d8eb8cfd501251957e6605d1a904254a3076f2c7 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -162,9 +162,7 @@ public class StorageEsInstaller extends ModelInstaller { setting.put("index.number_of_shards", model.isSuperDataset() ? config.getIndexShardsNumber() * config.getSuperDatasetIndexShardsFactor() : config.getIndexShardsNumber()); - setting.put("index.refresh_interval", model.isRecord() - ? 
TimeValue.timeValueSeconds(10).toString() - : TimeValue.timeValueSeconds(config.getFlushInterval()).toString()); + setting.put("index.refresh_interval", TimeValue.timeValueSeconds(config.getFlushInterval()).toString()); setting.put("analysis", getAnalyzerSetting(model.getColumns())); if (!StringUtil.isEmpty(config.getAdvanced())) { Map advancedSettings = gson.fromJson(config.getAdvanced(), Map.class); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java index 2d720f0f4c17a9b5eb27d038674099a81aba6388..4129a8815f354f1f0c44a1932b3e72fc45a4833d 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java @@ -392,6 +392,10 @@ public class ElasticSearch7Client extends ElasticSearchClient { return HttpStatus.SC_OK; } + /** + * @since 8.7.0 SkyWalking doesn't use sync bulk anymore. This method is kept only for unexpected cases in the future. 
+ */ + @Deprecated @Override public void synchronousBulk(BulkRequest request) { request.timeout(TimeValue.timeValueMinutes(2)); diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java index 713a0b10cb1159efce8cbfbbbdf34151646802db..105383d422af17b3e742e14c7b9321d539553161 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java @@ -36,12 +36,12 @@ public class BatchDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { client.write(((InfluxInsertRequest) insertRequest).getPoint()); } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { return; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java index 3539fe7556d8aeff5653c0caaac1b014dfc824bc..3a799f9398b4ce01c9cf241e203a7aec50d0101c 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java @@ -56,7 +56,7 
@@ public class H2BatchDAO implements IBatchDAO { } @Override - public void synchronous(List prepareRequests) { + public void flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { return; } @@ -81,7 +81,7 @@ public class H2BatchDAO implements IBatchDAO { } @Override - public void asynchronous(InsertRequest insertRequest) { + public void insert(InsertRequest insertRequest) { this.dataCarrier.produce(insertRequest); } @@ -100,7 +100,7 @@ public class H2BatchDAO implements IBatchDAO { @Override public void consume(List prepareRequests) { - h2BatchDAO.synchronous(prepareRequests); + h2BatchDAO.flush(prepareRequests); } @Override diff --git a/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java b/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java index 110a8ce28b261e068914c067773bba26bef1f80f..f200ed8ae17562125d55692218c0027db74bb7fc 100644 --- a/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java +++ b/test/e2e/e2e-data/src/main/java/org/apache/skywalking/e2e/metrics/MetricsQuery.java @@ -158,10 +158,8 @@ public class MetricsQuery extends AbstractQuery { "envoy_worker_threads_max" }; - public static String METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE = "meter_oap_instance_persistence_execute_percentile"; - public static String[] ALL_SO11Y_LABELED_METRICS = { - METER_INSTANCE_PERSISTENCE_EXECUTE_PERCENTILE + // Nothing to check for now. }; private String id; private String metricsName;
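
Reviewer note: the renamed `IBatchDAO` contract in this change (`insert` for a single record, `flush` for a batch of metrics, both asynchronous) can be pictured with a storage-free sketch. It is only an illustration under stated assumptions, not the code in this PR: `BufferedBatchSketch`, its `String` requests, and `onFlushInterval()` are hypothetical names, and the real Elasticsearch DAO delegates buffering to `BulkProcessor` rather than to a hand-written list. The sketch shows why neither call makes data queryable immediately: both only enqueue, and a bulk is sent when `bulkActions` is reached or when the flush interval fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a BulkProcessor-backed batch DAO; no Elasticsearch classes are used.
final class BufferedBatchSketch {
    private final int bulkActions;                                   // size threshold, mirrors the bulkActions setting
    private final List<String> pending = new ArrayList<>();          // requests waiting for the next bulk
    private final List<List<String>> sentBulks = new ArrayList<>();  // bulks that would have been sent to storage

    BufferedBatchSketch(int bulkActions) {
        this.bulkActions = bulkActions;
    }

    // Counterpart of insert(InsertRequest): enqueue one request and return immediately.
    synchronized void insert(String request) {
        add(request);
    }

    // Counterpart of flush(List): enqueue every request into the same shared buffer.
    synchronized void flush(List<String> requests) {
        for (String request : requests) {
            add(request);
        }
    }

    // Counterpart of the flushInterval timer (assumed to be driven externally): send whatever is pending.
    synchronized void onFlushInterval() {
        send();
    }

    private void add(String request) {
        pending.add(request);
        if (pending.size() >= bulkActions) {
            send(); // threshold reached, do not wait for the timer
        }
    }

    private void send() {
        if (pending.isEmpty()) {
            return;
        }
        sentBulks.add(new ArrayList<>(pending));
        pending.clear();
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    synchronized int sentBulkCount() {
        return sentBulks.size();
    }
}
```

With `bulkActions = 3`, one `insert` followed by a `flush` of three requests sends a single bulk and leaves one request pending until the next interval tick. In this simplified model a request can sit in the buffer for up to one flush interval, which is worth keeping in mind when reading the FAQ's remark about `persistentPeriod` and the flush period.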