提交 dbbf22a1 编写于 作者: J Jared Tan 提交者: Gao Hongtao

support es https ssl config. (#3370)

* support es https ssl config.

* update

* update docs.

* refactoring var name.

* fix

* update docs.

* add config in es section.

* fix logical.
上级 2a2679ee
......@@ -39,6 +39,8 @@ storage:
# nameSpace: ${SW_NAMESPACE:""}
# user: ${SW_ES_USER:""} # User needs to be set when Http Basic authentication is enabled
# password: ${SW_ES_PASSWORD:""} # Password to be set when Http Basic authentication is enabled
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:""}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -54,6 +56,34 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### ElasticSearch 6 With Https SSL Encrypting communications.
example:
```yaml
storage:
elasticsearch:
# nameSpace: ${SW_NAMESPACE:""}
user: ${SW_ES_USER:""} # User needs to be set when Http Basic authentication is enabled
password: ${SW_ES_PASSWORD:""} # Password to be set when Http Basic authentication is enabled
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:443}
trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"https"}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Those data TTL settings will override the same settings in core module.
recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```
### Data TTL
TTL in ElasticSearch overrides the settings of core, read [ElasticSearch section in TTL document](ttl.md#elasticsearch-6-storage-ttl)
......
......@@ -25,7 +25,7 @@ import java.io.IOException;
*/
public interface Client {
void connect() throws IOException;
void connect() throws Exception;
void shutdown() throws IOException;
}
......@@ -23,12 +23,21 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
......@@ -44,6 +53,8 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.skywalking.oap.server.library.client.Client;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
......@@ -84,30 +95,50 @@ public class ElasticSearchClient implements Client {
public static final String TYPE = "type";
private final String clusterNodes;
private final String protocol;
private final String trustStorePath;
private final String trustStorePass;
private final String namespace;
private final String user;
private final String password;
protected RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes, String protocol, String namespace, String user, String password) {
public ElasticSearchClient(String clusterNodes, String protocol, String trustStorePath, String trustStorePass,
String namespace, String user, String password) {
this.clusterNodes = clusterNodes;
this.protocol = protocol;
this.namespace = namespace;
this.user = user;
this.password = password;
this.trustStorePath = trustStorePath;
this.trustStorePass = trustStorePass;
}
@Override public void connect() throws IOException {
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
RestClientBuilder builder;
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
if (StringUtils.isBlank(trustStorePath)) {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
truststore.load(is, trustStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom()
.loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLContext(sslContext));
}
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
client = new RestHighLevelClient(builder);
client.ping();
}
......@@ -163,9 +194,10 @@ public class ElasticSearchClient implements Client {
}
/**
* If your indexName is retrieved from elasticsearch through {@link #retrievalIndexByAliases(String)} or some other method and it already contains namespace.
* Then you should delete the index by this method, this method will no longer concatenate namespace.
*
* If your indexName is retrieved from elasticsearch through {@link #retrievalIndexByAliases(String)} or some other
* method and it already contains namespace. Then you should delete the index by this method, this method will no
* longer concatenate namespace.
* <p>
* https://github.com/apache/skywalking/pull/3017
*/
public boolean deleteByIndexName(String indexName) throws IOException {
......@@ -173,9 +205,9 @@ public class ElasticSearchClient implements Client {
}
/**
* If your indexName is obtained from metadata or configuration and without namespace.
* Then you should delete the index by this method, this method automatically concatenates namespace.
*
* If your indexName is obtained from metadata or configuration and without namespace. Then you should delete the
* index by this method, this method automatically concatenates namespace.
* <p>
* https://github.com/apache/skywalking/pull/3017
*/
public boolean deleteByModelName(String modelName) throws IOException {
......
......@@ -18,9 +18,13 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.*;
import java.io.*;
import java.util.*;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
......@@ -29,13 +33,19 @@ import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......@@ -57,10 +67,10 @@ public class ITElasticSearchClient {
}
@Before
public void before() throws IOException {
public void before() throws Exception {
final String esAddress = System.getProperty("elastic.search.address");
final String esProtocol = System.getProperty("elastic.search.protocol");
client = new ElasticSearchClient(esAddress, esProtocol, namespace, "test", "test");
client = new ElasticSearchClient(esAddress, esProtocol, "", "", namespace, "test", "test");
client.connect();
}
......
......@@ -74,6 +74,8 @@ storage:
# nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -73,6 +73,8 @@ storage:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -18,7 +18,8 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import lombok.*;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
......@@ -38,6 +39,8 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter private int syncBulkActions = 3;
@Setter private String user;
@Setter private String password;
@Getter @Setter String trustStorePath;
@Getter @Setter String trustStorePass;
@Setter private int metadataQueryMaxSize = 5000;
@Setter private int segmentQueryMaxSize = 200;
@Setter private int recordDataTTL = 7;
......
......@@ -19,18 +19,55 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockDAOImpl;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AlarmQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.LogQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl.ElasticsearchStorageTTL;
/**
......@@ -66,7 +103,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getNameSpace(), config.getUser(), config.getPassword());
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
......@@ -100,7 +137,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
lockInstaller.install();
} catch (StorageException | IOException e) {
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册