未验证 提交 c7884694 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Move node type from network address to service level[Need storage module update] (#2051)

* Change srcLayer to nodeType. Add nodeType in service inventory entity.

* NodeType value refactor.

* Format codes.

* Fix compile issue.

* Fix bugs, and make query on service inventory instead of network inventory
上级 2840f8e5
......@@ -20,11 +20,10 @@ package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -68,9 +67,9 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
ClusterBrief clusterBrief = new ClusterBrief();
clusterBrief.setNumOfService(getMetadataQueryDAO().numOfService(startTimestamp, endTimestamp));
clusterBrief.setNumOfEndpoint(getMetadataQueryDAO().numOfEndpoint(startTimestamp, endTimestamp));
clusterBrief.setNumOfDatabase(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.Database_VALUE));
clusterBrief.setNumOfCache(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.Cache_VALUE));
clusterBrief.setNumOfMQ(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.MQ_VALUE));
clusterBrief.setNumOfDatabase(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, NodeType.Database.value()));
clusterBrief.setNumOfCache(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, NodeType.Cache.value()));
clusterBrief.setNumOfMQ(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, NodeType.MQ.value()));
return clusterBrief;
}
......
......@@ -40,10 +40,18 @@ public class NetworkAddressInventory extends RegisterSource {
public static final String MODEL_NAME = "network_address_inventory";
private static final String NAME = "name";
public static final String SRC_LAYER = "src_layer";
public static final String NODE_TYPE = "node_type";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = SRC_LAYER) private int srcLayer;
@Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE) @Column(columnName = NODE_TYPE) private int nodeType;
public void setNetworkAddressNodeType(NodeType nodeType) {
this.nodeType = nodeType.value();
}
public NodeType getNetworkAddressNodeType() {
return NodeType.get(this.nodeType);
}
public static String buildId(String networkAddress) {
return networkAddress;
......@@ -77,13 +85,13 @@ public class NetworkAddressInventory extends RegisterSource {
@Override public void combine(RegisterSource registerSource) {
super.combine(registerSource);
NetworkAddressInventory inventory = (NetworkAddressInventory)registerSource;
setSrcLayer(inventory.srcLayer);
setNodeType(inventory.nodeType);
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataIntegers(getSequence());
remoteBuilder.addDataIntegers(getSrcLayer());
remoteBuilder.addDataIntegers(getNodeType());
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
......@@ -94,7 +102,7 @@ public class NetworkAddressInventory extends RegisterSource {
@Override public void deserialize(RemoteData remoteData) {
setSequence(remoteData.getDataIntegers(0));
setSrcLayer(remoteData.getDataIntegers(1));
setNodeType(remoteData.getDataIntegers(1));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
......@@ -112,7 +120,7 @@ public class NetworkAddressInventory extends RegisterSource {
NetworkAddressInventory inventory = new NetworkAddressInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setName((String)dbMap.get(NAME));
inventory.setSrcLayer((Integer)dbMap.get(SRC_LAYER));
inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return inventory;
......@@ -122,7 +130,7 @@ public class NetworkAddressInventory extends RegisterSource {
Map<String, Object> map = new HashMap<>();
map.put(SEQUENCE, storageData.getSequence());
map.put(NAME, storageData.getName());
map.put(SRC_LAYER, storageData.getSrcLayer());
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
return map;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register;
import org.apache.skywalking.oap.server.core.UnexpectedException;
/**
* Node type describe which kind of node of Service or Network address represents to.
*
* The value comes from 'org.apache.skywalking.apm.network.language.agent.SpanLayer' at first place, but most likely it
* will extend and be used directly from different sources, such as Mesh.
*
* @author wusheng
*/
public enum NodeType {
/**
* <code>Unknown = 0;</code>
*/
Unknown(0),
/**
* <code>Database = 1;</code>
*/
Database(1),
/**
* <code>RPCFramework = 2;</code>
*/
RPCFramework(2),
/**
* <code>Http = 3;</code>
*/
Http(3),
/**
* <code>MQ = 4;</code>
*/
MQ(4),
/**
* <code>Cache = 5;</code>
*/
Cache(5),
UNRECOGNIZED(-1);
private final int value;
NodeType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static NodeType get(int value) {
switch (value) {
case 0:
return Unknown;
case 1:
return Database;
case 2:
return RPCFramework;
case 3:
return Http;
case 4:
return MQ;
case 5:
return Cache;
case -1:
return UNRECOGNIZED;
default:
throw new UnexpectedException("Unknown NodeType value");
}
}
/**
* Right now, spanLayerValue is exact same as NodeType value.
*
* @param spanLayerValue
* @return
*/
public static NodeType fromSpanLayerValue(int spanLayerValue) {
return get(spanLayerValue);
}
}
......@@ -43,15 +43,21 @@ public class ServiceInventory extends RegisterSource {
public static final String NAME = "name";
public static final String IS_ADDRESS = "is_address";
private static final String ADDRESS_ID = "address_id";
public static final String NODE_TYPE = "node_type";
public static final String MAPPING_SERVICE_ID = "mapping_service_id";
public static final String MAPPING_LAST_UPDATE_TIME = "mapping_last_update_time";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
@Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE) @Column(columnName = NODE_TYPE) private int nodeType;
@Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId;
@Setter @Getter @Column(columnName = MAPPING_LAST_UPDATE_TIME) private long mappingLastUpdateTime;
public NodeType getServiceNodeType() {
return NodeType.get(this.nodeType);
}
public static String buildId(String serviceName) {
return serviceName + Const.ID_SPLIT + BooleanUtils.FALSE + Const.ID_SPLIT + Const.NONE;
}
......@@ -60,6 +66,10 @@ public class ServiceInventory extends RegisterSource {
return BooleanUtils.TRUE + Const.ID_SPLIT + addressId;
}
public void setServiceNodeType(NodeType nodeType) {
this.nodeType = nodeType.value();
}
@Override public String id() {
if (BooleanUtils.TRUE == isAddress) {
return buildId(addressId);
......@@ -83,6 +93,7 @@ public class ServiceInventory extends RegisterSource {
inventory.setHeartbeatTime(getHeartbeatTime());
inventory.setName(name);
inventory.setIsAddress(isAddress);
inventory.setNodeType(nodeType);
inventory.setAddressId(addressId);
inventory.setMappingLastUpdateTime(mappingLastUpdateTime);
inventory.setMappingServiceId(mappingServiceId);
......@@ -115,6 +126,7 @@ public class ServiceInventory extends RegisterSource {
remoteBuilder.addDataIntegers(isAddress);
remoteBuilder.addDataIntegers(addressId);
remoteBuilder.addDataIntegers(mappingServiceId);
remoteBuilder.addDataIntegers(nodeType);
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
......@@ -129,6 +141,7 @@ public class ServiceInventory extends RegisterSource {
setIsAddress(remoteData.getDataIntegers(1));
setAddressId(remoteData.getDataIntegers(2));
setMappingServiceId(remoteData.getDataIntegers(3));
setNodeType(remoteData.getDataIntegers(4));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
......@@ -144,6 +157,7 @@ public class ServiceInventory extends RegisterSource {
@Override public void combine(RegisterSource registerSource) {
super.combine(registerSource);
ServiceInventory serviceInventory = (ServiceInventory)registerSource;
nodeType = serviceInventory.nodeType;
if (Const.NONE != serviceInventory.getMappingServiceId() && serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime()) {
this.mappingServiceId = serviceInventory.getMappingServiceId();
this.mappingLastUpdateTime = serviceInventory.getMappingLastUpdateTime();
......@@ -159,6 +173,7 @@ public class ServiceInventory extends RegisterSource {
inventory.setMappingServiceId((Integer)dbMap.get(MAPPING_SERVICE_ID));
inventory.setName((String)dbMap.get(NAME));
inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setMappingLastUpdateTime((Long)dbMap.get(MAPPING_LAST_UPDATE_TIME));
......@@ -172,6 +187,7 @@ public class ServiceInventory extends RegisterSource {
map.put(MAPPING_SERVICE_ID, storageData.getMappingServiceId());
map.put(NAME, storageData.getName());
map.put(ADDRESS_ID, storageData.getAddressId());
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(MAPPING_LAST_UPDATE_TIME, storageData.getMappingLastUpdateTime());
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.library.module.Service;
/**
......@@ -30,5 +31,5 @@ public interface INetworkAddressInventoryRegister extends Service {
void heartbeat(int addressId, long heartBeatTime);
void update(int addressId, int srcLayer);
void update(int addressId, NodeType nodeType);
}
......@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.register.service;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
......@@ -36,6 +36,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryRegister.class);
private final ModuleManager moduleManager;
private ServiceInventoryCache serviceInventoryCache;
private NetworkAddressInventoryCache networkAddressInventoryCache;
private IServiceInventoryRegister serviceInventoryRegister;
private IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
......@@ -44,6 +45,13 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
this.moduleManager = moduleManager;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (isNull(serviceInventoryCache)) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return this.serviceInventoryCache;
}
private NetworkAddressInventoryCache getNetworkAddressInventoryCache() {
if (isNull(networkAddressInventoryCache)) {
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
......@@ -107,21 +115,36 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
}
}
@Override public void update(int addressId, int srcLayer) {
if (!this.compare(addressId, srcLayer)) {
@Override public void update(int addressId, NodeType nodeType) {
NetworkAddressInventory networkAddress = getNetworkAddressInventoryCache().get(addressId);
if (!this.compare(networkAddress, nodeType)) {
NetworkAddressInventory newNetworkAddress = getNetworkAddressInventoryCache().get(addressId);
newNetworkAddress.setSrcLayer(srcLayer);
newNetworkAddress.setNetworkAddressNodeType(nodeType);
newNetworkAddress.setHeartbeatTime(System.currentTimeMillis());
InventoryProcess.INSTANCE.in(newNetworkAddress);
}
ServiceInventory newServiceInventory = getServiceInventoryCache().get(getServiceInventoryCache().getServiceId(networkAddress.getSequence()));
if (!this.compare(newServiceInventory, nodeType)) {
newServiceInventory.setServiceNodeType(nodeType);
newServiceInventory.setHeartbeatTime(System.currentTimeMillis());
InventoryProcess.INSTANCE.in(newServiceInventory);
}
}
private boolean compare(int addressId, int srcLayer) {
NetworkAddressInventory networkAddress = getNetworkAddressInventoryCache().get(addressId);
private boolean compare(NetworkAddressInventory newNetworkAddress, NodeType nodeType) {
if (Objects.nonNull(newNetworkAddress)) {
return nodeType == newNetworkAddress.getNetworkAddressNodeType();
}
return true;
}
if (Objects.nonNull(networkAddress)) {
return srcLayer == networkAddress.getSrcLayer();
private boolean compare(ServiceInventory newServiceInventory, NodeType nodeType) {
if (Objects.nonNull(newServiceInventory)) {
return nodeType == newServiceInventory.getServiceNodeType();
}
return true;
}
......
......@@ -32,7 +32,7 @@ public interface IMetadataQueryDAO extends DAO {
int numOfEndpoint(final long startTimestamp, final long endTimestamp) throws IOException;
int numOfConjectural(final long startTimestamp, final long endTimestamp, final int srcLayer) throws IOException;
int numOfConjectural(final long startTimestamp, final long endTimestamp, final int nodeTypeValue) throws IOException;
List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException;
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -81,8 +82,8 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
standardBuilder.setPeerId(peerId);
standardBuilder.setPeer(Const.EMPTY_STRING);
int spanLayer = standardBuilder.getSpanLayerValue();
networkAddressInventoryRegister.update(peerId, spanLayer);
int spanLayerValue = standardBuilder.getSpanLayerValue();
networkAddressInventoryRegister.update(peerId, NodeType.fromSpanLayerValue(spanLayerValue));
}
}
......
......@@ -72,13 +72,13 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
return (int)response.getHits().getTotalHits();
}
@Override public int numOfConjectural(long startTimestamp, long endTimestamp, int srcLayer) throws IOException {
@Override public int numOfConjectural(long startTimestamp, long endTimestamp, int nodeTypeValue) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(NetworkAddressInventory.SRC_LAYER, srcLayer));
sourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.NODE_TYPE, nodeTypeValue));
sourceBuilder.size(0);
SearchResponse response = getClient().search(NetworkAddressInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
return (int)response.getHits().getTotalHits();
}
......
......@@ -78,12 +78,12 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
}
@Override public int numOfConjectural(long startTimestamp, long endTimestamp,
int srcLayer) throws IOException {
int nodeTypeValue) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(NetworkAddressInventory.MODEL_NAME).append(" where ");
sql.append(NetworkAddressInventory.SRC_LAYER).append("=?");
condition.add(srcLayer);
sql.append("select count(*) num from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append(ServiceInventory.NODE_TYPE).append("=?");
condition.add(nodeTypeValue);
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册