提交 6e5789c3 编写于 作者: P peng-yongsheng

Add elasticsearch and h2 storage provider implement

上级 0d8e7f98
......@@ -27,13 +27,13 @@ ui:
# grpc:
# host: localhost
# port: 11800
#storage:
# elasticsearch:
# cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
#storage:
# h2:
# url: jdbc:h2:tcp://localhost/~/test
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-component</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.3-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>persistence-component</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -15,7 +15,6 @@
<module>client-component</module>
<module>server-component</module>
<module>queue-component</module>
<module>persistence-component</module>
<module>stream-component</module>
<module>remote-component</module>
</modules>
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.define;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author peng-yongsheng
*/
public abstract class DefineException extends CollectorException {
public DefineException(String message) {
super(message);
}
public DefineException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.define;
/**
* @author peng-yongsheng
*/
public abstract class DefinitionFile {
private static final String CATALOG = "META-INF/defines/";
protected abstract String fileName();
public final String get() {
return CATALOG + fileName();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.define;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DefinitionLoader<D> implements Iterable<D> {
private final Logger logger = LoggerFactory.getLogger(DefinitionLoader.class);
private final Class<D> definition;
private final DefinitionFile definitionFile;
protected DefinitionLoader(Class<D> svc, DefinitionFile definitionFile) {
this.definition = Objects.requireNonNull(svc, "definition interface cannot be null");
this.definitionFile = definitionFile;
}
public static <D> DefinitionLoader<D> load(Class<D> definition, DefinitionFile definitionFile) {
return new DefinitionLoader(definition, definitionFile);
}
@Override public final Iterator<D> iterator() {
logger.info("load definition file: {}", definitionFile.get());
List<String> definitionList = new LinkedList<>();
try {
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get());
while (urlEnumeration.hasMoreElements()) {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("definition file url: {}", definitionFileURL.getPath());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
Properties properties = new Properties();
properties.load(bufferedReader);
Enumeration defineItem = properties.propertyNames();
while (defineItem.hasMoreElements()) {
String fullNameClass = (String)defineItem.nextElement();
definitionList.add(fullNameClass);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
Iterator<String> moduleDefineIterator = definitionList.iterator();
return new Iterator<D>() {
@Override public boolean hasNext() {
return moduleDefineIterator.hasNext();
}
@Override public D next() {
String definitionClass = moduleDefineIterator.next();
logger.info("definitionClass: {}", definitionClass);
try {
Class c = Class.forName(definitionClass);
return (D)c.newInstance();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.define;
/**
* @author peng-yongsheng
*/
public interface Loader<T> {
T load() throws DefineException;
}
package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author peng-yongsheng
*/
public abstract class StorageException extends CollectorException {
public StorageException(String message) {
super(message);
}
public StorageException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage;
/**
* @author peng-yongsheng
*/
public class StorageInstallException extends StorageException {
public StorageInstallException(String message) {
super(message);
}
public StorageInstallException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage;
import java.util.List;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.storage.define.StorageDefineLoader;
import org.skywalking.apm.collector.storage.define.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
public final void install(Client client) throws StorageException {
StorageDefineLoader defineLoader = new StorageDefineLoader();
try {
List<TableDefine> tableDefines = defineLoader.load();
defineFilter(tableDefines);
Boolean debug = System.getProperty("debug") != null;
for (TableDefine tableDefine : tableDefines) {
tableDefine.initialize();
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
createTable(client, tableDefine);
} else if (debug) {
logger.info("table: {} exists", tableDefine.getName());
deleteTable(client, tableDefine);
createTable(client, tableDefine);
}
}
} catch (DefineException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
protected abstract void defineFilter(List<TableDefine> tableDefines);
protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract boolean createTable(Client client, TableDefine tableDefine) throws StorageException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.client.Client;
/**
* @author peng-yongsheng
*/
public abstract class DAO<C extends Client> {
private C client;
public final C getClient() {
return client;
}
public final void setClient(C client) {
this.client = client;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
*/
public enum DAOContainer {
INSTANCE;
private Map<String, DAO> daos = new HashMap<>();
public void put(String interfaceName, DAO dao) {
daos.put(interfaceName, dao);
}
public DAO get(String interfaceName) {
return daos.get(interfaceName);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import java.util.List;
/**
* @author peng-yongsheng
*/
public interface IBatchDAO {
void batchPersistence(List<?> batchCollection);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.define;
/**
* @author peng-yongsheng
*/
public abstract class ColumnDefine {
private final String name;
private final String type;
public ColumnDefine(String name, String type) {
this.name = name;
this.type = type;
}
public final String getName() {
return name;
}
public String getType() {
return type;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.define;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.define.DefinitionLoader;
import org.skywalking.apm.collector.core.define.Loader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class StorageDefineLoader implements Loader<List<TableDefine>> {
private final Logger logger = LoggerFactory.getLogger(StorageDefineLoader.class);
@Override public List<TableDefine> load() throws DefineException {
List<TableDefine> tableDefines = new LinkedList<>();
StorageDefinitionFile definitionFile = new StorageDefinitionFile();
logger.info("storage definition file name: {}", definitionFile.fileName());
DefinitionLoader<TableDefine> definitionLoader = DefinitionLoader.load(TableDefine.class, definitionFile);
for (TableDefine tableDefine : definitionLoader) {
logger.info("loaded storage definition class: {}", tableDefine.getClass().getName());
tableDefines.add(tableDefine);
}
return tableDefines;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.define;
import org.skywalking.apm.collector.core.define.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class StorageDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "storage.define";
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.define;
import java.util.LinkedList;
import java.util.List;
/**
* @author peng-yongsheng
*/
public abstract class TableDefine {
private final String name;
private final List<ColumnDefine> columnDefines;
public TableDefine(String name) {
this.name = name;
this.columnDefines = new LinkedList<>();
}
public abstract void initialize();
public final void addColumn(ColumnDefine columnDefine) {
columnDefines.add(columnDefine);
}
public final String getName() {
return name;
}
public final List<ColumnDefine> getColumnDefines() {
return columnDefines;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.sql;
import java.text.MessageFormat;
import java.util.List;
import java.util.Set;
public class SqlBuilder {
public static String buildSql(String sql, Object... args) {
return MessageFormat.format(sql, args);
}
public static String buildSql(String sql, List<Object> args) {
MessageFormat messageFormat = new MessageFormat(sql);
return messageFormat.format(args.toArray(new Object[0]));
}
public static String buildBatchInsertSql(String tableName, Set<String> columnNames) {
StringBuilder sb = new StringBuilder("insert into ");
sb.append(tableName).append("(");
columnNames.forEach((columnName) -> sb.append(columnName).append(","));
sb.delete(sb.length() - 1, sb.length());
sb.append(") values(");
for (int i = 0; i < columnNames.size(); i++) {
sb.append("?,");
}
sb.delete(sb.length() - 1, sb.length());
sb.append(")");
return sb.toString();
}
public static String buildBatchUpdateSql(String tableName, Set<String> columnNames, String whereClauseName) {
StringBuilder sb = new StringBuilder("update ");
sb.append(tableName).append(" set ");
columnNames.forEach((columnName) -> sb.append(columnName).append("=?,"));
sb.delete(sb.length() - 1, sb.length());
sb.append(" where ").append(whereClauseName).append("=?");
return sb.toString();
}
}
......@@ -18,17 +18,38 @@
package org.skywalking.apm.collector.storage.es;
import java.util.List;
import java.util.Properties;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.es.dao.EsDAO;
import org.skywalking.apm.collector.storage.es.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.es.define.ElasticSearchStorageInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class StorageModuleEsProvider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
private ElasticSearchClient elasticSearchClient;
@Override public String name() {
return "elasticsearch";
}
......@@ -38,11 +59,31 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String clusterName = config.getProperty(CLUSTER_NAME);
Boolean clusterTransportSniffer = (Boolean)config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
Integer indexShardsNumber = (Integer)config.get(INDEX_SHARDS_NUMBER);
Integer indexReplicasNumber = (Integer)config.get(INDEX_REPLICAS_NUMBER);
try {
elasticSearchClient.initialize();
EsDAODefineLoader loader = new EsDAODefineLoader();
List<EsDAO> esDAOs = loader.load();
esDAOs.forEach(esDAO -> {
esDAO.setClient(elasticSearchClient);
String interFaceName = esDAO.getClass().getInterfaces()[0].getName();
DAOContainer.INSTANCE.put(interFaceName, esDAO);
});
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
installer.install(elasticSearchClient);
} catch (ClientException | DefineException | StorageException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class BatchEsDAO extends EsDAO implements IBatchDAO {
private final Logger logger = LoggerFactory.getLogger(BatchEsDAO.class);
@Override public void batchPersistence(List<?> batchCollection) {
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
logger.debug("bulk data size: {}", batchCollection.size());
if (CollectionUtils.isNotEmpty(batchCollection)) {
for (int i = 0; i < batchCollection.size(); i++) {
Object builder = batchCollection.get(i);
if (builder instanceof IndexRequestBuilder) {
bulkRequest.add((IndexRequestBuilder)builder);
}
if (builder instanceof UpdateRequestBuilder) {
bulkRequest.add((UpdateRequestBuilder)builder);
}
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error(bulkResponse.buildFailureMessage());
}
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.DAO;
/**
* @author peng-yongsheng
*/
public abstract class EsDAO extends DAO<ElasticSearchClient> {
public final int getMaxId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field(columnName);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Max agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
public final int getMinId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field(columnName);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Min agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.define.DefinitionLoader;
import org.skywalking.apm.collector.core.define.Loader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class EsDAODefineLoader implements Loader<List<EsDAO>> {
private final Logger logger = LoggerFactory.getLogger(EsDAODefineLoader.class);
@Override public List<EsDAO> load() throws DefineException {
List<EsDAO> esDAOs = new ArrayList<>();
EsDAODefinitionFile definitionFile = new EsDAODefinitionFile();
logger.info("elasticsearch dao definition file name: {}", definitionFile.fileName());
DefinitionLoader<EsDAO> definitionLoader = DefinitionLoader.load(EsDAO.class, definitionFile);
for (EsDAO dao : definitionLoader) {
logger.info("loaded elasticsearch dao definition class: {}", dao.getClass().getName());
esDAOs.add(dao);
}
return esDAOs;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import org.skywalking.apm.collector.core.define.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class EsDAODefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "es_dao.define";
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.define;
import org.skywalking.apm.collector.storage.define.ColumnDefine;
/**
* @author peng-yongsheng
*/
public class ElasticSearchColumnDefine extends ColumnDefine {
public ElasticSearchColumnDefine(String name, String type) {
super(name, type);
}
public enum Type {
Binary, Boolean, Keyword, Long, Integer, Double, Text
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.define;
import java.io.IOException;
import java.util.List;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.define.ColumnDefine;
import org.skywalking.apm.collector.storage.define.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ElasticSearchStorageInstaller extends StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(ElasticSearchStorageInstaller.class);
private final int indexShardsNumber;
private final int indexReplicasNumber;
public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNumber) {
this.indexShardsNumber = indexShardsNumber;
this.indexReplicasNumber = indexReplicasNumber;
}
@Override protected void defineFilter(List<TableDefine> tableDefines) {
int size = tableDefines.size();
for (int i = size - 1; i >= 0; i--) {
if (!(tableDefines.get(i) instanceof ElasticSearchTableDefine)) {
tableDefines.remove(i);
}
}
}
@Override protected boolean createTable(Client client, TableDefine tableDefine) {
ElasticSearchClient esClient = (ElasticSearchClient)client;
ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
// mapping
XContentBuilder mappingBuilder = null;
Settings settings = createSettingBuilder(esTableDefine);
try {
mappingBuilder = createMappingBuilder(esTableDefine);
logger.info("mapping builder str: {}", mappingBuilder.string());
} catch (Exception e) {
logger.error("create {} index mapping builder error", esTableDefine.getName());
}
boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder);
logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged);
return isAcknowledged;
}
private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
return Settings.builder()
.put("index.number_of_shards", indexShardsNumber)
.put("index.number_of_replicas", indexReplicasNumber)
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
.put("analysis.tokenizer.collector_tokenizer.type", "standard")
.put("analysis.tokenizer.collector_tokenizer.max_token_length", 5)
.build();
}
private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefine) throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties");
for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine;
if (ElasticSearchColumnDefine.Type.Text.name().toLowerCase().equals(elasticSearchColumnDefine.getType().toLowerCase())) {
mappingBuilder
.startObject(elasticSearchColumnDefine.getName())
.field("type", elasticSearchColumnDefine.getType().toLowerCase())
.field("fielddata", true)
.endObject();
} else {
mappingBuilder
.startObject(elasticSearchColumnDefine.getName())
.field("type", elasticSearchColumnDefine.getType().toLowerCase())
.endObject();
}
}
mappingBuilder
.endObject()
.endObject();
logger.debug("create elasticsearch index: {}", mappingBuilder.string());
return mappingBuilder;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
return esClient.deleteIndex(tableDefine.getName());
} catch (IndexNotFoundException e) {
logger.info("{} index not found", tableDefine.getName());
}
return false;
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) {
ElasticSearchClient esClient = (ElasticSearchClient)client;
return esClient.isExistsIndex(tableDefine.getName());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.define;
import org.skywalking.apm.collector.storage.define.TableDefine;
/**
* @author peng-yongsheng
*/
public abstract class ElasticSearchTableDefine extends TableDefine {
public ElasticSearchTableDefine(String name) {
super(name);
}
public final String type() {
return "type";
}
public abstract int refreshInterval();
}
......@@ -18,17 +18,36 @@
package org.skywalking.apm.collector.storage.h2;
import java.util.List;
import java.util.Properties;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.define.H2StorageInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class StorageModuleH2Provider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(StorageModuleH2Provider.class);
private static final String URL = "url";
private static final String USER_NAME = "user_name";
private static final String PASSWORD = "password";
private H2Client client;
@Override public String name() {
return "h2";
}
......@@ -38,11 +57,29 @@ public class StorageModuleH2Provider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String url = config.getProperty(URL);
String userName = config.getProperty(USER_NAME);
String password = config.getProperty(PASSWORD);
client = new H2Client(url, userName, password);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
try {
client.initialize();
H2DAODefineLoader loader = new H2DAODefineLoader();
List<H2DAO> h2DAOs = loader.load();
h2DAOs.forEach(h2DAO -> {
h2DAO.setClient(client);
String interFaceName = h2DAO.getClass().getInterfaces()[0].getName();
DAOContainer.INSTANCE.put(interFaceName, h2DAO);
});
H2StorageInstaller installer = new H2StorageInstaller();
installer.install(client);
} catch (H2ClientException | DefineException | StorageException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.h2.define.H2SqlEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class BatchH2DAO extends H2DAO implements IBatchDAO {
private final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
@Override
public void batchPersistence(List<?> batchCollection) {
if (batchCollection != null && batchCollection.size() > 0) {
logger.debug("the batch collection size is {}", batchCollection.size());
Connection conn;
final Map<String, PreparedStatement> batchSqls = new HashMap<>();
try {
conn = getClient().getConnection();
conn.setAutoCommit(true);
PreparedStatement ps;
for (Object entity : batchCollection) {
H2SqlEntity e = getH2SqlEntity(entity);
String sql = e.getSql();
if (batchSqls.containsKey(sql)) {
ps = batchSqls.get(sql);
} else {
ps = conn.prepareStatement(sql);
batchSqls.put(sql, ps);
}
Object[] params = e.getParams();
if (params != null) {
logger.debug("the sql is {}, params size is {}, params: {}", e.getSql(), params.length, params);
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
ps.addBatch();
}
for (String k : batchSqls.keySet()) {
batchSqls.get(k).executeBatch();
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
batchSqls.clear();
}
}
private H2SqlEntity getH2SqlEntity(Object entity) {
if (entity instanceof H2SqlEntity) {
return (H2SqlEntity)entity;
}
return null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.dao.DAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class H2DAO extends DAO<H2Client> {
private final Logger logger = LoggerFactory.getLogger(H2DAO.class);
protected final int getMaxId(String tableName, String columnName) {
String sql = "select max(" + columnName + ") from " + tableName;
return getIntValueBySQL(sql);
}
protected final int getMinId(String tableName, String columnName) {
String sql = "select min(" + columnName + ") from " + tableName;
return getIntValueBySQL(sql);
}
private int getIntValueBySQL(String sql) {
H2Client client = getClient();
try (ResultSet rs = client.executeQuery(sql, null)) {
if (rs.next()) {
int id = rs.getInt(1);
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return 0;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.define.DefineException;
import org.skywalking.apm.collector.core.define.DefinitionLoader;
import org.skywalking.apm.collector.core.define.Loader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class H2DAODefineLoader implements Loader<List<H2DAO>> {
private final Logger logger = LoggerFactory.getLogger(H2DAODefineLoader.class);
@Override public List<H2DAO> load() throws DefineException {
List<H2DAO> h2DAOs = new ArrayList<>();
H2DAODefinitionFile definitionFile = new H2DAODefinitionFile();
logger.info("h2 dao definition file name: {}", definitionFile.fileName());
DefinitionLoader<H2DAO> definitionLoader = DefinitionLoader.load(H2DAO.class, definitionFile);
for (H2DAO dao : definitionLoader) {
logger.info("loaded h2 dao definition class: {}", dao.getClass().getName());
h2DAOs.add(dao);
}
return h2DAOs;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import org.skywalking.apm.collector.core.define.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class H2DAODefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "h2_dao.define";
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.define;
import org.skywalking.apm.collector.storage.define.ColumnDefine;
/**
* @author peng-yongsheng
*/
public class H2ColumnDefine extends ColumnDefine {
public H2ColumnDefine(String name, String type) {
super(name, type);
}
public enum Type {
Boolean, Varchar, Int, Bigint, BINARY, Double
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.define;
/**
* @author clevertension
*/
public class H2SqlEntity {
private String sql;
private Object[] params;
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.define;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.StorageException;
import org.skywalking.apm.collector.storage.StorageInstallException;
import org.skywalking.apm.collector.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.define.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* @author peng-yongsheng
*/
public class H2StorageInstaller extends StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class);
@Override protected void defineFilter(List<TableDefine> tableDefines) {
int size = tableDefines.size();
for (int i = size - 1; i >= 0; i--) {
if (!(tableDefines.get(i) instanceof H2TableDefine)) {
tableDefines.remove(i);
}
}
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
ResultSet rs = null;
try {
logger.info("check if table {} exist ", tableDefine.getName());
rs = h2Client.getConnection().getMetaData().getTables(null, null, tableDefine.getName().toUpperCase(), null);
if (rs.next()) {
return true;
}
} catch (SQLException | H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
} finally {
try {
if (rs != null) {
rs.close();
}
} catch (SQLException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
return false;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
try {
h2Client.execute("drop table if exists " + tableDefine.getName());
return true;
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
@Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
H2TableDefine h2TableDefine = (H2TableDefine)tableDefine;
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("CREATE TABLE ").append(h2TableDefine.getName()).append(" (");
h2TableDefine.getColumnDefines().forEach(columnDefine -> {
H2ColumnDefine h2ColumnDefine = (H2ColumnDefine)columnDefine;
if (h2ColumnDefine.getType().equals(H2ColumnDefine.Type.Varchar.name())) {
sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append("(255),");
} else {
sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append(",");
}
});
//remove last comma
sqlBuilder.delete(sqlBuilder.length() - 1, sqlBuilder.length());
sqlBuilder.append(")");
try {
logger.info("create h2 table with sql {}", sqlBuilder);
h2Client.execute(sqlBuilder.toString());
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
}
return true;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.define;
import org.skywalking.apm.collector.storage.define.TableDefine;
/**
* @author peng-yongsheng
*/
public abstract class H2TableDefine extends TableDefine {
public H2TableDefine(String name) {
super(name);
}
}
......@@ -25,7 +25,7 @@
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>persistence-component</artifactId>
<artifactId>client-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册