提交 02affffa 编写于 作者: J Jared Tan 提交者: wu-sheng

support elasticsearch address configuration with http scheme. (#3236)

* support elasticsearch address configuration with http scheme.

* update logical.
上级 1bfda1dd
......@@ -83,6 +83,7 @@ storage:
elasticsearch:
nameSpace: \${SW_NAMESPACE:""}
clusterNodes: \${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: \${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
user: \${SW_ES_USER:""}
password: \${SW_ES_PASSWORD:""}
indexShardsNumber: \${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -40,6 +40,7 @@ storage:
# user: ${SW_ES_USER:""} # User needs to be set when Http Basic authentication is enabled
# password: ${SW_ES_PASSWORD:""} # Password to be set when Http Basic authentication is enabled
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Those data TTL settings will override the same settings in core module.
......@@ -70,6 +71,7 @@ storage:
zipkin-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -93,6 +95,7 @@ storage:
jaeger-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -134,6 +134,9 @@
<elastic.search.address>
${docker.hostname}:${es-port}
</elastic.search.address>
<elastic.search.protocol>
http
</elastic.search.protocol>
</systemPropertyVariables>
</configuration>
<executions>
......
......@@ -18,33 +18,61 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.*;
import java.io.*;
import java.util.*;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.*;
import org.apache.http.auth.*;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.*;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
import org.elasticsearch.action.admin.indices.create.*;
import org.elasticsearch.action.admin.indices.delete.*;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.*;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......@@ -55,13 +83,15 @@ public class ElasticSearchClient implements Client {
public static final String TYPE = "type";
private final String clusterNodes;
private final String protocol;
private final String namespace;
private final String user;
private final String password;
protected RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes, String namespace, String user, String password) {
public ElasticSearchClient(String clusterNodes, String protocol, String namespace, String user, String password) {
this.clusterNodes = clusterNodes;
this.protocol = protocol;
this.namespace = namespace;
this.user = user;
this.password = password;
......@@ -89,11 +119,12 @@ public class ElasticSearchClient implements Client {
private List<HttpHost> parseClusterNodes(String nodes) {
List<HttpHost> httpHosts = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
String[] nodesSplit = nodes.split(",");
List<String> nodesSplit = Splitter.on(",").omitEmptyStrings().splitToList(nodes);
for (String node : nodesSplit) {
String host = node.split(":")[0];
String port = node.split(":")[1];
httpHosts.add(new HttpHost(host, Integer.valueOf(port)));
httpHosts.add(new HttpHost(host, Integer.parseInt(port), protocol));
}
return httpHosts;
......
......@@ -59,7 +59,8 @@ public class ITElasticSearchClient {
@Before
public void before() throws IOException {
final String esAddress = System.getProperty("elastic.search.address");
client = new ElasticSearchClient(esAddress, namespace, "test", "test");
final String esProtocol = System.getProperty("elastic.search.protocol");
client = new ElasticSearchClient(esAddress, esProtocol, namespace, "test", "test");
client.connect();
}
......
......@@ -69,6 +69,7 @@ storage:
# elasticsearch:
# nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -69,6 +69,7 @@ storage:
elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter private String nameSpace;
@Setter private String clusterNodes;
@Getter @Setter String protocol = "http";
@Setter private int indexShardsNumber = 2;
@Setter private int indexReplicasNumber = 0;
@Setter private int indexRefreshInterval = 2;
......
......@@ -66,7 +66,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getNameSpace(), config.getUser(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册