未验证 提交 4fa19f85 编写于 作者: 李运涛 liyuntao 提交者: GitHub

Merge branch 'master' into servicecomb

......@@ -24,7 +24,9 @@ import org.apache.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface INetworkAddressIDService extends Service {
int create(String networkAddress, int spanLayer);
int getOrCreate(String networkAddress);
int get(String networkAddress);
void update(int addressId, int spanLayer, int serverType);
}
......@@ -41,7 +41,7 @@ public class NetworkAddressRegisterSerialWorker extends AbstractLocalAsyncWorker
private final INetworkAddressRegisterDAO networkAddressRegisterDAO;
private final NetworkAddressCacheService networkAddressCacheService;
public NetworkAddressRegisterSerialWorker(ModuleManager moduleManager) {
NetworkAddressRegisterSerialWorker(ModuleManager moduleManager) {
super(moduleManager);
this.networkAddressRegisterDAO = getModuleManager().find(StorageModule.NAME).getService(INetworkAddressRegisterDAO.class);
this.networkAddressCacheService = getModuleManager().find(CacheModule.NAME).getService(NetworkAddressCacheService.class);
......@@ -53,28 +53,32 @@ public class NetworkAddressRegisterSerialWorker extends AbstractLocalAsyncWorker
@Override protected void onWork(NetworkAddress networkAddress) throws WorkerException {
logger.debug("register network address, address: {}", networkAddress.getNetworkAddress());
int addressId = networkAddressCacheService.getAddressId(networkAddress.getNetworkAddress());
if (networkAddress.getAddressId() == 0) {
int addressId = networkAddressCacheService.getAddressId(networkAddress.getNetworkAddress());
if (addressId == 0) {
NetworkAddress newNetworkAddress;
int min = networkAddressRegisterDAO.getMinNetworkAddressId();
if (min == 0) {
newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId("-1");
newNetworkAddress.setAddressId(-1);
newNetworkAddress.setSpanLayer(networkAddress.getSpanLayer());
newNetworkAddress.setNetworkAddress(networkAddress.getNetworkAddress());
} else {
int max = networkAddressRegisterDAO.getMaxNetworkAddressId();
addressId = IdAutoIncrement.INSTANCE.increment(min, max);
if (addressId == 0) {
NetworkAddress newNetworkAddress;
int min = networkAddressRegisterDAO.getMinNetworkAddressId();
if (min == 0) {
newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId("-1");
newNetworkAddress.setAddressId(-1);
newNetworkAddress.setSpanLayer(networkAddress.getSpanLayer());
newNetworkAddress.setNetworkAddress(networkAddress.getNetworkAddress());
} else {
int max = networkAddressRegisterDAO.getMaxNetworkAddressId();
addressId = IdAutoIncrement.INSTANCE.increment(min, max);
newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId(String.valueOf(addressId));
newNetworkAddress.setAddressId(addressId);
newNetworkAddress.setSpanLayer(networkAddress.getSpanLayer());
newNetworkAddress.setNetworkAddress(networkAddress.getNetworkAddress());
newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId(String.valueOf(addressId));
newNetworkAddress.setAddressId(addressId);
newNetworkAddress.setSpanLayer(networkAddress.getSpanLayer());
newNetworkAddress.setNetworkAddress(networkAddress.getNetworkAddress());
}
networkAddressRegisterDAO.save(newNetworkAddress);
}
networkAddressRegisterDAO.save(newNetworkAddress);
} else {
networkAddressRegisterDAO.update(networkAddress.getId(), networkAddress.getSpanLayer(), networkAddress.getServerType());
}
}
......
......@@ -72,7 +72,7 @@ public class InstanceIDService implements IInstanceIDService {
}
@Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo) {
logger.debug("get or create instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
int instanceId = getInstanceCacheService().getInstanceIdByAgentUUID(applicationId, agentUUID);
if (instanceId == 0) {
......@@ -93,7 +93,7 @@ public class InstanceIDService implements IInstanceIDService {
}
@Override public int getOrCreateByAddressId(int applicationId, int addressId, long registerTime) {
logger.debug("get or create instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
logger.debug("get or getOrCreate instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
int instanceId = getInstanceCacheService().getInstanceIdByAddressId(applicationId, addressId);
if (instanceId == 0) {
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheServ
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
......@@ -74,7 +75,7 @@ public class NetworkAddressIDService implements INetworkAddressIDService {
return this.networkAddressGraph;
}
@Override public int create(String networkAddress, int spanLayer) {
@Override public int getOrCreate(String networkAddress) {
int addressId = getNetworkAddressCacheService().getAddressId(networkAddress);
if (addressId != 0) {
......@@ -89,10 +90,11 @@ public class NetworkAddressIDService implements INetworkAddressIDService {
}
} else {
NetworkAddress newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId("0");
newNetworkAddress.setId(String.valueOf(Const.NONE));
newNetworkAddress.setNetworkAddress(networkAddress);
newNetworkAddress.setSpanLayer(spanLayer);
newNetworkAddress.setAddressId(0);
newNetworkAddress.setSpanLayer(Const.NONE);
newNetworkAddress.setServerType(Const.NONE);
newNetworkAddress.setAddressId(Const.NONE);
getNetworkAddressGraph().start(newNetworkAddress);
}
......@@ -103,4 +105,16 @@ public class NetworkAddressIDService implements INetworkAddressIDService {
@Override public int get(String networkAddress) {
return getNetworkAddressCacheService().getAddressId(networkAddress);
}
@Override public void update(int addressId, int spanLayer, int serverType) {
if (!networkAddressCacheService.compare(addressId, spanLayer, serverType)) {
NetworkAddress newNetworkAddress = new NetworkAddress();
newNetworkAddress.setId(String.valueOf(addressId));
newNetworkAddress.setSpanLayer(spanLayer);
newNetworkAddress.setServerType(serverType);
newNetworkAddress.setAddressId(addressId);
getNetworkAddressGraph().start(newNetworkAddress);
}
}
}
......@@ -81,7 +81,7 @@ public enum SegmentBufferManager {
}
private void newDataFile() throws IOException {
logger.debug("create new segment buffer file");
logger.debug("getOrCreate new segment buffer file");
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
......
......@@ -89,7 +89,8 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getNetworkAddressId() == 0 && StringUtils.isNotEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = networkAddressIDService.get(standardBuilder.getNetworkAddress());
int networkAddressId = networkAddressIDService.getOrCreate(standardBuilder.getNetworkAddress());
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network address: {} from application id: {} exchange failed", standardBuilder.getNetworkAddress(), applicationId);
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decora
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.storage.table.register.ServerTypeDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -53,7 +54,7 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
@Override public boolean exchange(SpanDecorator standardBuilder, int applicationId) {
if (standardBuilder.getPeerId() == 0 && StringUtils.isNotEmpty(standardBuilder.getPeer())) {
int peerId = networkAddressIDService.create(standardBuilder.getPeer(), standardBuilder.getSpanLayer().getNumber());
int peerId = networkAddressIDService.getOrCreate(standardBuilder.getPeer());
if (peerId == 0) {
logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
......@@ -62,6 +63,10 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
standardBuilder.toBuilder();
standardBuilder.setPeerId(peerId);
standardBuilder.setPeer(Const.EMPTY_STRING);
int spanLayer = standardBuilder.getSpanLayerValue();
int serverType = ServerTypeDefine.getInstance().getServerTypeId(standardBuilder.getComponentId());
networkAddressIDService.update(peerId, spanLayer, serverType);
}
}
......
......@@ -27,4 +27,6 @@ public interface NetworkAddressCacheService extends Service {
int getAddressId(String networkAddress);
String getAddress(int addressId);
boolean compare(int addressId, int spanLayer, int serverType);
}
......@@ -22,7 +22,7 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.guava.service.ApplicationCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.InstanceCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.NetworAddressCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.NetworkAddressCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.ServiceIdCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.ServiceNameCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
......@@ -53,7 +53,7 @@ public class CacheModuleGuavaProvider extends ModuleProvider {
this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(getManager()));
this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(getManager()));
this.registerServiceImplementation(ServiceNameCacheService.class, new ServiceNameCacheGuavaService(getManager()));
this.registerServiceImplementation(NetworkAddressCacheService.class, new NetworAddressCacheGuavaService(getManager()));
this.registerServiceImplementation(NetworkAddressCacheService.class, new NetworkAddressCacheGuavaService(getManager()));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......
......@@ -27,22 +27,23 @@ import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NetworAddressCacheGuavaService implements NetworkAddressCacheService {
public class NetworkAddressCacheGuavaService implements NetworkAddressCacheService {
private final Logger logger = LoggerFactory.getLogger(NetworAddressCacheGuavaService.class);
private final Logger logger = LoggerFactory.getLogger(NetworkAddressCacheGuavaService.class);
private final Cache<String, Integer> addressCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final ModuleManager moduleManager;
private INetworkAddressCacheDAO networkAddressCacheDAO;
public NetworAddressCacheGuavaService(ModuleManager moduleManager) {
public NetworkAddressCacheGuavaService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
......@@ -57,16 +58,17 @@ public class NetworAddressCacheGuavaService implements NetworkAddressCacheServic
int addressId = 0;
try {
addressId = addressCache.get(networkAddress, () -> getNetworkAddressCacheDAO().getAddressId(networkAddress));
if (addressId == 0) {
addressId = getNetworkAddressCacheDAO().getAddressId(networkAddress);
if (addressId != 0) {
addressCache.put(networkAddress, addressId);
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (addressId == 0) {
addressId = getNetworkAddressCacheDAO().getAddressId(networkAddress);
if (addressId != 0) {
addressCache.put(networkAddress, addressId);
}
}
return addressId;
}
......@@ -75,17 +77,33 @@ public class NetworAddressCacheGuavaService implements NetworkAddressCacheServic
public String getAddress(int addressId) {
String networkAddress = Const.EMPTY_STRING;
try {
networkAddress = idCache.get(addressId, () -> getNetworkAddressCacheDAO().getAddress(addressId));
networkAddress = idCache.get(addressId, () -> getNetworkAddressCacheDAO().getAddressById(addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (StringUtils.isEmpty(networkAddress)) {
networkAddress = getNetworkAddressCacheDAO().getAddress(addressId);
networkAddress = getNetworkAddressCacheDAO().getAddressById(addressId);
if (StringUtils.isNotEmpty(networkAddress)) {
addressCache.put(networkAddress, addressId);
idCache.put(addressId, networkAddress);
}
}
return networkAddress;
}
private final Cache<Integer, NetworkAddress> addressObjCache = CacheBuilder.newBuilder().maximumSize(5000).build();
@Override public boolean compare(int addressId, int spanLayer, int serverType) {
try {
NetworkAddress address = addressObjCache.get(addressId, () -> getNetworkAddressCacheDAO().getAddress(addressId));
if (ObjectUtils.isNotEmpty(address)) {
if (spanLayer != address.getSpanLayer() || serverType != address.getServerType()) {
return false;
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return true;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.collector.storage.dao.cache;
import org.apache.skywalking.apm.collector.storage.base.dao.DAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
/**
* @author peng-yongsheng
......@@ -26,5 +27,7 @@ import org.apache.skywalking.apm.collector.storage.base.dao.DAO;
public interface INetworkAddressCacheDAO extends DAO {
int getAddressId(String networkAddress);
String getAddress(int addressId);
String getAddressById(int addressId);
NetworkAddress getAddress(int addressId);
}
......@@ -30,4 +30,6 @@ public interface INetworkAddressRegisterDAO extends DAO {
int getMinNetworkAddressId();
void save(NetworkAddress networkAddress);
void update(String id, int spanLayer, int serverType);
}
......@@ -18,11 +18,15 @@
package org.apache.skywalking.apm.collector.storage.dao.ui;
import java.util.List;
import org.apache.skywalking.apm.collector.storage.base.dao.DAO;
import org.apache.skywalking.apm.collector.storage.ui.overview.ConjecturalApp;
/**
* @author peng-yongsheng
*/
public interface INetworkAddressUIDAO extends DAO {
int getNumOfSpanLayer(int spanLayer);
List<ConjecturalApp> getConjecturalApps();
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.table.register;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
/**
......@@ -39,7 +40,8 @@ public class NetworkAddress extends StreamData {
private static final Column[] INTEGER_COLUMNS = {
new Column(NetworkAddressTable.COLUMN_ADDRESS_ID, new NonOperation()),
new Column(NetworkAddressTable.COLUMN_SPAN_LAYER, new NonOperation()),
new Column(NetworkAddressTable.COLUMN_SPAN_LAYER, new CoverOperation()),
new Column(NetworkAddressTable.COLUMN_SERVER_TYPE, new CoverOperation()),
};
private static final Column[] BYTE_COLUMNS = {};
......@@ -87,4 +89,12 @@ public class NetworkAddress extends StreamData {
public void setSpanLayer(Integer spanLayer) {
setDataInteger(1, spanLayer);
}
public Integer getServerType() {
return getDataInteger(2);
}
public void setServerType(Integer serverType) {
setDataInteger(2, serverType);
}
}
......@@ -27,5 +27,6 @@ public class NetworkAddressTable extends CommonTable {
public static final String TABLE = "network_address";
public static final String COLUMN_NETWORK_ADDRESS = "network_address";
public static final String COLUMN_SPAN_LAYER = "span_layer";
public static final String COLUMN_SERVER_TYPE = "server_type";
public static final String COLUMN_ADDRESS_ID = "address_id";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.table.register;
/**
* @author peng-yongsheng
*/
public class ServerType {
private int componentId;
private int id;
private String name;
public ServerType(int componentId, int id, String name) {
this.componentId = componentId;
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
public int getComponentId() {
return componentId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.table.register;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* @author peng-yongsheng
*/
public class ServerTypeDefine {
private static ServerTypeDefine INSTANCE = new ServerTypeDefine();
private String[] serverTypeNames;
private ServerType[] serverTypes;
private ServerTypeDefine() {
this.serverTypes = new ServerType[28];
this.serverTypeNames = new String[11];
addServerType(new ServerType(ComponentsDefine.TOMCAT.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.HTTPCLIENT.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.DUBBO.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.H2.getId(), 1, ComponentsDefine.H2.getName()));
addServerType(new ServerType(ComponentsDefine.MYSQL.getId(), 2, ComponentsDefine.MYSQL.getName()));
addServerType(new ServerType(ComponentsDefine.ORACLE.getId(), 3, ComponentsDefine.ORACLE.getName()));
addServerType(new ServerType(ComponentsDefine.REDIS.getId(), 4, ComponentsDefine.REDIS.getName()));
addServerType(new ServerType(ComponentsDefine.MOTAN.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.MONGODB.getId(), 5, ComponentsDefine.MONGODB.getName()));
addServerType(new ServerType(ComponentsDefine.RESIN.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.FEIGN.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.OKHTTP.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.SPRING_REST_TEMPLATE.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.SPRING_MVC_ANNOTATION.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.STRUTS2.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.NUTZ_MVC_ANNOTATION.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.NUTZ_HTTP.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.JETTY_CLIENT.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.JETTY_SERVER.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.MEMCACHED.getId(), 6, ComponentsDefine.MEMCACHED.getName()));
addServerType(new ServerType(ComponentsDefine.SHARDING_JDBC.getId(), 7, ComponentsDefine.SHARDING_JDBC.getName()));
addServerType(new ServerType(ComponentsDefine.POSTGRESQL.getId(), 8, ComponentsDefine.POSTGRESQL.getName()));
addServerType(new ServerType(ComponentsDefine.GRPC.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.ELASTIC_JOB.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.ROCKET_MQ.getId(), 9, ComponentsDefine.ROCKET_MQ.getName()));
addServerType(new ServerType(ComponentsDefine.HTTP_ASYNC_CLIENT.getId(), Const.NONE, Const.EMPTY_STRING));
addServerType(new ServerType(ComponentsDefine.KAFKA.getId(), 10, ComponentsDefine.KAFKA.getName()));
}
public static ServerTypeDefine getInstance() {
return INSTANCE;
}
private void addServerType(ServerType serverType) {
serverTypeNames[serverType.getId()] = serverType.getName();
serverTypes[serverType.getComponentId()] = serverType;
}
public int getServerTypeId(int componentId) {
return serverTypes[componentId].getId();
}
public String getServerType(int serverTypeId) {
return serverTypeNames[serverTypeId];
}
}
......@@ -22,9 +22,18 @@ package org.apache.skywalking.apm.collector.storage.ui.overview;
* @author peng-yongsheng
*/
public class ConjecturalApp {
private int id;
private String name;
private int num;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.collector.storage.ui.overview;
import java.util.LinkedList;
import java.util.List;
/**
......@@ -26,6 +27,10 @@ import java.util.List;
public class ConjecturalAppBrief {
private List<ConjecturalApp> apps;
public ConjecturalAppBrief() {
this.apps = new LinkedList<>();
}
public List<ConjecturalApp> getApps() {
return apps;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.table.register;
import java.lang.reflect.Field;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.network.trace.component.OfficialComponent;
import org.junit.Test;
/**
* @author peng-yongsheng
*/
public class ServerTypeDefineTestCase {
@Test
public void check() throws IllegalAccessException {
Field[] fields = ComponentsDefine.class.getDeclaredFields();
for (Field field : fields) {
if (field.getType().equals(OfficialComponent.class)) {
OfficialComponent component = (OfficialComponent)field.get(ComponentsDefine.getInstance());
ServerTypeDefine.getInstance().getServerTypeId(component.getId());
}
}
}
}
......@@ -16,20 +16,19 @@
*
*/
package org.apache.skywalking.apm.collector.storage.es.base.define;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.ColumnDefine;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.apache.skywalking.apm.collector.storage.StorageInstaller;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.core.data.ColumnDefine;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -82,9 +81,7 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
.put("index.number_of_replicas", indexReplicasNumber)
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
.put("analysis.tokenizer.collector_tokenizer.type", "standard")
.put("analysis.tokenizer.collector_tokenizer.max_token_length", 5)
.put("analysis.analyzer.collector_analyzer.type", "stop")
.build();
}
......@@ -100,7 +97,7 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
mappingBuilder
.startObject(elasticSearchColumnDefine.getName())
.field("type", elasticSearchColumnDefine.getType().toLowerCase())
.field("fielddata", true)
.field("analyzer", "collector_analyzer")
.endObject();
} else {
mappingBuilder
......
......@@ -22,6 +22,7 @@ import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchCli
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddressTable;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
......@@ -56,12 +57,12 @@ public class NetworkAddressEsCacheDAO extends EsDAO implements INetworkAddressCa
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
return (int)searchHit.getSource().get(NetworkAddressTable.COLUMN_ADDRESS_ID);
return ((Number)searchHit.getSource().get(NetworkAddressTable.COLUMN_ADDRESS_ID)).intValue();
}
return 0;
return Const.NONE;
}
@Override public String getAddress(int addressId) {
@Override public String getAddressById(int addressId) {
logger.debug("get network address, address id: {}", addressId);
ElasticSearchClient client = getClient();
GetRequestBuilder getRequestBuilder = client.prepareGet(NetworkAddressTable.TABLE, String.valueOf(addressId));
......@@ -72,4 +73,21 @@ public class NetworkAddressEsCacheDAO extends EsDAO implements INetworkAddressCa
}
return Const.EMPTY_STRING;
}
@Override public NetworkAddress getAddress(int addressId) {
ElasticSearchClient client = getClient();
GetRequestBuilder getRequestBuilder = client.prepareGet(NetworkAddressTable.TABLE, String.valueOf(addressId));
GetResponse getResponse = getRequestBuilder.get();
if (getResponse.isExists()) {
NetworkAddress address = new NetworkAddress();
address.setId((String)getResponse.getSource().get(NetworkAddressTable.COLUMN_ID));
address.setAddressId(((Number)getResponse.getSource().get(NetworkAddressTable.COLUMN_ADDRESS_ID)).intValue());
address.setSpanLayer(((Number)getResponse.getSource().get(NetworkAddressTable.COLUMN_SPAN_LAYER)).intValue());
address.setServerType(((Number)getResponse.getSource().get(NetworkAddressTable.COLUMN_SERVER_TYPE)).intValue());
address.setNetworkAddress((String)getResponse.getSource().get(NetworkAddressTable.COLUMN_NETWORK_ADDRESS));
return address;
}
return null;
}
}
......@@ -56,8 +56,18 @@ public class NetworkAddressRegisterEsDAO extends EsDAO implements INetworkAddres
source.put(NetworkAddressTable.COLUMN_NETWORK_ADDRESS, networkAddress.getNetworkAddress());
source.put(NetworkAddressTable.COLUMN_ADDRESS_ID, networkAddress.getAddressId());
source.put(NetworkAddressTable.COLUMN_SPAN_LAYER, networkAddress.getSpanLayer());
source.put(NetworkAddressTable.COLUMN_SERVER_TYPE, networkAddress.getServerType());
IndexResponse response = client.prepareIndex(NetworkAddressTable.TABLE, networkAddress.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save network address register info, address getId: {}, network address code: {}, status: {}", networkAddress.getAddressId(), networkAddress.getNetworkAddress(), response.status().name());
}
@Override public void update(String id, int spanLayer, int serverType) {
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(NetworkAddressTable.COLUMN_SPAN_LAYER, spanLayer);
source.put(NetworkAddressTable.COLUMN_SERVER_TYPE, serverType);
client.prepareUpdate(NetworkAddressTable.TABLE, id).setDoc(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
}
......@@ -18,14 +18,21 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddressTable;
import org.apache.skywalking.apm.collector.storage.ui.overview.ConjecturalApp;
import org.apache.skywalking.apm.network.proto.SpanLayer;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
/**
* @author peng-yongsheng
......@@ -46,4 +53,31 @@ public class NetworkAddressEsUIDAO extends EsDAO implements INetworkAddressUIDAO
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
return (int)searchResponse.getHits().getTotalHits();
}
@Override public List<ConjecturalApp> getConjecturalApps() {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(NetworkAddressTable.TABLE);
searchRequestBuilder.setTypes(NetworkAddressTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
int[] spanLayers = new int[] {SpanLayer.Database_VALUE, SpanLayer.Cache_VALUE, SpanLayer.MQ_VALUE};
searchRequestBuilder.setQuery(QueryBuilders.termsQuery(NetworkAddressTable.COLUMN_SPAN_LAYER, spanLayers));
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(NetworkAddressTable.COLUMN_SERVER_TYPE).field(NetworkAddressTable.COLUMN_SERVER_TYPE).size(100);
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<ConjecturalApp> conjecturalApps = new LinkedList<>();
Terms serverTypeTerms = searchResponse.getAggregations().get(NetworkAddressTable.COLUMN_SERVER_TYPE);
serverTypeTerms.getBuckets().forEach(serverTypeTerm -> {
int serverType = serverTypeTerm.getKeyAsNumber().intValue();
ConjecturalApp conjecturalApp = new ConjecturalApp();
conjecturalApp.setId(serverType);
conjecturalApp.setNum((int)serverTypeTerm.getDocCount());
conjecturalApps.add(conjecturalApp);
});
return conjecturalApps;
}
}
......@@ -38,5 +38,7 @@ public class NetworkAddressEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NetworkAddressTable.COLUMN_NETWORK_ADDRESS, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NetworkAddressTable.COLUMN_ADDRESS_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NetworkAddressTable.COLUMN_SPAN_LAYER, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NetworkAddressTable.COLUMN_SERVER_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
......@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddressTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,6 +48,7 @@ public class NetworkAddressH2CacheDAO extends H2DAO implements INetworkAddressCa
public int getAddressId(String networkAddress) {
logger.info("get the address id with network address = {}", networkAddress);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_ADDRESS_ID_OR_CODE_SQL, NetworkAddressTable.COLUMN_ADDRESS_ID, NetworkAddressTable.TABLE, NetworkAddressTable.COLUMN_NETWORK_ADDRESS);
Object[] params = new Object[] {networkAddress};
......@@ -57,10 +59,10 @@ public class NetworkAddressH2CacheDAO extends H2DAO implements INetworkAddressCa
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return 0;
return Const.NONE;
}
@Override public String getAddress(int addressId) {
@Override public String getAddressById(int addressId) {
logger.debug("get network address, address id: {}", addressId);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_ADDRESS_ID_OR_CODE_SQL, NetworkAddressTable.COLUMN_NETWORK_ADDRESS, NetworkAddressTable.TABLE, NetworkAddressTable.COLUMN_ADDRESS_ID);
......@@ -74,4 +76,27 @@ public class NetworkAddressH2CacheDAO extends H2DAO implements INetworkAddressCa
}
return Const.EMPTY_STRING;
}
@Override public NetworkAddress getAddress(int addressId) {
logger.debug("get network address, address id: {}", addressId);
H2Client client = getClient();
String dynamicSql = "select * from {0} where {1} = ?";
String sql = SqlBuilder.buildSql(dynamicSql, NetworkAddressTable.TABLE, NetworkAddressTable.COLUMN_ADDRESS_ID);
Object[] params = new Object[] {addressId};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
NetworkAddress networkAddress = new NetworkAddress();
networkAddress.setId(rs.getString(NetworkAddressTable.COLUMN_ID));
networkAddress.setAddressId(rs.getInt(NetworkAddressTable.COLUMN_ADDRESS_ID));
networkAddress.setNetworkAddress(rs.getString(NetworkAddressTable.COLUMN_NETWORK_ADDRESS));
networkAddress.setSpanLayer(rs.getInt(NetworkAddressTable.COLUMN_SPAN_LAYER));
networkAddress.setServerType(rs.getInt(NetworkAddressTable.COLUMN_SERVER_TYPE));
return networkAddress;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return null;
}
}
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddressTable;
import org.slf4j.Logger;
......@@ -60,6 +61,7 @@ public class NetworkAddressRegisterH2DAO extends H2DAO implements INetworkAddres
source.put(NetworkAddressTable.COLUMN_NETWORK_ADDRESS, networkAddress.getNetworkAddress());
source.put(NetworkAddressTable.COLUMN_ADDRESS_ID, networkAddress.getAddressId());
source.put(NetworkAddressTable.COLUMN_SPAN_LAYER, networkAddress.getSpanLayer());
source.put(NetworkAddressTable.COLUMN_SERVER_TYPE, networkAddress.getServerType());
String sql = SqlBuilder.buildBatchInsertSql(NetworkAddressTable.TABLE, source.keySet());
Object[] params = source.values().toArray(new Object[0]);
......@@ -69,4 +71,21 @@ public class NetworkAddressRegisterH2DAO extends H2DAO implements INetworkAddres
logger.error(e.getMessage(), e);
}
}
@Override public void update(String id, int spanLayer, int serverType) {
H2Client client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(NetworkAddressTable.COLUMN_SPAN_LAYER, spanLayer);
source.put(NetworkAddressTable.COLUMN_SERVER_TYPE, serverType);
String sql = SqlBuilder.buildBatchUpdateSql(InstanceTable.TABLE, source.keySet(), InstanceTable.COLUMN_INSTANCE_ID);
Object[] params = source.values().toArray(new Object[] {id});
try {
client.execute(sql, params);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -20,12 +20,14 @@ package org.apache.skywalking.apm.collector.storage.h2.dao.ui;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddressTable;
import org.apache.skywalking.apm.collector.storage.ui.overview.ConjecturalApp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -54,4 +56,8 @@ public class NetworkAddressH2UIDAO extends H2DAO implements INetworkAddressUIDAO
}
return 0;
}
@Override public List<ConjecturalApp> getConjecturalApps() {
return null;
}
}
......@@ -34,6 +34,8 @@ public class NetworkAddressH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(NetworkAddressTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NetworkAddressTable.COLUMN_NETWORK_ADDRESS, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NetworkAddressTable.COLUMN_SPAN_LAYER, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NetworkAddressTable.COLUMN_SERVER_TYPE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NetworkAddressTable.COLUMN_ADDRESS_ID, H2ColumnDefine.Type.Int.name()));
}
}
......@@ -115,8 +115,11 @@ public class OverViewLayerQuery implements Query {
return getAlarmService().getApplicationAlarmTrend(duration.getStep(), start, end);
}
public ConjecturalAppBrief getConjecturalApps(Duration duration) {
return null;
public ConjecturalAppBrief getConjecturalApps(Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long end = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());
return getApplicationService().getConjecturalApps(duration.getStep(), start, end);
}
public List<ServiceMetric> getTopNSlowService(Duration duration, int topN) throws ParseException {
......
......@@ -27,11 +27,15 @@ import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.register.ServerTypeDefine;
import org.apache.skywalking.apm.collector.storage.ui.application.Application;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.overview.ApplicationTPS;
import org.apache.skywalking.apm.collector.storage.ui.overview.ConjecturalApp;
import org.apache.skywalking.apm.collector.storage.ui.overview.ConjecturalAppBrief;
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceMetric;
/**
......@@ -42,6 +46,7 @@ public class ApplicationService {
private final IInstanceUIDAO instanceDAO;
private final IServiceMetricUIDAO serviceMetricUIDAO;
private final IApplicationMetricUIDAO applicationMetricUIDAO;
private final INetworkAddressUIDAO networkAddressUIDAO;
private final ApplicationCacheService applicationCacheService;
private final ServiceNameCacheService serviceNameCacheService;
......@@ -49,6 +54,7 @@ public class ApplicationService {
this.instanceDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceUIDAO.class);
this.serviceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceMetricUIDAO.class);
this.applicationMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationMetricUIDAO.class);
this.networkAddressUIDAO = moduleManager.find(StorageModule.NAME).getService(INetworkAddressUIDAO.class);
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
}
......@@ -84,4 +90,17 @@ public class ApplicationService {
});
return applicationThroughput;
}
public ConjecturalAppBrief getConjecturalApps(Step step, long start, long end) throws ParseException {
List<ConjecturalApp> conjecturalApps = networkAddressUIDAO.getConjecturalApps();
conjecturalApps.forEach(conjecturalApp -> {
String name = ServerTypeDefine.getInstance().getServerType(conjecturalApp.getId());
conjecturalApp.setName(name);
});
ConjecturalAppBrief conjecturalAppBrief = new ConjecturalAppBrief();
conjecturalAppBrief.setApps(conjecturalApps);
return conjecturalAppBrief;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册