From 536b7d2328e836b3ba82347aa9a907d4944ece6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Thu, 1 Jul 2021 08:20:59 +0800 Subject: [PATCH] Support connectTimeout and socketTimeout settings (#7214) --- CHANGES.md | 1 + docs/en/setup/backend/configuration-vocabulary.md | 4 ++++ .../src/main/resources/application.yml | 4 ++++ .../client/elasticsearch/ElasticSearchClient.java | 13 ++++++++++++- .../client/elasticsearch/ITElasticSearchClient.java | 2 +- .../StorageModuleElasticsearchConfig.java | 12 ++++++++++++ .../StorageModuleElasticsearchProvider.java | 4 ++-- .../StorageModuleElasticsearch7Provider.java | 2 +- .../elasticsearch7/client/ElasticSearch7Client.java | 9 ++++----- 9 files changed, 41 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a6a4609145..589459b72c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,6 +61,7 @@ Release Notes. * Performance: enhance persistent session mechanism, about differentiating cache timeout for different dimensionality metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min. * Performance: Add L1 aggregation flush period, which reduce the CPU load and help young GC. +* Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages. #### UI diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 3fa93769fb..d5be1dacb7 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -81,6 +81,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - | | - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost| | - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP| +| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500| +| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 30000| | - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - | | - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - | | - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - | @@ -104,6 +106,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - | | - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost| | - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP| +| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500| +| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 30000| | - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - | | - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - | | - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - | diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 2f888fd12c..71769c0bd8 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -120,6 +120,8 @@ storage: nameSpace: ${SW_NAMESPACE:""} clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"} + connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:500} + socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:30000} user: ${SW_ES_USER:""} password: ${SW_ES_PASSWORD:""} trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""} @@ -146,6 +148,8 @@ storage: nameSpace: ${SW_NAMESPACE:""} clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200} protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"} + connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:500} + socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:30000} trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""} trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""} dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index. diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 503fa30640..3727fd8916 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -119,6 +119,8 @@ public class ElasticSearchClient implements Client, HealthCheckable { protected volatile RestHighLevelClient client; protected DelegatedHealthChecker healthChecker = new DelegatedHealthChecker(); protected final ReentrantLock connectLock = new ReentrantLock(); + private final int connectTimeout; + private final int socketTimeout; public ElasticSearchClient(String clusterNodes, String protocol, @@ -126,7 +128,9 @@ public class ElasticSearchClient implements Client, HealthCheckable { String trustStorePass, String user, String password, - List indexNameConverters) { + List indexNameConverters, + int connectTimeout, + int socketTimeout) { this.clusterNodes = clusterNodes; this.protocol = protocol; this.user = user; @@ -134,6 +138,8 @@ public class ElasticSearchClient implements Client, HealthCheckable { this.indexNameConverters = indexNameConverters; this.trustStorePath = trustStorePath; this.trustStorePass = trustStorePass; + this.connectTimeout = connectTimeout; + this.socketTimeout = socketTimeout; } @Override @@ -183,6 +189,11 @@ public class ElasticSearchClient implements Client, HealthCheckable { } else { builder = RestClient.builder(pairsList.toArray(new HttpHost[0])); } + builder.setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder + .setConnectTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + ); return new RestHighLevelClient(builder); } diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java index 8a4b3cae52..44d97e8bbe 100644 --- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java @@ -69,7 +69,7 @@ public class ITElasticSearchClient { final String esAddress = System.getProperty("elastic.search.address"); final String esProtocol = System.getProperty("elastic.search.protocol"); client = new ElasticSearchClient(esAddress, esProtocol, "", "", "test", "test", - indexNameConverters(namespace) + indexNameConverters(namespace), 500, 6000 ); client.connect(); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 126efc2fc2..3db7cf772a 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -29,6 +29,18 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { private String nameSpace; private String clusterNodes; String protocol = "http"; + /** + * Connect timeout of ElasticSearch client. + * + * @since 8.7.0 + */ + private int connectTimeout = 500; + /** + * Socket timeout of ElasticSearch client. + * + * @since 8.7.0 + */ + private int socketTimeout = 30000; /** * Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES * storage creates new indexes in every day. diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 5b8019f457..ce76619b97 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -163,11 +163,11 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { elasticSearchClient = new ElasticSearchClient( config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config .getTrustStorePass(), config.getUser(), config.getPassword(), - indexNameConverters(config.getNameSpace()) + indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout() ); this.registerServiceImplementation( IBatchDAO.class, - new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config + new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config .getFlushInterval(), config.getConcurrentRequests()) ); this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient)); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java index b0eebef90e..2eb218e81f 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java @@ -161,7 +161,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider { elasticSearch7Client = new ElasticSearch7Client( config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config .getTrustStorePass(), config.getUser(), config.getPassword(), - indexNameConverters(config.getNameSpace()) + indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout() ); this.registerServiceImplementation( IBatchDAO.class, diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java index c0e0d72c30..2d720f0f4c 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java @@ -78,9 +78,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; -/** - * - */ @Slf4j public class ElasticSearch7Client extends ElasticSearchClient { public ElasticSearch7Client(final String clusterNodes, @@ -89,10 +86,12 @@ public class ElasticSearch7Client extends ElasticSearchClient { final String trustStorePass, final String user, final String password, - List indexNameConverters) { + List indexNameConverters, + int connectTimeout, + int socketTimeout) { super( clusterNodes, protocol, trustStorePath, trustStorePass, user, password, - indexNameConverters + indexNameConverters, connectTimeout, socketTimeout ); } -- GitLab