提交 e7361025 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Fixed #1090 (#1101)

* Fixed #1090

1. Move setting the performance from create table action to install action.
2. Add check step that avoid filed type not match the definition when project upgraded.

* Setting namespace by class construct.
上级 bc44fa36
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.core.module.ApplicationConfiguration;
import org.apache.skywalking.apm.collector.core.module.ModuleConfigException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.apache.skywalking.apm.collector.core.module.ModuleStartException;
import org.apache.skywalking.apm.collector.core.module.ProviderNotFoundException;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.slf4j.Logger;
......@@ -42,7 +43,7 @@ public class CollectorBootStartUp {
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
} catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException e) {
} catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException | ModuleStartException e) {
logger.error(e.getMessage(), e);
System.exit(1);
}
......
......@@ -25,10 +25,13 @@ import java.util.List;
import java.util.function.Consumer;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.core.data.CommonTable;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
......@@ -62,14 +65,22 @@ public class ElasticSearchClient implements Client {
private final String clusterNodes;
private boolean ready = false;
private String namespace;
private final String namespace;
public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer,
String clusterNodes) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
this.namespace = Const.EMPTY_STRING;
}
public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer,
String clusterNodes, String namespace) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
this.namespace = namespace;
}
@Override
......@@ -89,8 +100,6 @@ public class ElasticSearchClient implements Client {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
this.ready = true;
}
@Override
......@@ -111,14 +120,6 @@ public class ElasticSearchClient implements Client {
return pairsList;
}
public void setNamespace(String namespace) throws ElasticSearchClientException {
if (!ready) {
this.namespace = namespace;
} else {
throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready.");
}
}
class AddressPairs {
private String host;
private Integer port;
......@@ -130,10 +131,6 @@ public class ElasticSearchClient implements Client {
}
public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
IndicesAdminClient adminClient = client.admin().indices();
indexName = formatIndexName(indexName);
CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
......@@ -142,10 +139,6 @@ public class ElasticSearchClient implements Client {
}
public boolean deleteIndex(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
......@@ -154,10 +147,6 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
......@@ -165,60 +154,42 @@ public class ElasticSearchClient implements Client {
}
public SearchRequestBuilder prepareSearch(String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, CommonTable.TABLE_TYPE, id);
}
public GetFieldMappingsResponse.FieldMappingMetaData prepareGetMappings(String indexName, String fieldName) {
indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
GetFieldMappingsResponse response = client.admin().indices().prepareGetFieldMappings(indexName).setFields(fieldName).get();
return response.fieldMappings(indexName, CommonTable.TABLE_TYPE, fieldName);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
return client.prepareUpdate(indexName, CommonTable.TABLE_TYPE, id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
return client.prepareGet(indexName, CommonTable.TABLE_TYPE, id);
}
public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
indexName = formatIndexName(indexName);
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
}
public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
if (!ready) {
throw new ElasticSearchClientNotReadyException();
}
MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
rowHandler.setPrepareMultiGet(prepareMultiGet);
rowHandler.setNamespace(namespace);
rows.forEach(row -> rowHandler.accept(row));
rows.forEach(rowHandler::accept);
return rowHandler.getPrepareMultiGet();
}
......@@ -255,7 +226,7 @@ public class ElasticSearchClient implements Client {
private static String formatIndexName(String namespace, String indexName) {
if (StringUtils.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
return namespace + Const.ID_SPLIT + indexName;
}
return indexName;
}
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.client.elasticsearch;
import org.apache.skywalking.apm.collector.client.ClientException;
......@@ -24,12 +23,8 @@ import org.apache.skywalking.apm.collector.client.ClientException;
/**
* @author peng-yongsheng
*/
public class ElasticSearchClientException extends ClientException {
public ElasticSearchClientException(String message) {
super(message);
}
public ElasticSearchClientException(String message, Throwable cause) {
class ElasticSearchClientException extends ClientException {
ElasticSearchClientException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -43,7 +43,8 @@ class BootstrapFlow {
}
@SuppressWarnings("unchecked")
void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException {
void start(
ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
String[] requiredModules = provider.requiredModules();
if (requiredModules != null) {
......
......@@ -37,7 +37,7 @@ public class ModuleManager {
* Init the given modules
*/
public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException {
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
String[] moduleNames = applicationConfiguration.moduleList();
ServiceLoader<Module> moduleServiceLoader = ServiceLoader.load(Module.class);
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
......
......@@ -71,7 +71,7 @@ public abstract class ModuleProvider {
/**
* In start stage, the module has been ready for interop.
*/
public abstract void start() throws ServiceNotProvidedException;
public abstract void start() throws ServiceNotProvidedException, ModuleStartException;
/**
* This callback executes after all modules start up successfully.
......
......@@ -16,14 +16,14 @@
*
*/
package org.apache.skywalking.apm.collector.client.elasticsearch;
package org.apache.skywalking.apm.collector.core.module;
/**
* @author zhang xin
* @author peng-yongsheng
*/
public class ElasticSearchClientNotReadyException extends RuntimeException {
public ElasticSearchClientNotReadyException() {
super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient.");
}
public class ModuleStartException extends Exception {
public ModuleStartException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.util.Properties;
......@@ -28,7 +27,7 @@ import org.junit.Test;
*/
public class ModuleManagerTest {
@Test
public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException {
public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException, ModuleStartException {
ApplicationConfiguration configuration = new ApplicationConfiguration();
configuration.addModule("Test").addProviderConfiguration("TestModule-Provider", new Properties());
configuration.addModule("BaseA").addProviderConfiguration("P-A", new Properties());
......
......@@ -16,14 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.storage;
import java.util.List;
import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.core.define.DefineException;
import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.apache.skywalking.apm.collector.core.define.DefineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,6 +33,12 @@ public abstract class StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
private final boolean isHighPerformanceMode;
public StorageInstaller(boolean isHighPerformanceMode) {
this.isHighPerformanceMode = isHighPerformanceMode;
}
public final void install(Client client) throws StorageException {
StorageDefineLoader defineLoader = new StorageDefineLoader();
try {
......@@ -43,6 +48,7 @@ public abstract class StorageInstaller {
for (TableDefine tableDefine : tableDefines) {
tableDefine.initialize();
settingHighPerformance(tableDefine);
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
createTable(client, tableDefine);
......@@ -51,17 +57,28 @@ public abstract class StorageInstaller {
deleteTable(client, tableDefine);
createTable(client, tableDefine);
}
columnCheck(client, tableDefine);
}
} catch (DefineException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
private void settingHighPerformance(TableDefine tableDefine) {
tableDefine.getColumnDefines().forEach(column -> {
if (isHighPerformanceMode) {
column.getColumnName().useShortName();
}
});
}
protected abstract void defineFilter(List<TableDefine> tableDefines);
protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean createTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException;
}
......@@ -29,6 +29,7 @@ import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfi
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleStartException;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.storage.StorageException;
......@@ -242,16 +243,12 @@ import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceAlarmEsUIDAO
import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceNameServiceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceReferenceEsMetricUIDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
static final String NAME = "elasticsearch";
private final StorageModuleEsConfig config;
private ElasticSearchClient elasticSearchClient;
......@@ -275,8 +272,6 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException {
elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes());
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
registerRegisterDAO();
......@@ -286,17 +281,16 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
@Override
public void start() {
public void start() throws ModuleStartException {
try {
String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
elasticSearchClient.setNamespace(namespace);
elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), namespace);
elasticSearchClient.initialize();
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.isHighPerformanceMode());
installer.install(elasticSearchClient);
} catch (ClientException | StorageException e) {
logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
}
String uuId = UUID.randomUUID().toString();
......
......@@ -20,15 +20,19 @@ package org.apache.skywalking.apm.collector.storage.es.base.define;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.ColumnDefine;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.apache.skywalking.apm.collector.storage.StorageException;
import org.apache.skywalking.apm.collector.storage.StorageInstallException;
import org.apache.skywalking.apm.collector.storage.StorageInstaller;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -41,13 +45,12 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
private final int indexShardsNumber;
private final int indexReplicasNumber;
private final boolean isHighPerformanceMode;
public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNumber,
boolean isHighPerformanceMode) {
super(isHighPerformanceMode);
this.indexShardsNumber = indexShardsNumber;
this.indexReplicasNumber = indexReplicasNumber;
this.isHighPerformanceMode = isHighPerformanceMode;
}
@Override protected void defineFilter(List<TableDefine> tableDefines) {
......@@ -59,7 +62,26 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
}
}
@Override protected boolean createTable(Client client, TableDefine tableDefine) {
@Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
GetFieldMappingsResponse.FieldMappingMetaData metaData = esClient.prepareGetMappings(esTableDefine.getName(), columnDefine.getColumnName().getName());
if (Objects.nonNull(metaData)) {
Map field = (Map)metaData.sourceAsMap().get(columnDefine.getColumnName().getName());
if (!columnDefine.getType().toLowerCase().equals(field.get("type"))) {
throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + "'s type not match the definition. Expect: "
+ columnDefine.getType().toLowerCase() + ", actual: " + field.get("type"));
}
} else {
throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + " in " + tableDefine.getName() + " index not found.");
}
}
}
@Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
......@@ -76,7 +98,10 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder);
logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged);
return isAcknowledged;
if (!isAcknowledged) {
throw new StorageInstallException("create " + esTableDefine.getName() + " index failure, ");
}
}
private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
......@@ -96,9 +121,6 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine;
if (isHighPerformanceMode) {
elasticSearchColumnDefine.getColumnName().useShortName();
}
if (ElasticSearchColumnDefine.Type.Text.name().toLowerCase().equals(elasticSearchColumnDefine.getType().toLowerCase())) {
mappingBuilder
......@@ -121,14 +143,12 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
return mappingBuilder;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) {
@Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
return esClient.deleteIndex(tableDefine.getName());
} catch (IndexNotFoundException e) {
logger.info("{} index not found", tableDefine.getName());
if (!esClient.deleteIndex(tableDefine.getName())) {
throw new StorageInstallException(tableDefine.getName() + " index delete failure.");
}
return false;
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) {
......
......@@ -280,7 +280,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
try {
h2Client.initialize();
H2StorageInstaller installer = new H2StorageInstaller();
H2StorageInstaller installer = new H2StorageInstaller(false);
installer.install(h2Client);
} catch (H2ClientException | StorageException e) {
logger.error(e.getMessage(), e);
......
......@@ -38,6 +38,10 @@ public class H2StorageInstaller extends StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class);
public H2StorageInstaller(boolean isHighPerformanceMode) {
super(isHighPerformanceMode);
}
@Override protected void defineFilter(List<TableDefine> tableDefines) {
int size = tableDefines.size();
for (int i = size - 1; i >= 0; i--) {
......@@ -70,17 +74,20 @@ public class H2StorageInstaller extends StorageInstaller {
return false;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
@Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
}
@Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
try {
h2Client.execute("drop table if exists " + tableDefine.getName());
return true;
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
@Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException {
@Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
H2TableDefine h2TableDefine = (H2TableDefine)tableDefine;
......@@ -104,6 +111,5 @@ public class H2StorageInstaller extends StorageInstaller {
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
}
return true;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册