From e736102594ee08c195793c089faad0fe5f4d3111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Sun, 22 Apr 2018 16:14:27 +0800 Subject: [PATCH] Fixed #1090 (#1101) * Fixed #1090 1. Move setting the performance from create table action to install action. 2. Add check step that avoid filed type not match the definition when project upgraded. * Setting namespace by class construct. --- .../collector/boot/CollectorBootStartUp.java | 3 +- .../elasticsearch/ElasticSearchClient.java | 75 ++++++------------- .../ElasticSearchClientException.java | 9 +-- .../collector/core/module/BootstrapFlow.java | 3 +- .../collector/core/module/ModuleManager.java | 2 +- .../collector/core/module/ModuleProvider.java | 2 +- .../core/module/ModuleStartException.java} | 12 +-- .../core/module/ModuleManagerTest.java | 3 +- .../storage/StorageInstallException.java | 1 - .../collector/storage/StorageInstaller.java | 27 +++++-- .../storage/es/StorageModuleEsProvider.java | 14 +--- .../define/ElasticSearchStorageInstaller.java | 48 ++++++++---- .../storage/h2/StorageModuleH2Provider.java | 2 +- .../h2/base/define/H2StorageInstaller.java | 14 +++- 14 files changed, 109 insertions(+), 106 deletions(-) rename apm-collector/{apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java => apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java} (69%) diff --git a/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java b/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java index ff9dd5a24..83f3b6972 100644 --- a/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java +++ b/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.core.module.ApplicationConfiguration; import org.apache.skywalking.apm.collector.core.module.ModuleConfigException; import org.apache.skywalking.apm.collector.core.module.ModuleManager; import org.apache.skywalking.apm.collector.core.module.ModuleNotFoundException; +import org.apache.skywalking.apm.collector.core.module.ModuleStartException; import org.apache.skywalking.apm.collector.core.module.ProviderNotFoundException; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.slf4j.Logger; @@ -42,7 +43,7 @@ public class CollectorBootStartUp { try { ApplicationConfiguration applicationConfiguration = configLoader.load(); manager.init(applicationConfiguration); - } catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException e) { + } catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException | ModuleStartException e) { logger.error(e.getMessage(), e); System.exit(1); } diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java index b41b7c8d2..91bc98117 100644 --- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java +++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java @@ -25,10 +25,13 @@ import java.util.List; import java.util.function.Consumer; import org.apache.skywalking.apm.collector.client.Client; import org.apache.skywalking.apm.collector.client.ClientException; +import org.apache.skywalking.apm.collector.core.data.CommonTable; +import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.core.util.StringUtils; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.MultiGetRequestBuilder; @@ -62,14 +65,22 @@ public class ElasticSearchClient implements Client { private final String clusterNodes; - private boolean ready = false; - private String namespace; + private final String namespace; public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer, String clusterNodes) { this.clusterName = clusterName; this.clusterTransportSniffer = clusterTransportSniffer; this.clusterNodes = clusterNodes; + this.namespace = Const.EMPTY_STRING; + } + + public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer, + String clusterNodes, String namespace) { + this.clusterName = clusterName; + this.clusterTransportSniffer = clusterTransportSniffer; + this.clusterNodes = clusterNodes; + this.namespace = namespace; } @Override @@ -89,8 +100,6 @@ public class ElasticSearchClient implements Client { throw new ElasticSearchClientException(e.getMessage(), e); } } - - this.ready = true; } @Override @@ -111,14 +120,6 @@ public class ElasticSearchClient implements Client { return pairsList; } - public void setNamespace(String namespace) throws ElasticSearchClientException { - if (!ready) { - this.namespace = namespace; - } else { - throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready."); - } - } - class AddressPairs { private String host; private Integer port; @@ -130,10 +131,6 @@ public class ElasticSearchClient implements Client { } public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - IndicesAdminClient adminClient = client.admin().indices(); indexName = formatIndexName(indexName); CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get(); @@ -142,10 +139,6 @@ public class ElasticSearchClient implements Client { } public boolean deleteIndex(String indexName) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); IndicesAdminClient adminClient = client.admin().indices(); DeleteIndexResponse response = adminClient.prepareDelete(indexName).get(); @@ -154,10 +147,6 @@ public class ElasticSearchClient implements Client { } public boolean isExistsIndex(String indexName) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); IndicesAdminClient adminClient = client.admin().indices(); IndicesExistsResponse response = adminClient.prepareExists(indexName).get(); @@ -165,60 +154,42 @@ public class ElasticSearchClient implements Client { } public SearchRequestBuilder prepareSearch(String indexName) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); return client.prepareSearch(indexName); } public IndexRequestBuilder prepareIndex(String indexName, String id) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } + indexName = formatIndexName(indexName); + return client.prepareIndex(indexName, CommonTable.TABLE_TYPE, id); + } + public GetFieldMappingsResponse.FieldMappingMetaData prepareGetMappings(String indexName, String fieldName) { indexName = formatIndexName(indexName); - return client.prepareIndex(indexName, "type", id); + GetFieldMappingsResponse response = client.admin().indices().prepareGetFieldMappings(indexName).setFields(fieldName).get(); + return response.fieldMappings(indexName, CommonTable.TABLE_TYPE, fieldName); } public UpdateRequestBuilder prepareUpdate(String indexName, String id) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); - return client.prepareUpdate(indexName, "type", id); + return client.prepareUpdate(indexName, CommonTable.TABLE_TYPE, id); } public GetRequestBuilder prepareGet(String indexName, String id) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); - return client.prepareGet(indexName, "type", id); + return client.prepareGet(indexName, CommonTable.TABLE_TYPE, id); } public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - indexName = formatIndexName(indexName); return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName); } public MultiGetRequestBuilder prepareMultiGet(List rows, MultiGetRowHandler rowHandler) { - if (!ready) { - throw new ElasticSearchClientNotReadyException(); - } - MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet(); rowHandler.setPrepareMultiGet(prepareMultiGet); rowHandler.setNamespace(namespace); - rows.forEach(row -> rowHandler.accept(row)); + rows.forEach(rowHandler::accept); return rowHandler.getPrepareMultiGet(); } @@ -255,7 +226,7 @@ public class ElasticSearchClient implements Client { private static String formatIndexName(String namespace, String indexName) { if (StringUtils.isNotEmpty(namespace)) { - return namespace + "_" + indexName; + return namespace + Const.ID_SPLIT + indexName; } return indexName; } diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java index 9691839c4..5568d733c 100644 --- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java +++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java @@ -16,7 +16,6 @@ * */ - package org.apache.skywalking.apm.collector.client.elasticsearch; import org.apache.skywalking.apm.collector.client.ClientException; @@ -24,12 +23,8 @@ import org.apache.skywalking.apm.collector.client.ClientException; /** * @author peng-yongsheng */ -public class ElasticSearchClientException extends ClientException { - public ElasticSearchClientException(String message) { - super(message); - } - - public ElasticSearchClientException(String message, Throwable cause) { +class ElasticSearchClientException extends ClientException { + ElasticSearchClientException(String message, Throwable cause) { super(message, cause); } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java index 8a3b91d90..6dc69de39 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java +++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java @@ -43,7 +43,8 @@ class BootstrapFlow { } @SuppressWarnings("unchecked") - void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException { + void start( + ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException { for (ModuleProvider provider : startupSequence) { String[] requiredModules = provider.requiredModules(); if (requiredModules != null) { diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java index b09b42a0d..21df5a836 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java +++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java @@ -37,7 +37,7 @@ public class ModuleManager { * Init the given modules */ public void init( - ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException { + ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException { String[] moduleNames = applicationConfiguration.moduleList(); ServiceLoader moduleServiceLoader = ServiceLoader.load(Module.class); LinkedList moduleList = new LinkedList<>(Arrays.asList(moduleNames)); diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java index c16ad3530..5e2b8853d 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java +++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java @@ -71,7 +71,7 @@ public abstract class ModuleProvider { /** * In start stage, the module has been ready for interop. */ - public abstract void start() throws ServiceNotProvidedException; + public abstract void start() throws ServiceNotProvidedException, ModuleStartException; /** * This callback executes after all modules start up successfully. diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java similarity index 69% rename from apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java rename to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java index 7be7c53c6..725327286 100644 --- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java +++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java @@ -16,14 +16,14 @@ * */ -package org.apache.skywalking.apm.collector.client.elasticsearch; +package org.apache.skywalking.apm.collector.core.module; /** - * @author zhang xin + * @author peng-yongsheng */ -public class ElasticSearchClientNotReadyException extends RuntimeException { - public ElasticSearchClientNotReadyException() { - super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient."); - } +public class ModuleStartException extends Exception { + public ModuleStartException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java b/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java index 7b2f32b6c..a00feb7bf 100644 --- a/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java +++ b/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java @@ -16,7 +16,6 @@ * */ - package org.apache.skywalking.apm.collector.core.module; import java.util.Properties; @@ -28,7 +27,7 @@ import org.junit.Test; */ public class ModuleManagerTest { @Test - public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException { + public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException, ModuleStartException { ApplicationConfiguration configuration = new ApplicationConfiguration(); configuration.addModule("Test").addProviderConfiguration("TestModule-Provider", new Properties()); configuration.addModule("BaseA").addProviderConfiguration("P-A", new Properties()); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java index 1671769a6..4f99bfed7 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java @@ -16,7 +16,6 @@ * */ - package org.apache.skywalking.apm.collector.storage; /** diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java index 675eb7ec4..3ad986d12 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java @@ -16,14 +16,13 @@ * */ - package org.apache.skywalking.apm.collector.storage; import java.util.List; -import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader; import org.apache.skywalking.apm.collector.client.Client; -import org.apache.skywalking.apm.collector.core.define.DefineException; +import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader; import org.apache.skywalking.apm.collector.core.data.TableDefine; +import org.apache.skywalking.apm.collector.core.define.DefineException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +33,12 @@ public abstract class StorageInstaller { private final Logger logger = LoggerFactory.getLogger(StorageInstaller.class); + private final boolean isHighPerformanceMode; + + public StorageInstaller(boolean isHighPerformanceMode) { + this.isHighPerformanceMode = isHighPerformanceMode; + } + public final void install(Client client) throws StorageException { StorageDefineLoader defineLoader = new StorageDefineLoader(); try { @@ -43,6 +48,7 @@ public abstract class StorageInstaller { for (TableDefine tableDefine : tableDefines) { tableDefine.initialize(); + settingHighPerformance(tableDefine); if (!isExists(client, tableDefine)) { logger.info("table: {} not exists", tableDefine.getName()); createTable(client, tableDefine); @@ -51,17 +57,28 @@ public abstract class StorageInstaller { deleteTable(client, tableDefine); createTable(client, tableDefine); } + columnCheck(client, tableDefine); } } catch (DefineException e) { throw new StorageInstallException(e.getMessage(), e); } } + private void settingHighPerformance(TableDefine tableDefine) { + tableDefine.getColumnDefines().forEach(column -> { + if (isHighPerformanceMode) { + column.getColumnName().useShortName(); + } + }); + } + protected abstract void defineFilter(List tableDefines); protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException; - protected abstract boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException; + protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException; + + protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException; - protected abstract boolean createTable(Client client, TableDefine tableDefine) throws StorageException; + protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException; } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index 12d610af6..caf11ebb0 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -29,6 +29,7 @@ import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfi import org.apache.skywalking.apm.collector.core.module.Module; import org.apache.skywalking.apm.collector.core.module.ModuleConfig; import org.apache.skywalking.apm.collector.core.module.ModuleProvider; +import org.apache.skywalking.apm.collector.core.module.ModuleStartException; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.apache.skywalking.apm.collector.remote.RemoteModule; import org.apache.skywalking.apm.collector.storage.StorageException; @@ -242,16 +243,12 @@ import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceAlarmEsUIDAO import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceMetricEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceNameServiceEsUIDAO; import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceReferenceEsMetricUIDAO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class StorageModuleEsProvider extends ModuleProvider { - private static final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class); - static final String NAME = "elasticsearch"; private final StorageModuleEsConfig config; private ElasticSearchClient elasticSearchClient; @@ -275,8 +272,6 @@ public class StorageModuleEsProvider extends ModuleProvider { } @Override public void prepare() throws ServiceNotProvidedException { - elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes()); - this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient)); registerCacheDAO(); registerRegisterDAO(); @@ -286,17 +281,16 @@ public class StorageModuleEsProvider extends ModuleProvider { } @Override - public void start() { + public void start() throws ModuleStartException { try { String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace(); - elasticSearchClient.setNamespace(namespace); - + elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), namespace); elasticSearchClient.initialize(); ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.isHighPerformanceMode()); installer.install(elasticSearchClient); } catch (ClientException | StorageException e) { - logger.error(e.getMessage(), e); + throw new ModuleStartException(e.getMessage(), e); } String uuId = UUID.randomUUID().toString(); diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java index 66510bf99..b7a21c1ee 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java @@ -20,15 +20,19 @@ package org.apache.skywalking.apm.collector.storage.es.base.define; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.skywalking.apm.collector.client.Client; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.data.ColumnDefine; import org.apache.skywalking.apm.collector.core.data.TableDefine; +import org.apache.skywalking.apm.collector.storage.StorageException; +import org.apache.skywalking.apm.collector.storage.StorageInstallException; import org.apache.skywalking.apm.collector.storage.StorageInstaller; +import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.IndexNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +45,12 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { private final int indexShardsNumber; private final int indexReplicasNumber; - private final boolean isHighPerformanceMode; public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNumber, boolean isHighPerformanceMode) { + super(isHighPerformanceMode); this.indexShardsNumber = indexShardsNumber; this.indexReplicasNumber = indexReplicasNumber; - this.isHighPerformanceMode = isHighPerformanceMode; } @Override protected void defineFilter(List tableDefines) { @@ -59,7 +62,26 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { } } - @Override protected boolean createTable(Client client, TableDefine tableDefine) { + @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException { + ElasticSearchClient esClient = (ElasticSearchClient)client; + ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine; + + for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) { + GetFieldMappingsResponse.FieldMappingMetaData metaData = esClient.prepareGetMappings(esTableDefine.getName(), columnDefine.getColumnName().getName()); + + if (Objects.nonNull(metaData)) { + Map field = (Map)metaData.sourceAsMap().get(columnDefine.getColumnName().getName()); + if (!columnDefine.getType().toLowerCase().equals(field.get("type"))) { + throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + "'s type not match the definition. Expect: " + + columnDefine.getType().toLowerCase() + ", actual: " + field.get("type")); + } + } else { + throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + " in " + tableDefine.getName() + " index not found."); + } + } + } + + @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException { ElasticSearchClient esClient = (ElasticSearchClient)client; ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine; @@ -76,7 +98,10 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder); logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged); - return isAcknowledged; + + if (!isAcknowledged) { + throw new StorageInstallException("create " + esTableDefine.getName() + " index failure, "); + } } private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) { @@ -96,9 +121,6 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) { ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine; - if (isHighPerformanceMode) { - elasticSearchColumnDefine.getColumnName().useShortName(); - } if (ElasticSearchColumnDefine.Type.Text.name().toLowerCase().equals(elasticSearchColumnDefine.getType().toLowerCase())) { mappingBuilder @@ -121,14 +143,12 @@ public class ElasticSearchStorageInstaller extends StorageInstaller { return mappingBuilder; } - @Override protected boolean deleteTable(Client client, TableDefine tableDefine) { + @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException { ElasticSearchClient esClient = (ElasticSearchClient)client; - try { - return esClient.deleteIndex(tableDefine.getName()); - } catch (IndexNotFoundException e) { - logger.info("{} index not found", tableDefine.getName()); + + if (!esClient.deleteIndex(tableDefine.getName())) { + throw new StorageInstallException(tableDefine.getName() + " index delete failure."); } - return false; } @Override protected boolean isExists(Client client, TableDefine tableDefine) { diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index fa5b91279..63d7e0901 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -280,7 +280,7 @@ public class StorageModuleH2Provider extends ModuleProvider { try { h2Client.initialize(); - H2StorageInstaller installer = new H2StorageInstaller(); + H2StorageInstaller installer = new H2StorageInstaller(false); installer.install(h2Client); } catch (H2ClientException | StorageException e) { logger.error(e.getMessage(), e); diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java index 21adca86a..1e8fcd402 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java @@ -38,6 +38,10 @@ public class H2StorageInstaller extends StorageInstaller { private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class); + public H2StorageInstaller(boolean isHighPerformanceMode) { + super(isHighPerformanceMode); + } + @Override protected void defineFilter(List tableDefines) { int size = tableDefines.size(); for (int i = size - 1; i >= 0; i--) { @@ -70,17 +74,20 @@ public class H2StorageInstaller extends StorageInstaller { return false; } - @Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException { + @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException { + + } + + @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException { H2Client h2Client = (H2Client)client; try { h2Client.execute("drop table if exists " + tableDefine.getName()); - return true; } catch (H2ClientException e) { throw new StorageInstallException(e.getMessage(), e); } } - @Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException { + @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException { H2Client h2Client = (H2Client)client; H2TableDefine h2TableDefine = (H2TableDefine)tableDefine; @@ -104,6 +111,5 @@ public class H2StorageInstaller extends StorageInstaller { } catch (H2ClientException e) { throw new StorageInstallException(e.getMessage(), e); } - return true; } } -- GitLab