未验证 提交 e69391a2 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support Secrets Management File in the ElasticSearch 6/7 storage (#4493)

* Temp commit

* Support secretsManagementFile file.

* Update doc.

* 1. Support JKS/pass runtime change too.
2. Follow review.

* Fix format.

* Fix username/password/trustPass haven't been updated in the es client.

* Fix doc issue.
Co-authored-by: NJared Tan <jian.tan@daocloud.io>
上级 80911d08
......@@ -90,6 +90,7 @@ storage:
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
......@@ -114,6 +115,7 @@ storage:
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
......
......@@ -44,6 +44,7 @@ storage:
# nameSpace: ${SW_NAMESPACE:""}
# user: ${SW_ES_USER:""} # User needs to be set when Http Basic authentication is enabled
# password: ${SW_ES_PASSWORD:""} # Password to be set when Http Basic authentication is enabled
# secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:""}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
......@@ -106,7 +107,8 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
advanced: ${SW_STORAGE_ES_ADVANCED:""}
```
- File at `trustStorePath` is being monitored, once it is changed, the ElasticSearch client will do reconnecting.
- `trustStorePass` could be changed on the runtime through [**Secrets Management File Of ElasticSearch Authentication**](#secrets-management-file-of-elasticsearch-authentication).
### Data TTL
TTL in ElasticSearch overrides the settings of core, read [ElasticSearch section in TTL document](ttl.md#elasticsearch-6-storage-ttl)
......@@ -124,6 +126,20 @@ Such as, if dayStep == 11,
NOTICE, TTL deletion would be affected by these. You should set an extra more dayStep in your TTL. Such as you want to TTL == 30 days and dayStep == 10, you actually need to set TTL = 40;
### Secrets Management File Of ElasticSearch Authentication
The value of `secretsManagementFile` should point to the secrets management file absolute path.
The file includes username, password and JKS password of ElasticSearch server in the properties format.
```properties
user=xxx
password=yyy
trustStorePass=zzz
```
The major difference between using `user, password, trustStorePass` configs in the `application.yaml` file is, the **Secrets Management File** is being watched by the OAP server.
Once it is changed manually or through 3rd party tool, such as [Vault](https://github.com/hashicorp/vault),
the storage provider will use the new username, password and JKS password to establish the connection and close the old one. If the information exist in the file,
the `user/password` will be overrided.
### Advanced Configurations For Elasticsearch Index
You can add advanced configurations in `JSON` format to set `ElasticSearch index settings` by following [ElasticSearch doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
......
......@@ -87,6 +87,7 @@ storage:
# #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
# enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into minute index.
# dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -114,6 +115,7 @@ storage:
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Those data TTL settings will override the same settings in core module.
......
......@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
......@@ -94,11 +95,14 @@ public class ElasticSearchClient implements Client {
protected final String clusterNodes;
protected final String protocol;
private final String trustStorePath;
private final String trustStorePass;
private final String user;
private final String password;
@Setter
private volatile String trustStorePass;
@Setter
private volatile String user;
@Setter
private volatile String password;
private final List<IndexNameConverter> indexNameConverters;
protected RestHighLevelClient client;
protected volatile RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes,
String protocol,
......@@ -119,6 +123,13 @@ public class ElasticSearchClient implements Client {
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
if (client != null) {
try {
client.close();
} catch (Throwable t) {
log.error("ElasticSearch client reconnection fails based on new config", t);
}
}
client = createClient(hosts);
client.ping();
}
......
......@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
/**
* MultipleFilesChangeMonitor provides the capability to detect file or multiple files changed. It provide second level
......@@ -68,7 +69,8 @@ public class MultipleFilesChangeMonitor {
*
* @param watchingPeriodInSec The check period.
* @param notifier to accept the file changed notification.
* @param files to be monitored.
* @param files to be monitored. If an element of list is NULL, the virtual(NULL) file is treated
* unchangeable.
*/
public MultipleFilesChangeMonitor(long watchingPeriodInSec,
FilesChangedNotifier notifier,
......@@ -77,7 +79,12 @@ public class MultipleFilesChangeMonitor {
this.watchingPeriodInSec = watchingPeriodInSec;
this.notifier = notifier;
for (final String file : files) {
WatchedFile monitor = new WatchedFile(file);
WatchedFile monitor;
if (StringUtil.isEmpty(file)) {
monitor = new NoopWatchedFile();
} else {
monitor = new WatchedFile(file);
}
watchedFiles.add(monitor);
}
}
......@@ -100,7 +107,11 @@ public class MultipleFilesChangeMonitor {
watchedFiles.forEach(file -> {
contents.add(file.fileContent);
});
notifier.filesChanged(contents);
try {
notifier.filesChanged(contents);
} catch (Exception e) {
log.error("Files=" + this + " notification process failure.", e);
}
}
}
......@@ -170,7 +181,7 @@ public class MultipleFilesChangeMonitor {
*
* @param readableContents include the new contents. NULL if the file doesn't exist.
*/
void filesChanged(List<byte[]> readableContents);
void filesChanged(List<byte[]> readableContents) throws Exception;
}
/**
......@@ -239,4 +250,18 @@ public class MultipleFilesChangeMonitor {
}
}
}
private static class NoopWatchedFile extends WatchedFile {
public NoopWatchedFile() {
super(null);
}
/**
* @return false, as an noop file never changes.
*/
@Override
boolean detectContentChanged() {
return false;
}
}
}
......@@ -45,16 +45,30 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int concurrentRequests = 2;
@Setter
private int syncBulkActions = 3;
/**
* @since 7.0.0 This could be managed inside {@link #secretsManagementFile}
*/
@Setter
private String user;
/**
* @since 7.0.0 This could be managed inside {@link #secretsManagementFile}
*/
@Setter
private String password;
/**
* Secrets management file includes the username, password, which are managed by 3rd party tool.
*/
@Getter
private String secretsManagementFile;
@Getter
@Setter
String trustStorePath;
private String trustStorePath;
/**
* @since 7.0.0 This could be managed inside {@link #secretsManagementFile}
*/
@Getter
@Setter
String trustStorePass;
private String trustStorePass;
/**
* If this is ON, downsampling indexes(hour and day precisions) merged into minute precision. In this case, only
* {@link #minuteMetricsDataTTL} works for minute, hour and day.
......@@ -75,7 +89,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
*
* Same as {@link #enablePackedDownsampling} this config doesn't affects month level data. Because usually, no one
* keeps the observability data in several months.
*
*/
@Getter
private int dayStep = 1;
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
......@@ -26,6 +27,7 @@ import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
......@@ -60,6 +62,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
......@@ -84,6 +87,9 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.Topol
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl.ElasticsearchStorageTTL;
/**
* The storage provider for ElasticSearch 6.
*/
public class StorageModuleElasticsearchProvider extends ModuleProvider {
protected final StorageModuleElasticsearchConfig config;
......@@ -117,6 +123,36 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (config.getDayStep() > 1) {
TimeSeriesUtils.setDAY_STEP(config.getDayStep());
}
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
10, readableContents -> {
final byte[] secretsFileContent = readableContents.get(0);
if (secretsFileContent == null) {
return;
}
Properties secrets = new Properties();
secrets.load(new ByteArrayInputStream(secretsFileContent));
config.setUser(secrets.getProperty("user", null));
config.setPassword(secrets.getProperty("password", null));
config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearchClient == null) {
// In the startup process, we just need to change the username/password
} else {
// The client has connected, updates the config and connects again.
elasticSearchClient.setUser(config.getUser());
elasticSearchClient.setPassword(config.getPassword());
elasticSearchClient.setTrustStorePass(config.getTrustStorePass());
elasticSearchClient.connect();
}
}, config.getSecretsManagementFile(), config.getTrustStorePass());
/**
* By leveraging the sync update check feature when startup.
*/
monitor.start();
}
elasticSearchClient = new ElasticSearchClient(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
......
......@@ -18,11 +18,13 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Properties;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
......@@ -52,6 +54,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
......@@ -78,6 +81,9 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.Trac
import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider.indexNameConverters;
/**
* The storage provider for ElasticSearch 7.
*/
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
protected final StorageModuleElasticsearch7Config config;
......@@ -108,6 +114,35 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
10, readableContents -> {
final byte[] secretsFileContent = readableContents.get(0);
if (secretsFileContent == null) {
return;
}
Properties secrets = new Properties();
secrets.load(new ByteArrayInputStream(secretsFileContent));
config.setUser(secrets.getProperty("user", null));
config.setPassword(secrets.getProperty("password", null));
config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearch7Client == null) {
//In the startup process, we just need to change the username/password
} else {
// The client has connected, updates the config and connects again.
elasticSearch7Client.setUser(config.getUser());
elasticSearch7Client.setPassword(config.getPassword());
elasticSearch7Client.setTrustStorePass(config.getTrustStorePass());
elasticSearch7Client.connect();
}
}, config.getSecretsManagementFile(), config.getTrustStorePass());
/**
* By leveraging the sync update check feature when startup.
*/
monitor.start();
}
elasticSearch7Client = new ElasticSearch7Client(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
......
......@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -62,13 +63,12 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
@Slf4j
public class ElasticSearch7Client extends ElasticSearchClient {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Client.class);
public ElasticSearch7Client(final String clusterNodes,
final String protocol,
final String trustStorePath,
......@@ -84,6 +84,13 @@ public class ElasticSearch7Client extends ElasticSearchClient {
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
if (client != null) {
try {
client.close();
} catch (Throwable t) {
log.error("ElasticSearch7 client reconnection fails based on new config", t);
}
}
List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
client = createClient(hosts);
client.ping(RequestOptions.DEFAULT);
......@@ -94,7 +101,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -105,7 +112,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
request.settings(settings);
request.mapping(mapping);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -127,7 +134,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -234,7 +241,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
deleteByQueryRequest.setAbortOnVersionConflict(false);
deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
logger.debug(
log.debug(
"delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest,
bulkByScrollResponse
);
......@@ -248,9 +255,9 @@ public class ElasticSearch7Client extends ElasticSearchClient {
try {
int size = request.requests().size();
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册