提交 dd595806 编写于 作者: P peng-yongsheng

Fixed the bugs from the change of cache module created and segment standardizing.

上级 cdd9e88a
......@@ -62,7 +62,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
if (spanDecorator.getOperationNameId() == 0) {
segmentCost.setServiceName(spanDecorator.getOperationName());
} else {
segmentCost.setServiceName(ServiceNameCache.get(spanDecorator.getOperationNameId()));
segmentCost.setServiceName(ServiceNameCache.getSplitServiceName(ServiceNameCache.get(spanDecorator.getOperationNameId())));
}
segmentCosts.add(segmentCost);
......
......@@ -18,10 +18,8 @@
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
......@@ -38,14 +36,16 @@ import org.slf4j.LoggerFactory;
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public H2SqlEntity prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
H2SqlEntity entity = new H2SqlEntity();
source.put("id", data.getDataString(0));
source.put(SegmentTable.COLUMN_DATA_BINARY, Base64.getEncoder().encode(data.getDataBytes(0)));
source.put(SegmentTable.COLUMN_DATA_BINARY, data.getDataBytes(0));
logger.debug("segment source: {}", source.toString());
String sql = SqlBuilder.buildBatchInsertSql(SegmentTable.TABLE, source.keySet());
......@@ -53,6 +53,7 @@ public class SegmentH2DAO extends H2DAO implements ISegmentDAO, IPersistenceDAO<
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public H2SqlEntity prepareBatchUpdate(Data data) {
return null;
}
......
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agentstream.worker.segment.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.ServiceNameCache;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
......@@ -45,6 +46,7 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
private boolean hasReference = false;
private int applicationId;
private int entryServiceId;
private String entryServiceName;
private boolean hasEntry = false;
@Override
......@@ -52,6 +54,7 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
String segmentId) {
this.applicationId = applicationId;
this.entryServiceId = spanDecorator.getOperationNameId();
this.entryServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(entryServiceId));
this.hasEntry = true;
}
......@@ -74,6 +77,7 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
serviceEntry.setId(applicationId + Const.ID_SPLIT + entryServiceId);
serviceEntry.setApplicationId(applicationId);
serviceEntry.setEntryServiceId(entryServiceId);
serviceEntry.setEntryServiceName(entryServiceName);
serviceEntry.setRegisterTime(timeBucket);
serviceEntry.setNewestTime(timeBucket);
......
......@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.stream.Data;
......@@ -42,10 +41,11 @@ import org.slf4j.LoggerFactory;
*/
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity> {
private final Logger logger = LoggerFactory.getLogger(ServiceEntryH2DAO.class);
private static final String GET_SERIVCE_ENTRY_SQL = "select * from {0} where {1} = ?";
private static final String GET_SERVICE_ENTRY_SQL = "select * from {0} where {1} = ?";
@Override public Data get(String id, DataDefine dataDefine) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(ServiceEntryTable.TABLE, "id");
String sql = SqlBuilder.buildSql(GET_SERVICE_ENTRY_SQL, ServiceEntryTable.TABLE, "id");
Object[] params = new Object[] {id};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
......@@ -62,6 +62,7 @@ public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersi
}
return null;
}
@Override public H2SqlEntity prepareBatchInsert(Data data) {
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
......@@ -76,6 +77,7 @@ public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersi
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public H2SqlEntity prepareBatchUpdate(Data data) {
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
......
......@@ -22,16 +22,21 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.skywalking.apm.collector.agentregister.worker.application.dao.ApplicationEsDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.InstanceEsDAO;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.ServiceNameEsDAO;
import org.skywalking.apm.collector.agentregister.worker.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.agentregister.worker.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.agentregister.worker.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.agentstream.HttpClientTools;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchModuleDefine;
import org.skywalking.apm.collector.storage.h2.StorageH2ModuleDefine;
/**
* @author pengys5
......@@ -39,41 +44,47 @@ import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefin
public class SegmentPost {
public static void main(String[] args) throws IOException, InterruptedException, CollectorException {
ElasticSearchClient client = new ElasticSearchClient("CollectorDBCluster", true, "127.0.0.1:9300");
client.initialize();
long now = TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis());
SystemConfig.DATA_PATH = "/Users/pengys5/code/sky-walking/sky-walking/apm-collector/data";
ElasticSearchClient elasticSearchClient = new ElasticSearchClient("CollectorDBCluster", true, "127.0.0.1:9300");
elasticSearchClient.initialize();
StorageElasticSearchModuleDefine storageElasticSearchModuleDefine = new StorageElasticSearchModuleDefine();
storageElasticSearchModuleDefine.injectClientIntoDAO(elasticSearchClient);
InstanceEsDAO instanceEsDAO = new InstanceEsDAO();
instanceEsDAO.setClient(client);
H2Client h2Client = new H2Client("jdbc:h2:tcp://localhost/~/test", "sa", "");
h2Client.initialize();
StorageH2ModuleDefine storageH2ModuleDefine = new StorageH2ModuleDefine();
storageH2ModuleDefine.injectClientIntoDAO(h2Client);
long now = TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis());
IInstanceDAO instanceDAO = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance consumerInstance = new InstanceDataDefine.Instance("2", 2, "dubbox-consumer", now, 2, now, osInfo("consumer").toString());
instanceEsDAO.save(consumerInstance);
instanceDAO.save(consumerInstance);
InstanceDataDefine.Instance providerInstance = new InstanceDataDefine.Instance("3", 3, "dubbox-provider", now, 3, now, osInfo("provider").toString());
instanceEsDAO.save(providerInstance);
instanceDAO.save(providerInstance);
ApplicationEsDAO applicationEsDAO = new ApplicationEsDAO();
applicationEsDAO.setClient(client);
IApplicationDAO applicationDAO = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
ApplicationDataDefine.Application userApplication = new ApplicationDataDefine.Application("1", "User", 1);
applicationEsDAO.save(userApplication);
applicationDAO.save(userApplication);
ApplicationDataDefine.Application consumerApplication = new ApplicationDataDefine.Application("2", "dubbox-consumer", 2);
applicationEsDAO.save(consumerApplication);
applicationDAO.save(consumerApplication);
ApplicationDataDefine.Application providerApplication = new ApplicationDataDefine.Application("3", "dubbox-provider", 3);
applicationEsDAO.save(providerApplication);
// ApplicationDataDefine.Application peer = new ApplicationDataDefine.Application("4", "172.25.0.4:20880", 4);
// applicationEsDAO.save(peer);
applicationDAO.save(providerApplication);
ApplicationDataDefine.Application peer = new ApplicationDataDefine.Application("4", "172.25.0.4:20880", 4);
applicationDAO.save(peer);
ServiceNameEsDAO serviceNameEsDAO = new ServiceNameEsDAO();
serviceNameEsDAO.setClient(client);
IServiceNameDAO serviceNameDAO = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
ServiceNameDataDefine.ServiceName serviceName1 = new ServiceNameDataDefine.ServiceName("1", "", 0, 1);
serviceNameEsDAO.save(serviceName1);
serviceNameDAO.save(serviceName1);
ServiceNameDataDefine.ServiceName serviceName2 = new ServiceNameDataDefine.ServiceName("2", "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()", 2, 2);
serviceNameEsDAO.save(serviceName2);
serviceNameDAO.save(serviceName2);
ServiceNameDataDefine.ServiceName serviceName3 = new ServiceNameDataDefine.ServiceName("3", "/dubbox-case/case/dubbox-rest", 2, 3);
serviceNameEsDAO.save(serviceName3);
serviceNameDAO.save(serviceName3);
ServiceNameDataDefine.ServiceName serviceName4 = new ServiceNameDataDefine.ServiceName("4", "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()", 3, 4);
serviceNameEsDAO.save(serviceName4);
serviceNameDAO.save(serviceName4);
while (true) {
JsonElement consumer = JsonFileReader.INSTANCE.read("json/segment/normal/dubbox-consumer.json");
......@@ -124,4 +135,8 @@ public class SegmentPost {
return osInfoJson;
}
private void newDao() {
}
}
......@@ -16,8 +16,8 @@ agent_stream:
port: 12800
context_path: /
config:
buffer_offset_max_file_size: 1k
buffer_segment_max_file_size: 10k
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
ui:
jetty:
host: localhost
......
......@@ -56,4 +56,17 @@ public class ServiceNameCache {
return serviceName;
}
public static String getSplitServiceName(String serviceName) {
if (StringUtils.isNotEmpty(serviceName)) {
String[] serviceNames = serviceName.split(Const.ID_SPLIT);
if (serviceNames.length == 2) {
return serviceNames[1];
} else {
return Const.EMPTY_STRING;
}
} else {
return Const.EMPTY_STRING;
}
}
}
......@@ -51,8 +51,7 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int applicationId = (int)searchHit.getSource().get(ApplicationTable.COLUMN_APPLICATION_ID);
return applicationId;
return (int)searchHit.getSource().get(ApplicationTable.COLUMN_APPLICATION_ID);
}
return 0;
}
......@@ -66,6 +65,6 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
if (getResponse.isExists()) {
return (String)getResponse.getSource().get(ApplicationTable.COLUMN_APPLICATION_CODE);
}
return Const.UNKNOWN;
return Const.EMPTY_STRING;
}
}
......@@ -57,6 +57,6 @@ public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return Const.UNKNOWN;
return Const.EMPTY_STRING;
}
}
......@@ -44,7 +44,7 @@ public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
int applicationId = ((Number)getResponse.getSource().get(ServiceNameTable.COLUMN_APPLICATION_ID)).intValue();
return applicationId + Const.ID_SPLIT + serviceName;
}
return Const.UNKNOWN;
return Const.EMPTY_STRING;
}
@Override public int getServiceId(int applicationId, String serviceName) {
......@@ -61,8 +61,7 @@ public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
SearchResponse searchResponse = searchRequestBuilder.get();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int serviceId = (int)searchHit.getSource().get(ServiceNameTable.COLUMN_SERVICE_ID);
return serviceId;
return (int)searchHit.getSource().get(ServiceNameTable.COLUMN_SERVICE_ID);
}
return 0;
}
......
......@@ -33,23 +33,27 @@ import org.slf4j.LoggerFactory;
* @author pengys5, clevertension
*/
public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameH2DAO.class);
private static final String GET_SERVICE_NAME_SQL = "select {0} from {1} where {2} = ?";
private static final String GET_SERVICE_NAME_SQL = "select {0},{1} from {2} where {3} = ?";
private static final String GET_SERVICE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ? limit 1";
@Override public String getServiceName(int serviceId) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SERVICE_NAME_SQL, ServiceNameTable.COLUMN_SERVICE_NAME,
String sql = SqlBuilder.buildSql(GET_SERVICE_NAME_SQL, ServiceNameTable.COLUMN_APPLICATION_ID, ServiceNameTable.COLUMN_SERVICE_NAME,
ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
Object[] params = new Object[] {serviceId};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getString(ServiceNameTable.COLUMN_SERVICE_NAME);
String serviceName = rs.getString(ServiceNameTable.COLUMN_SERVICE_NAME);
int applicationId = rs.getInt(ServiceNameTable.COLUMN_APPLICATION_ID);
return applicationId + Const.ID_SPLIT + serviceName;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return Const.UNKNOWN;
return Const.EMPTY_STRING;
}
@Override public int getServiceId(int applicationId, String serviceName) {
......@@ -59,8 +63,7 @@ public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
Object[] params = new Object[] {applicationId, serviceName};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
int serviceId = rs.getInt(ServiceNameTable.COLUMN_SERVICE_ID);
return serviceId;
return rs.getInt(ServiceNameTable.COLUMN_SERVICE_ID);
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
......
......@@ -19,14 +19,13 @@
package org.skywalking.apm.collector.client.h2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.jdbcx.JdbcConnectionPool;
import org.h2.util.IOUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,32 +36,34 @@ public class H2Client implements Client {
private final Logger logger = LoggerFactory.getLogger(H2Client.class);
private JdbcConnectionPool cp;
private Connection conn;
private String url;
private String userName;
private String password;
public H2Client() {
this.url = "jdbc:h2:" + SystemConfig.DATA_PATH + "/h2";
this.url = "jdbc:h2:mem:collector";
this.userName = "";
this.password = "";
}
public H2Client(String url, String userName, String password) {
this.url = url;
this.userName = userName;
this.password = password;
}
@Override public void initialize() throws H2ClientException {
try {
cp = JdbcConnectionPool.
create(this.url, this.userName, this.password);
conn = cp.getConnection();
Class.forName("org.h2.Driver");
conn = DriverManager.
getConnection(this.url, this.userName, this.password);
} catch (Exception e) {
throw new H2ClientException(e.getMessage(), e);
}
}
@Override public void shutdown() {
if (cp != null) {
cp.dispose();
}
IOUtils.closeSilently(conn);
}
......
......@@ -38,9 +38,9 @@ public class ServiceEntryDataDefine extends DataDefine {
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceEntryTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(ServiceEntryTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new NonOperation()));
addAttribute(5, new Attribute(ServiceEntryTable.COLUMN_NEWEST_TIME, AttributeType.LONG, new CoverOperation()));
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2;
/**
* @author clevertension
*/
public class StorageH2Config {
public static String URL;
public static String USER_NAME;
public static String PASSWORD;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class StorageH2ConfigParser implements ModuleConfigParser {
private static final String URL = "url";
public static final String USER_NAME = "user_name";
public static final String PASSWORD = "password";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(URL))) {
StorageH2Config.URL = (String)config.get(URL);
} else {
StorageH2Config.URL = "jdbc:h2:" + SystemConfig.DATA_PATH + "/h2";
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(USER_NAME))) {
StorageH2Config.USER_NAME = (String)config.get(USER_NAME);
} else {
StorageH2Config.USER_NAME = "sa";
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(PASSWORD))) {
StorageH2Config.PASSWORD = (String)config.get(PASSWORD);
}
}
}
\ No newline at end of file
......@@ -51,11 +51,11 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
}
@Override protected ModuleConfigParser configParser() {
return null;
return new StorageH2ConfigParser();
}
@Override protected Client createClient() {
return new H2Client();
return new H2Client(StorageH2Config.URL, StorageH2Config.USER_NAME, StorageH2Config.PASSWORD);
}
@Override public StorageInstaller storageInstaller() {
......
......@@ -66,7 +66,7 @@ public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
while (rs.next()) {
String segmentId = rs.getString(GlobalTraceTable.COLUMN_SEGMENT_ID);
logger.debug("segmentId: {}, global trace id: {}", segmentId, globalTraceId);
segmentIds.add(globalTraceId);
segmentIds.add(segmentId);
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
......
......@@ -26,6 +26,4 @@ import java.util.Map;
*/
public interface IServiceReferenceDAO {
Map<String, JsonObject> load(int entryServiceId, long startTime, long endTime);
Map<String, JsonObject> load(String entryServiceName, int entryApplicationId, long startTime, long endTime);
}
......@@ -18,13 +18,11 @@
package org.skywalking.apm.collector.ui.dao;
import com.google.protobuf.InvalidProtocolBufferException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Base64;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.segment.SegmentTable;
import org.skywalking.apm.collector.storage.h2.SqlBuilder;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......@@ -32,29 +30,25 @@ import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5, clevertension
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentH2DAO.class);
private static final String GET_SEGMENT_SQL = "select {0} from {1} where {2} = ?";
@Override public TraceSegmentObject load(String segmentId) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SEGMENT_SQL, SegmentTable.COLUMN_DATA_BINARY,
SegmentTable.TABLE, "id");
Object[] params = new Object[]{segmentId};
SegmentTable.TABLE, "id");
Object[] params = new Object[] {segmentId};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
String dataBinaryBase64 = rs.getString(SegmentTable.COLUMN_DATA_BINARY);
if (StringUtils.isNotEmpty(dataBinaryBase64)) {
byte[] dataBinary = Base64.getDecoder().decode(dataBinaryBase64);
try {
return TraceSegmentObject.parseFrom(dataBinary);
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
byte[] dataBinary = rs.getBytes(SegmentTable.COLUMN_DATA_BINARY);
try {
return TraceSegmentObject.parseFrom(dataBinary);
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
}
} catch (SQLException | H2ClientException e) {
......
......@@ -76,7 +76,7 @@ public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO {
while (rs.next()) {
int appId = rs.getInt(ServiceEntryTable.COLUMN_APPLICATION_ID);
int entryServiceId = rs.getInt(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID);
String applicationCode = ApplicationCache.get(applicationId);
String applicationCode = ApplicationCache.get(appId);
String entryServiceName1 = rs.getString(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME);
JsonObject row = new JsonObject();
......
......@@ -29,11 +29,9 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.cache.ServiceIdCache;
import org.skywalking.apm.collector.cache.ServiceNameCache;
import org.skywalking.apm.collector.core.util.ColumnNameUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......@@ -46,30 +44,6 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsDAO.class);
@Override
public Map<String, JsonObject> load(String entryServiceName, int entryApplicationId, long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceReferenceTable.TABLE);
searchRequestBuilder.setTypes(ServiceReferenceTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
int entryServiceId = ServiceIdCache.get(entryApplicationId, entryServiceName);
BoolQueryBuilder entryBoolQuery = QueryBuilders.boolQuery();
if (entryServiceId != 0) {
entryBoolQuery.should().add(QueryBuilders.matchQuery(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, entryServiceId));
}
entryBoolQuery.should().add(QueryBuilders.matchQuery(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, entryApplicationId + Const.ID_SPLIT + entryServiceName));
boolQuery.must(entryBoolQuery);
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
return load(searchRequestBuilder);
}
@Override public Map<String, JsonObject> load(int entryServiceId, long startTime, long endTime) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceReferenceTable.TABLE);
searchRequestBuilder.setTypes(ServiceReferenceTable.TABLE_TYPE);
......@@ -79,7 +53,7 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
String entryServiceName = ServiceNameCache.get(entryServiceId);
String entryServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(entryServiceId));
BoolQueryBuilder entryBoolQuery = QueryBuilders.boolQuery();
entryBoolQuery.should().add(QueryBuilders.matchQuery(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, entryServiceId));
entryBoolQuery.should().add(QueryBuilders.matchQuery(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, entryServiceName));
......@@ -131,14 +105,8 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
Sum summary = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_SUMMARY);
Sum costSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_COST_SUMMARY);
String frontServiceName = ServiceNameCache.get(frontServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
frontServiceName = frontServiceName.split(Const.ID_SPLIT)[1];
}
String behindServiceName = ServiceNameCache.get(behindServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
behindServiceName = behindServiceName.split(Const.ID_SPLIT)[1];
}
String frontServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(frontServiceId));
String behindServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(behindServiceId));
JsonObject serviceReference = new JsonObject();
serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID), frontServiceId);
......
......@@ -23,13 +23,11 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.cache.ServiceIdCache;
import org.skywalking.apm.collector.cache.ServiceNameCache;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.util.ColumnNameUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.h2.SqlBuilder;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......@@ -42,50 +40,18 @@ import org.slf4j.LoggerFactory;
public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2DAO.class);
private static final String GET_SRV_REF_LOAD1 = "select {4}, {5}, {6}, {7}, sum({8}) as cnt1, sum({9}) as cnt2, sum({10}) as cnt3" +
",sum({11}) as cnt4, sum({12}) cnt5, sum({13}) as cnt6, sum({14}) as cnt7 from {0} where {1} >= ? and {1} <= ? and {2} = ? and {3} = ? group by {4}, {5}, {6}, {7}";
private static final String GET_SRV_REF_LOAD2 = "select {3}, {4}, {5}, {6}, sum({7}) as cnt1, sum({8}) as cnt2, sum({9}) as cnt3" +
",sum({10}) as cnt4, sum({11}) cnt5, sum({12}) as cnt6, sum({13}) as cnt7 from {0} where {1} >= ? and {1} <= ? and {2} = ? group by {3}, {4}, {5}, {6}";
@Override
public Map<String, JsonObject> load(String entryServiceName, int entryApplicationId, long startTime, long endTime) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SRV_REF_LOAD2, ServiceReferenceTable.TABLE,
ServiceReferenceTable.COLUMN_TIME_BUCKET, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME,
ServiceReferenceTable.COLUMN_S1_LTE, ServiceReferenceTable.COLUMN_S3_LTE, ServiceReferenceTable.COLUMN_S5_LTE,
ServiceReferenceTable.COLUMN_S5_GT, ServiceReferenceTable.COLUMN_ERROR, ServiceReferenceTable.COLUMN_SUMMARY,
ServiceReferenceTable.COLUMN_COST_SUMMARY);
Object[] params = new Object[] {startTime, endTime, entryServiceName};
entryServiceName = entryApplicationId + Const.ID_SPLIT + entryServiceName;
int entryServiceId = ServiceIdCache.get(entryApplicationId, entryServiceName);
if (entryServiceId != 0) {
sql = SqlBuilder.buildSql(GET_SRV_REF_LOAD1, ServiceReferenceTable.TABLE,
ServiceReferenceTable.COLUMN_TIME_BUCKET, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME,
ServiceReferenceTable.COLUMN_S1_LTE, ServiceReferenceTable.COLUMN_S3_LTE, ServiceReferenceTable.COLUMN_S5_LTE,
ServiceReferenceTable.COLUMN_S5_GT, ServiceReferenceTable.COLUMN_ERROR, ServiceReferenceTable.COLUMN_SUMMARY,
ServiceReferenceTable.COLUMN_COST_SUMMARY);
params = new Object[] {startTime, endTime, entryServiceId, entryServiceName};
}
return load(client, params, sql);
}
private static final String GET_SRV_REF_LOAD1 = "select {3}, {4}, sum({5}) as {5}, sum({6}) as {6}, sum({7}) as {7}" +
",sum({8}) as {8}, sum({9}) as {9}, sum({10}) as {10}, sum({11}) as {11} from {0} where {1} >= ? and {1} <= ? and {2} = ? group by {3}, {4}";
@Override public Map<String, JsonObject> load(int entryServiceId, long startTime, long endTime) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SRV_REF_LOAD1, ServiceReferenceTable.TABLE,
ServiceReferenceTable.COLUMN_TIME_BUCKET, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME,
ServiceReferenceTable.COLUMN_TIME_BUCKET, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID,
ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME,
ServiceReferenceTable.COLUMN_S1_LTE, ServiceReferenceTable.COLUMN_S3_LTE, ServiceReferenceTable.COLUMN_S5_LTE,
ServiceReferenceTable.COLUMN_S5_GT, ServiceReferenceTable.COLUMN_ERROR, ServiceReferenceTable.COLUMN_SUMMARY,
ServiceReferenceTable.COLUMN_COST_SUMMARY);
String entryServiceName = ServiceNameCache.get(entryServiceId);
Object[] params = new Object[] {startTime, endTime, entryServiceId, entryServiceName};
Object[] params = new Object[] {startTime, endTime, entryServiceId};
return load(client, params, sql);
}
......@@ -109,22 +75,16 @@ public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO
try {
int behindServiceId = rs.getInt(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID);
if (behindServiceId != 0) {
long s1LteSum = rs.getLong("cnt1");
long s3LteSum = rs.getLong("cnt2");
long s5LteSum = rs.getLong("cnt3");
long s5GtSum = rs.getLong("cnt3");
long error = rs.getLong("cnt3");
long summary = rs.getLong("cnt3");
long costSum = rs.getLong("cnt3");
long s1LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S1_LTE);
long s3LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S3_LTE);
long s5LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S5_LTE);
long s5GtSum = rs.getLong(ServiceReferenceTable.COLUMN_S5_GT);
long error = rs.getLong(ServiceReferenceTable.COLUMN_ERROR);
long summary = rs.getLong(ServiceReferenceTable.COLUMN_SUMMARY);
long costSum = rs.getLong(ServiceReferenceTable.COLUMN_COST_SUMMARY);
String frontServiceName = ServiceNameCache.get(frontServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
frontServiceName = frontServiceName.split(Const.ID_SPLIT)[1];
}
String behindServiceName = ServiceNameCache.get(behindServiceId);
if (StringUtils.isNotEmpty(frontServiceName)) {
behindServiceName = behindServiceName.split(Const.ID_SPLIT)[1];
}
String frontServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(frontServiceId));
String behindServiceName = ServiceNameCache.getSplitServiceName(ServiceNameCache.get(behindServiceId));
JsonObject serviceReference = new JsonObject();
serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID), frontServiceId);
......
......@@ -39,7 +39,6 @@ import org.skywalking.apm.collector.ui.jetty.handler.instancemetric.InstanceMetr
import org.skywalking.apm.collector.ui.jetty.handler.instancemetric.InstanceOsInfoGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.EntryServiceGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.ServiceTreeGetByIdHandler;
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.ServiceTreeGetByNameHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.AllInstanceLastTimeGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.OneInstanceLastTimeGetHandler;
......@@ -89,7 +88,6 @@ public class UIJettyModuleDefine extends UIModuleDefine {
handlers.add(new InstanceMetricGetRangeTimeBucketHandler());
handlers.add(new EntryServiceGetHandler());
handlers.add(new ServiceTreeGetByIdHandler());
handlers.add(new ServiceTreeGetByNameHandler());
return handlers;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.ui.jetty.handler.servicetree;
import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.service.ServiceTreeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceTreeGetByNameHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceTreeGetByNameHandler.class);
@Override public String pathSpec() {
return "/service/tree/entryServiceName";
}
private ServiceTreeService service = new ServiceTreeService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
if (!req.getParameterMap().containsKey("entryServiceName") || !req.getParameterMap().containsKey("entryApplicationId") || !req.getParameterMap().containsKey("startTime") || !req.getParameterMap().containsKey("endTime")) {
throw new ArgumentsParseException("must contains parameters: entryServiceName, entryApplicationId, startTime, endTime");
}
String entryServiceName = req.getParameter("entryServiceName");
String entryApplicationIdStr = req.getParameter("entryApplicationId");
String startTimeStr = req.getParameter("startTime");
String endTimeStr = req.getParameter("endTime");
logger.debug("service entry get entryServiceName: {}, startTime: {}, endTime: {}", entryServiceName, startTimeStr, endTimeStr);
int entryApplicationId;
try {
entryApplicationId = Integer.parseInt(entryApplicationIdStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("entry application id must be integer");
}
long startTime;
try {
startTime = Long.parseLong(startTimeStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("start time must be long");
}
long endTime;
try {
endTime = Long.parseLong(endTimeStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("end time must be long");
}
return service.loadServiceTree(entryServiceName, entryApplicationId, startTime, endTime);
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
......@@ -47,12 +47,6 @@ public class ServiceTreeService {
return buildTreeData(serviceReferenceMap);
}
public JsonArray loadServiceTree(String entryServiceName, int entryApplicationId, long startTime, long endTime) {
IServiceReferenceDAO serviceReferenceDAO = (IServiceReferenceDAO)DAOContainer.INSTANCE.get(IServiceReferenceDAO.class.getName());
Map<String, JsonObject> serviceReferenceMap = serviceReferenceDAO.load(entryServiceName, entryApplicationId, startTime, endTime);
return buildTreeData(serviceReferenceMap);
}
private JsonArray buildTreeData(Map<String, JsonObject> serviceReferenceMap) {
JsonArray serviceReferenceArray = new JsonArray();
JsonObject rootServiceReference = findRoot(serviceReferenceMap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册