提交 5624417c 编写于 作者: X Xin,Zhang 提交者: wu-sheng

[Collector] fix the collector cannot started (#987)

上级 061a4d4c
......@@ -16,9 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.client.elasticsearch;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
......@@ -43,12 +47,6 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
/**
* @author peng-yongsheng
*/
......@@ -58,16 +56,17 @@ public class ElasticSearchClient implements Client {
private org.elasticsearch.client.Client client;
private final String namespace;
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final String clusterNodes;
public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
this.namespace = namespace;
private boolean ready = false;
private String namespace;
public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer,
String clusterNodes) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
......@@ -90,6 +89,8 @@ public class ElasticSearchClient implements Client {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
this.ready = true;
}
@Override
......@@ -110,6 +111,14 @@ public class ElasticSearchClient implements Client {
return pairsList;
}
public void setNamespace(String namespace) throws ElasticSearchClientException {
if (!ready) {
this.namespace = namespace;
} else {
throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready.");
}
}
class AddressPairs {
private String host;
private Integer port;
......@@ -121,6 +130,10 @@ public class ElasticSearchClient implements Client {
}
public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
IndicesAdminClient adminClient = client.admin().indices();
indexName = formatIndexName(indexName);
CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
......@@ -129,6 +142,10 @@ public class ElasticSearchClient implements Client {
}
public boolean deleteIndex(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
......@@ -137,6 +154,10 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
......@@ -144,31 +165,55 @@ public class ElasticSearchClient implements Client {
}
public SearchRequestBuilder prepareSearch(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
}
public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
}
public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
rowHandler.setPrepareMultiGet(prepareMultiGet);
rowHandler.setNamespace(namespace);
......@@ -202,7 +247,6 @@ public class ElasticSearchClient implements Client {
}
}
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.elasticsearch;
/**
* @author zhang xin
*/
public class ElasticSearchClientNotReadyException extends RuntimeException {
public ElasticSearchClientNotReadyException() {
super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient.");
}
}
......@@ -180,8 +180,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
String clusterName = config.getProperty(CLUSTER_NAME);
Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes);
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
......@@ -196,6 +195,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER);
try {
String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
elasticSearchClient.setNamespace(namespace);
elasticSearchClient.initialize();
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册