From 5624417c076bc58b4c3f6374dd91f0f15f640f0a Mon Sep 17 00:00:00 2001 From: "Xin,Zhang" Date: Mon, 26 Mar 2018 10:05:29 +0800 Subject: [PATCH] [Collector] fix the collector cannot started (#987) --- .../elasticsearch/ElasticSearchClient.java | 68 +++++++++++++++---- .../ElasticSearchClientNotReadyException.java | 29 ++++++++ .../storage/es/StorageModuleEsProvider.java | 6 +- 3 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java index d58b95573..d3a3a97ca 100644 --- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java +++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java @@ -16,9 +16,13 @@ * */ - package org.apache.skywalking.apm.collector.client.elasticsearch; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; import org.apache.skywalking.apm.collector.client.Client; import org.apache.skywalking.apm.collector.client.ClientException; import org.apache.skywalking.apm.collector.core.util.StringUtils; @@ -43,12 +47,6 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.LinkedList; -import java.util.List; -import java.util.function.Consumer; - /** * @author peng-yongsheng */ @@ -58,16 +56,17 @@ public class ElasticSearchClient implements Client { private org.elasticsearch.client.Client client; - private final String namespace; - private final String clusterName; private final Boolean clusterTransportSniffer; private final String clusterNodes; - public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) { - this.namespace = namespace; + private boolean ready = false; + private String namespace; + + public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer, + String clusterNodes) { this.clusterName = clusterName; this.clusterTransportSniffer = clusterTransportSniffer; this.clusterNodes = clusterNodes; @@ -90,6 +89,8 @@ public class ElasticSearchClient implements Client { throw new ElasticSearchClientException(e.getMessage(), e); } } + + this.ready = true; } @Override @@ -110,6 +111,14 @@ public class ElasticSearchClient implements Client { return pairsList; } + public void setNamespace(String namespace) throws ElasticSearchClientException { + if (!ready) { + this.namespace = namespace; + } else { + throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready."); + } + } + class AddressPairs { private String host; private Integer port; @@ -121,6 +130,10 @@ public class ElasticSearchClient implements Client { } public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + IndicesAdminClient adminClient = client.admin().indices(); indexName = formatIndexName(indexName); CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get(); @@ -129,6 +142,10 @@ public class ElasticSearchClient implements Client { } public boolean deleteIndex(String indexName) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); IndicesAdminClient adminClient = client.admin().indices(); DeleteIndexResponse response = adminClient.prepareDelete(indexName).get(); @@ -137,6 +154,10 @@ public class ElasticSearchClient implements Client { } public boolean isExistsIndex(String indexName) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); IndicesAdminClient adminClient = client.admin().indices(); IndicesExistsResponse response = adminClient.prepareExists(indexName).get(); @@ -144,31 +165,55 @@ public class ElasticSearchClient implements Client { } public SearchRequestBuilder prepareSearch(String indexName) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); return client.prepareSearch(indexName); } public IndexRequestBuilder prepareIndex(String indexName, String id) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); return client.prepareIndex(indexName, "type", id); } public UpdateRequestBuilder prepareUpdate(String indexName, String id) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); return client.prepareUpdate(indexName, "type", id); } public GetRequestBuilder prepareGet(String indexName, String id) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); return client.prepareGet(indexName, "type", id); } public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + indexName = formatIndexName(indexName); return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName); } public MultiGetRequestBuilder prepareMultiGet(List rows, MultiGetRowHandler rowHandler) { + if (!ready) { + throw new ElasticSearchClientNotReadyException(); + } + MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet(); rowHandler.setPrepareMultiGet(prepareMultiGet); rowHandler.setNamespace(namespace); @@ -202,7 +247,6 @@ public class ElasticSearchClient implements Client { } } - public BulkRequestBuilder prepareBulk() { return client.prepareBulk(); } diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java new file mode 100644 index 000000000..7be7c53c6 --- /dev/null +++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.collector.client.elasticsearch; + +/** + * @author zhang xin + */ +public class ElasticSearchClientNotReadyException extends RuntimeException { + public ElasticSearchClientNotReadyException() { + super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient."); + } + +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index ff52285ae..fb371ae71 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -180,8 +180,7 @@ public class StorageModuleEsProvider extends ModuleProvider { String clusterName = config.getProperty(CLUSTER_NAME); Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER); String clusterNodes = config.getProperty(CLUSTER_NODES); - String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace(); - elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes); + elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes); this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient)); registerCacheDAO(); @@ -196,6 +195,9 @@ public class StorageModuleEsProvider extends ModuleProvider { Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER); Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER); try { + String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace(); + elasticSearchClient.setNamespace(namespace); + elasticSearchClient.initialize(); ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber); -- GitLab