提交 0b2e355b 编写于 作者: J Jared Tan 提交者: wu-sheng

extend Metadata limit (#2492)

* fix default MetadataQuery Max size.

* revert mysql
上级 ce1c7aad
......@@ -56,10 +56,10 @@ core:
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
elasticsearch:
# set the namespace in elasticsearch
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:elasticsearch:9200}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
......@@ -67,11 +67,14 @@ storage:
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
# h2:
# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
# user: ${SW_STORAGE_H2_USER:sa}
# metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
# mysql:
# metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
receiver-sharing-server:
default:
receiver-register:
......@@ -107,4 +110,4 @@ query:
alarm:
default:
telemetry:
none:
\ No newline at end of file
none:
......@@ -55,15 +55,11 @@ core:
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
h2:
driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
user: ${SW_STORAGE_H2_USER:sa}
# elasticsearch:
# # nameSpace: ${SW_NAMESPACE:""}
# # user: ${SW_ES_USER:""}
# # password: ${SW_ES_PASSWORD:""}
# nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
......@@ -71,7 +67,14 @@ storage:
# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
h2:
driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
user: ${SW_STORAGE_H2_USER:sa}
metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
# mysql:
# metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
receiver-sharing-server:
default:
receiver-register:
......@@ -109,4 +112,4 @@ query:
alarm:
default:
telemetry:
none:
\ No newline at end of file
none:
......@@ -67,11 +67,14 @@ storage:
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
# h2:
# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
# user: ${SW_STORAGE_H2_USER:sa}
# metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
# mysql:
# metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
receiver-sharing-server:
default:
receiver-register:
......
......@@ -42,6 +42,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int concurrentRequests = 2;
private String user;
private String password;
private int metadataQueryMaxSize = 5000;
public String getUser() {
return user;
......@@ -154,4 +155,12 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
public void setConcurrentRequests(int concurrentRequests) {
this.concurrentRequests = concurrentRequests == 0 ? 2 : concurrentRequests;
}
public int getMetadataQueryMaxSize() {
return metadataQueryMaxSize;
}
public void setMetadataQueryMaxSize(int metadataQueryMaxSize) {
this.metadataQueryMaxSize = metadataQueryMaxSize;
}
}
......@@ -21,16 +21,50 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.IOException;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInventoryCacheEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockDAOImpl;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AlarmQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.LogQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......@@ -82,7 +116,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
......
......@@ -19,25 +19,42 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import com.google.common.base.Strings;
import com.google.gson.*;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.query.entity.Attribute;
import org.apache.skywalking.oap.server.core.query.entity.Database;
import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
import org.apache.skywalking.oap.server.core.query.entity.Language;
import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.*;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.HOST_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.IPV4S;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.LANGUAGE;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.OS_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.PROCESS_NO;
/**
* @author peng-yongsheng
......@@ -45,8 +62,11 @@ import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInve
public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
private static final Gson GSON = new Gson();
public MetadataQueryEsDAO(ElasticSearchClient client) {
private final int queryMaxSize;
public MetadataQueryEsDAO(ElasticSearchClient client, int queryMaxSize) {
super(client);
this.queryMaxSize = queryMaxSize;
}
@Override public int numOfService(long startTimestamp, long endTimestamp) throws IOException {
......@@ -100,7 +120,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
......@@ -115,7 +135,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.NODE_TYPE, NodeType.Database.value()));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
......@@ -123,9 +143,9 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
for (SearchHit searchHit : response.getHits()) {
Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
Database database = new Database();
database.setId(((Number) sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
database.setName((String) sourceAsMap.get(ServiceInventory.NAME));
String propertiesString = (String) sourceAsMap.get(ServiceInstanceInventory.PROPERTIES);
database.setId(((Number)sourceAsMap.get(ServiceInventory.SEQUENCE)).intValue());
database.setName((String)sourceAsMap.get(ServiceInventory.NAME));
String propertiesString = (String)sourceAsMap.get(ServiceInstanceInventory.PROPERTIES);
if (!Strings.isNullOrEmpty(propertiesString)) {
JsonObject properties = GSON.fromJson(propertiesString, JsonObject.class);
if (properties.has(ServiceInventory.PropertyUtil.DATABASE)) {
......@@ -153,7 +173,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
}
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
return buildServices(response);
......@@ -214,7 +234,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInstanceInventory.SERVICE_ID, serviceId));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(100);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInstanceInventory.MODEL_NAME, sourceBuilder);
......
......@@ -50,4 +50,4 @@
<!--</dependency>-->
</dependencies>
</project>
\ No newline at end of file
</project>
......@@ -32,4 +32,5 @@ public class H2StorageConfig extends ModuleConfig {
private String url = "jdbc:h2:mem:collector";
private String user = "";
private String password = "";
private int metadataQueryMaxSize = 5000;
}
......@@ -20,13 +20,50 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2LogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* H2 Storage provider is for demonstration and preview only. I will find that haven't implemented several interfaces,
......@@ -82,7 +119,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricQueryDAO.class, new H2MetricQueryDAO(h2Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
......
......@@ -19,20 +19,36 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import com.google.common.base.Strings;
import com.google.gson.*;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.Attribute;
import org.apache.skywalking.oap.server.core.query.entity.Database;
import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
import org.apache.skywalking.oap.server.core.query.entity.Language;
import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.*;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.HOST_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.IPV4S;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.LANGUAGE;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.OS_NAME;
import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.PROCESS_NO;
/**
* @author wusheng
......@@ -41,9 +57,11 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
private static final Gson GSON = new Gson();
private JDBCHikariCPClient h2Client;
private int metadataQueryMaxSize;
public H2MetadataQueryDAO(JDBCHikariCPClient h2Client) {
public H2MetadataQueryDAO(JDBCHikariCPClient h2Client, int metadataQueryMaxSize) {
this.h2Client = h2Client;
this.metadataQueryMaxSize = metadataQueryMaxSize;
}
@Override
......@@ -88,7 +106,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
@Override
public int numOfConjectural(long startTimestamp, long endTimestamp,
int nodeTypeValue) throws IOException {
int nodeTypeValue) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(ServiceInventory.MODEL_NAME).append(" where ");
......@@ -113,7 +131,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=? limit 100");
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=? limit ").append(metadataQueryMaxSize);
condition.add(BooleanUtils.FALSE);
try (Connection connection = h2Client.getConnection()) {
......@@ -130,7 +148,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(1);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append(ServiceInventory.NODE_TYPE).append("=? limit 100");
sql.append(ServiceInventory.NODE_TYPE).append("=? limit ").append(metadataQueryMaxSize);
condition.add(NodeType.Database.value());
try (Connection connection = h2Client.getConnection()) {
......@@ -160,7 +178,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
@Override
public List<Service> searchServices(long startTimestamp, long endTimestamp,
String keyword) throws IOException {
String keyword) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
......@@ -170,7 +188,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
if (!Strings.isNullOrEmpty(keyword)) {
sql.append(" and ").append(ServiceInventory.NAME).append(" like \"%").append(keyword).append("%\"");
}
sql.append(" limit 100");
sql.append(" limit ").append(metadataQueryMaxSize);
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
......@@ -210,7 +228,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
@Override
public List<Endpoint> searchEndpoint(String keyword, String serviceId,
int limit) throws IOException {
int limit) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(EndpointInventory.MODEL_NAME).append(" where ");
......@@ -242,7 +260,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
@Override
public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp,
String serviceId) throws IOException {
String serviceId) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInstanceInventory.MODEL_NAME).append(" where ");
......@@ -297,13 +315,13 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
}
private void setTimeRangeCondition(StringBuilder sql, List<Object> conditions, long startTimestamp,
long endTimestamp) {
long endTimestamp) {
sql.append(" ( (").append(RegisterSource.HEARTBEAT_TIME).append(" >= ? and ")
.append(RegisterSource.REGISTER_TIME).append(" <= ? )");
.append(RegisterSource.REGISTER_TIME).append(" <= ? )");
conditions.add(endTimestamp);
conditions.add(endTimestamp);
sql.append(" or (").append(RegisterSource.REGISTER_TIME).append(" <= ? and ")
.append(RegisterSource.HEARTBEAT_TIME).append(" >= ? ) ) ");
.append(RegisterSource.HEARTBEAT_TIME).append(" >= ? ) ) ");
conditions.add(endTimestamp);
conditions.add(startTimestamp);
}
......
......@@ -21,15 +21,48 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.io.IOException;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.*;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MySQL storage provider should be secondary choice for production usage as SkyWalking storage solution. It enhanced
......@@ -88,7 +121,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(mysqlClient));
this.registerServiceImplementation(IMetricQueryDAO.class, new H2MetricQueryDAO(mysqlClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new MySQLTraceQueryDAO(mysqlClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册