diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java index 566198c8f159edfe194bd54acaff4be99169b051..d91a47ac972f64384d7aefb64f21c5e7cbcde2ad 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java @@ -36,5 +36,6 @@ public class Const { public static final String SEGMENT_SPAN_SPLIT = "S"; public static final String UNKNOWN = "Unknown"; public static final String EMPTY_STRING = ""; + public static final String EMPTY_JSON_OBJECT_STRING = "{}"; public static final String DOMAIN_OPERATION_NAME = "{domain}"; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java index 5aa9dbd53509f78f2d57880bc0423575e47ef36b..edd4ccf19b940e640fd91f4835ba8ffc957c3e58 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java @@ -22,10 +22,10 @@ import java.util.*; import java.util.concurrent.*; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.register.ServiceInventory; +import org.apache.skywalking.oap.server.core.register.*; import org.apache.skywalking.oap.server.core.storage.StorageModule; -import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.core.storage.cache.*; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.*; /** @@ -38,34 +38,60 @@ public enum CacheUpdateTimer { private Boolean isStarted = false; - public void start(ModuleManager moduleManager) { - logger.info("Cache update timer start"); + public void start(ModuleDefineHolder moduleDefineHolder) { + logger.info("Cache updateServiceInventory timer start"); - final long timeInterval = 3; + final long timeInterval = 10; if (!isStarted) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( - new RunnableWithExceptionProtection(() -> update(moduleManager), + new RunnableWithExceptionProtection(() -> update(moduleDefineHolder), t -> logger.error("Cache update failure.", t)), 1, timeInterval, TimeUnit.SECONDS); this.isStarted = true; } } - private void update(ModuleManager moduleManager) { - IServiceInventoryCacheDAO serviceInventoryCacheDAO = moduleManager.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class); - ServiceInventoryCache serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); - List serviceInventories = serviceInventoryCacheDAO.loadLastMappingUpdate(); + private void update(ModuleDefineHolder moduleDefineHolder) { + updateServiceInventory(moduleDefineHolder); + updateNetAddressInventory(moduleDefineHolder); + } + + private void updateServiceInventory(ModuleDefineHolder moduleDefineHolder) { + IServiceInventoryCacheDAO serviceInventoryCacheDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class); + ServiceInventoryCache serviceInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); + List serviceInventories = serviceInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000); serviceInventories.forEach(serviceInventory -> { - logger.info("Update mapping service id in the cache of service inventory, service id: {}, mapping service id: {}", serviceInventory.getSequence(), serviceInventory.getMappingServiceId()); ServiceInventory cache = serviceInventoryCache.get(serviceInventory.getSequence()); if (Objects.nonNull(cache)) { - cache.setMappingServiceId(serviceInventory.getMappingServiceId()); - cache.setMappingLastUpdateTime(serviceInventory.getMappingLastUpdateTime()); + if (cache.getMappingServiceId() != serviceInventory.getMappingServiceId()) { + cache.setMappingServiceId(serviceInventory.getMappingServiceId()); + cache.setServiceNodeType(serviceInventory.getServiceNodeType()); + cache.setProperties(serviceInventory.getProperties()); + logger.info("Update the cache of service inventory, service id: {}", serviceInventory.getSequence()); + } } else { logger.warn("Unable to found the id of {} in service inventory cache.", serviceInventory.getSequence()); } }); } + + private void updateNetAddressInventory(ModuleDefineHolder moduleDefineHolder) { + INetworkAddressInventoryCacheDAO addressInventoryCacheDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(INetworkAddressInventoryCacheDAO.class); + NetworkAddressInventoryCache addressInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class); + List addressInventories = addressInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000); + + addressInventories.forEach(addressInventory -> { + NetworkAddressInventory cache = addressInventoryCache.get(addressInventory.getSequence()); + if (Objects.nonNull(cache)) { + if (!cache.getNetworkAddressNodeType().equals(addressInventory.getNetworkAddressNodeType())) { + cache.setNetworkAddressNodeType(addressInventory.getNetworkAddressNodeType()); + logger.info("Update the cache of net address inventory, address id: {}", addressInventory.getSequence()); + } + } else { + logger.warn("Unable to found the id of {} in net address inventory cache.", addressInventory.getSequence()); + } + }); + } } \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java index 8e906bcd6bfae7185644ee7b0c84c4477439344b..7f0edae37acfe34ef3cb790750a9d85d50939587 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java @@ -91,6 +91,7 @@ public class EndpointInventory extends RegisterSource { remoteBuilder.addDataLongs(getRegisterTime()); remoteBuilder.addDataLongs(getHeartbeatTime()); + remoteBuilder.addDataLongs(getLastUpdateTime()); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name); return remoteBuilder; @@ -103,6 +104,7 @@ public class EndpointInventory extends RegisterSource { setRegisterTime(remoteData.getDataLongs(0)); setHeartbeatTime(remoteData.getDataLongs(1)); + setLastUpdateTime(remoteData.getDataLongs(2)); setName(remoteData.getDataStrings(0)); } @@ -115,12 +117,13 @@ public class EndpointInventory extends RegisterSource { @Override public EndpointInventory map2Data(Map dbMap) { EndpointInventory inventory = new EndpointInventory(); - inventory.setSequence((Integer)dbMap.get(SEQUENCE)); - inventory.setServiceId((Integer)dbMap.get(SERVICE_ID)); + inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue()); + inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); inventory.setName((String)dbMap.get(NAME)); - inventory.setDetectPoint((Integer)dbMap.get(DETECT_POINT)); - inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME)); - inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME)); + inventory.setDetectPoint(((Number)dbMap.get(DETECT_POINT)).intValue()); + inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue()); + inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue()); + inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue()); return inventory; } @@ -132,6 +135,7 @@ public class EndpointInventory extends RegisterSource { map.put(DETECT_POINT, storageData.getDetectPoint()); map.put(REGISTER_TIME, storageData.getRegisterTime()); map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime()); + map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime()); return map; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java index d0f09faf3fee60d94736edd55a0567e80a7c92ec..7a4eb73682f86e906849048470565b9de04d05e7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java @@ -88,6 +88,7 @@ public class NetworkAddressInventory extends RegisterSource { inventory.setSequence(getSequence()); inventory.setRegisterTime(getRegisterTime()); inventory.setHeartbeatTime(getHeartbeatTime()); + inventory.setLastUpdateTime(getLastUpdateTime()); inventory.setName(name); inventory.setNodeType(nodeType); @@ -95,14 +96,14 @@ public class NetworkAddressInventory extends RegisterSource { } @Override public boolean combine(RegisterSource registerSource) { - boolean isCombine = super.combine(registerSource); + boolean isChanged = super.combine(registerSource); NetworkAddressInventory inventory = (NetworkAddressInventory)registerSource; - if (nodeType != inventory.nodeType) { - setNodeType(inventory.nodeType); + if (this.nodeType != inventory.getNodeType() && inventory.getLastUpdateTime() >= this.getLastUpdateTime()) { + setNodeType(inventory.getNodeType()); return true; } else { - return isCombine; + return isChanged; } } @@ -113,6 +114,7 @@ public class NetworkAddressInventory extends RegisterSource { remoteBuilder.addDataLongs(getRegisterTime()); remoteBuilder.addDataLongs(getHeartbeatTime()); + remoteBuilder.addDataLongs(getLastUpdateTime()); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name); return remoteBuilder; @@ -124,6 +126,7 @@ public class NetworkAddressInventory extends RegisterSource { setRegisterTime(remoteData.getDataLongs(0)); setHeartbeatTime(remoteData.getDataLongs(1)); + setLastUpdateTime(remoteData.getDataLongs(2)); setName(remoteData.getDataStrings(0)); } @@ -136,11 +139,12 @@ public class NetworkAddressInventory extends RegisterSource { @Override public NetworkAddressInventory map2Data(Map dbMap) { NetworkAddressInventory inventory = new NetworkAddressInventory(); - inventory.setSequence((Integer)dbMap.get(SEQUENCE)); + inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue()); inventory.setName((String)dbMap.get(NAME)); - inventory.setNodeType((Integer)dbMap.get(NODE_TYPE)); - inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME)); - inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME)); + inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue()); + inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue()); + inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue()); + inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue()); return inventory; } @@ -151,6 +155,7 @@ public class NetworkAddressInventory extends RegisterSource { map.put(NODE_TYPE, storageData.getNodeType()); map.put(REGISTER_TIME, storageData.getRegisterTime()); map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime()); + map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime()); return map; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java index 210c17039d2b881228ef9d2ad57dfa560095137f..328a060a6172b5b3dac69b09040f044fbcf6ba08 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java @@ -31,17 +31,24 @@ public abstract class RegisterSource extends StreamData implements StorageData { public static final String SEQUENCE = "sequence"; public static final String REGISTER_TIME = "register_time"; public static final String HEARTBEAT_TIME = "heartbeat_time"; + public static final String LAST_UPDATE_TIME = "last_update_time"; @Getter @Setter @Column(columnName = SEQUENCE) private int sequence; - @Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime; - @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime; + @Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime = 0L; + @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime = 0L; + @Setter @Getter @Column(columnName = LAST_UPDATE_TIME) private long lastUpdateTime = 0L; public boolean combine(RegisterSource registerSource) { + boolean isChanged = false; if (heartbeatTime < registerSource.getHeartbeatTime()) { heartbeatTime = registerSource.getHeartbeatTime(); - return true; - } else { - return false; + isChanged = true; } + + if (lastUpdateTime < registerSource.getLastUpdateTime()) { + lastUpdateTime = registerSource.getLastUpdateTime(); + isChanged = true; + } + return isChanged; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java index 28cd596a7814be4e02c45ce22eb9380d2aab3f79..f5254f0ef49e11c5a4ff28638313ea95fe7edc03 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java @@ -51,8 +51,7 @@ public class ServiceInstanceInventory extends RegisterSource { public static final String PROPERTIES = "properties"; private static final Gson GSON = new Gson(); - @Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true) - private String instanceUUID = Const.EMPTY_STRING; + @Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true) private String instanceUUID = Const.EMPTY_STRING; @Setter @Getter @Column(columnName = NAME) private String name = Const.EMPTY_STRING; @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId; @Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress; @@ -133,6 +132,7 @@ public class ServiceInstanceInventory extends RegisterSource { remoteBuilder.addDataLongs(getRegisterTime()); remoteBuilder.addDataLongs(getHeartbeatTime()); + remoteBuilder.addDataLongs(getLastUpdateTime()); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(instanceUUID) ? Const.EMPTY_STRING : instanceUUID); @@ -148,6 +148,7 @@ public class ServiceInstanceInventory extends RegisterSource { setRegisterTime(remoteData.getDataLongs(0)); setHeartbeatTime(remoteData.getDataLongs(1)); + setLastUpdateTime(remoteData.getDataLongs(2)); setName(remoteData.getDataStrings(0)); setInstanceUUID(remoteData.getDataStrings(1)); @@ -162,13 +163,14 @@ public class ServiceInstanceInventory extends RegisterSource { @Override public ServiceInstanceInventory map2Data(Map dbMap) { ServiceInstanceInventory inventory = new ServiceInstanceInventory(); - inventory.setSequence((Integer)dbMap.get(SEQUENCE)); - inventory.setServiceId((Integer)dbMap.get(SERVICE_ID)); - inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS)); - inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID)); + inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue()); + inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue()); + inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue()); - inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME)); - inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME)); + inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue()); + inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue()); + inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue()); inventory.setName((String)dbMap.get(NAME)); inventory.setInstanceUUID((String)dbMap.get(INSTANCE_UUID)); @@ -185,6 +187,7 @@ public class ServiceInstanceInventory extends RegisterSource { map.put(REGISTER_TIME, storageData.getRegisterTime()); map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime()); + map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime()); map.put(NAME, storageData.getName()); map.put(INSTANCE_UUID, storageData.getInstanceUUID()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java index 380d479c9c5bd7e7584d85ea5965513859940504..5441cc137a90a73ab0c4a8fe6d0f49485ea68616 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java @@ -47,7 +47,6 @@ public class ServiceInventory extends RegisterSource { private static final String ADDRESS_ID = "address_id"; public static final String NODE_TYPE = "node_type"; public static final String MAPPING_SERVICE_ID = "mapping_service_id"; - public static final String MAPPING_LAST_UPDATE_TIME = "mapping_last_update_time"; public static final String PROPERTIES = "properties"; private static final Gson GSON = new Gson(); @@ -56,8 +55,7 @@ public class ServiceInventory extends RegisterSource { @Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId; @Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE) @Column(columnName = NODE_TYPE) private int nodeType; @Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId; - @Setter @Getter @Column(columnName = MAPPING_LAST_UPDATE_TIME) private long mappingLastUpdateTime; - @Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private String prop; + @Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private String prop = Const.EMPTY_JSON_OBJECT_STRING; @Getter private JsonObject properties; public NodeType getServiceNodeType() { @@ -119,7 +117,7 @@ public class ServiceInventory extends RegisterSource { inventory.setIsAddress(isAddress); inventory.setNodeType(nodeType); inventory.setAddressId(addressId); - inventory.setMappingLastUpdateTime(mappingLastUpdateTime); + inventory.setLastUpdateTime(getLastUpdateTime()); inventory.setMappingServiceId(mappingServiceId); inventory.setProp(prop); @@ -155,7 +153,7 @@ public class ServiceInventory extends RegisterSource { remoteBuilder.addDataLongs(getRegisterTime()); remoteBuilder.addDataLongs(getHeartbeatTime()); - remoteBuilder.addDataLongs(getMappingLastUpdateTime()); + remoteBuilder.addDataLongs(getLastUpdateTime()); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name); remoteBuilder.addDataStrings(Strings.isNullOrEmpty(prop) ? Const.EMPTY_STRING : prop); @@ -171,7 +169,7 @@ public class ServiceInventory extends RegisterSource { setRegisterTime(remoteData.getDataLongs(0)); setHeartbeatTime(remoteData.getDataLongs(1)); - setMappingLastUpdateTime(remoteData.getDataLongs(2)); + setLastUpdateTime(remoteData.getDataLongs(2)); setName(remoteData.getDataStrings(0)); setProp(remoteData.getDataStrings(1)); @@ -183,21 +181,22 @@ public class ServiceInventory extends RegisterSource { } @Override public boolean combine(RegisterSource registerSource) { - super.combine(registerSource); + boolean isChanged = super.combine(registerSource); ServiceInventory serviceInventory = (ServiceInventory)registerSource; - nodeType = serviceInventory.nodeType; - setProp(serviceInventory.getProp()); - if (Const.NONE != serviceInventory.getMappingServiceId() && serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime()) { - this.mappingServiceId = serviceInventory.getMappingServiceId(); - this.mappingLastUpdateTime = serviceInventory.getMappingLastUpdateTime(); + if (serviceInventory.getLastUpdateTime() >= this.getLastUpdateTime()) { + this.nodeType = serviceInventory.getNodeType(); + setProp(serviceInventory.getProp()); + if (Const.NONE != serviceInventory.getMappingServiceId()) { + this.mappingServiceId = serviceInventory.getMappingServiceId(); + } + isChanged = true; } - return true; + return isChanged; } public static class PropertyUtil { - public static final String DATABASE = "database"; } @@ -205,15 +204,15 @@ public class ServiceInventory extends RegisterSource { @Override public ServiceInventory map2Data(Map dbMap) { ServiceInventory inventory = new ServiceInventory(); - inventory.setSequence((Integer)dbMap.get(SEQUENCE)); - inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS)); - inventory.setMappingServiceId((Integer)dbMap.get(MAPPING_SERVICE_ID)); + inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue()); + inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue()); + inventory.setMappingServiceId(((Number)dbMap.get(MAPPING_SERVICE_ID)).intValue()); inventory.setName((String)dbMap.get(NAME)); - inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID)); - inventory.setNodeType((Integer)dbMap.get(NODE_TYPE)); - inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME)); - inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME)); - inventory.setMappingLastUpdateTime((Long)dbMap.get(MAPPING_LAST_UPDATE_TIME)); + inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue()); + inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue()); + inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue()); + inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue()); + inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue()); inventory.setProp((String)dbMap.get(PROPERTIES)); return inventory; } @@ -228,7 +227,7 @@ public class ServiceInventory extends RegisterSource { map.put(NODE_TYPE, storageData.getNodeType()); map.put(REGISTER_TIME, storageData.getRegisterTime()); map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime()); - map.put(MAPPING_LAST_UPDATE_TIME, storageData.getMappingLastUpdateTime()); + map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime()); map.put(PROPERTIES, storageData.getProp()); return map; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java index 4cf831dea8a381872d8742ae8ad8767a20c85640..2f3e4e6231a277d8ce33047993acf65257cd1bdc 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java @@ -115,7 +115,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory if (!this.compare(networkAddress, nodeType)) { NetworkAddressInventory newNetworkAddress = networkAddress.getClone(); newNetworkAddress.setNetworkAddressNodeType(nodeType); - newNetworkAddress.setHeartbeatTime(System.currentTimeMillis()); + newNetworkAddress.setLastUpdateTime(System.currentTimeMillis()); InventoryStreamProcessor.getInstance().in(newNetworkAddress); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java index bc6deb89c5aa6d81f69911c969490f34e410dc5a..b5640b37d9eee2c2ac2a6a97c64dce224a862b75 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java @@ -64,7 +64,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister { serviceInventory.setRegisterTime(now); serviceInventory.setHeartbeatTime(now); serviceInventory.setMappingServiceId(Const.NONE); - serviceInventory.setMappingLastUpdateTime(now); + serviceInventory.setLastUpdateTime(now); serviceInventory.setProperties(properties); InventoryStreamProcessor.getInstance().in(serviceInventory); @@ -84,7 +84,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister { long now = System.currentTimeMillis(); serviceInventory.setRegisterTime(now); serviceInventory.setHeartbeatTime(now); - serviceInventory.setMappingLastUpdateTime(now); + serviceInventory.setLastUpdateTime(now); InventoryStreamProcessor.getInstance().in(serviceInventory); } @@ -98,7 +98,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister { serviceInventory = serviceInventory.getClone(); serviceInventory.setServiceNodeType(nodeType); serviceInventory.setProperties(properties); - serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis()); + serviceInventory.setLastUpdateTime(System.currentTimeMillis()); InventoryStreamProcessor.getInstance().in(serviceInventory); } @@ -124,7 +124,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister { if (Objects.nonNull(serviceInventory)) { serviceInventory = serviceInventory.getClone(); serviceInventory.setMappingServiceId(mappingServiceId); - serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis()); + serviceInventory.setLastUpdateTime(System.currentTimeMillis()); InventoryStreamProcessor.getInstance().in(serviceInventory); } else { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java index 614f97efd8da5ed03eec1fd185e4dd0d5f2fb935..f14bd3cbef947ecb074237441d2b6e61a9f5882a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.storage.cache; +import java.util.List; import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory; import org.apache.skywalking.oap.server.core.storage.DAO; @@ -29,4 +30,6 @@ public interface INetworkAddressInventoryCacheDAO extends DAO { int getAddressId(String networkAddress); NetworkAddressInventory get(int addressId); + + List loadLastUpdate(long lastUpdateTime); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java index a1ea367565200968ad2a835b49695b7d81d107f9..e547dbdffe63035ad22ccd1c674fcf125e0da9ea 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java @@ -33,5 +33,5 @@ public interface IServiceInventoryCacheDAO extends DAO { ServiceInventory get(int serviceId); - List loadLastMappingUpdate(); + List loadLastUpdate(long lastUpdateTime); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java index 2e8b98ce5635579f2f60c15b8b3e4a50b75da91d..f1d72aebfc1ca3be6a8aba478907730c3f94f5fe 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java @@ -116,10 +116,6 @@ public class ReferenceIdExchanger implements IdExchanger { * Need to try to get the id by assuming the endpoint name is detected at server, local or client. * * If agent does the exchange, then always use endpoint id. - * - * @param serviceId - * @param endpointName - * @return */ private int getEndpointId(int serviceId, String endpointName) { int endpointId = endpointInventoryRegister.get(serviceId, endpointName, DetectPoint.SERVER.ordinal()); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java index 42bf56c8d1e27784ff5725114656ce24097a3c1b..f61850bfc73385e354224ea791280950e90c7af7 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java @@ -103,12 +103,11 @@ public class SpanIdExchanger implements IdExchanger { NodeType nodeType = NodeType.fromSpanLayerValue(spanLayerValue); networkAddressInventoryRegister.update(peerId, nodeType); - /** + /* * In some case, conjecture node, such as Database node, could be registered by agents. * At here, if the target service properties need to be updated, * it will only be updated at the first time for now. */ - JsonObject properties = null; ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get(serviceInventoryCacheDAO.getServiceId(peerId)); if (SpanLayer.Database.equals(standardBuilder.getSpanLayer())) { diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml index cf83c5bb74d8035d6d69177fd7ba4739fee3bcc2..72dcf4319073c5d6af580e64c281272ceaa79905 100644 --- a/oap-server/server-starter/src/main/resources/log4j2.xml +++ b/oap-server/server-starter/src/main/resources/log4j2.xml @@ -36,7 +36,7 @@ - + diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java index 5dfc591fd8ea3911e8fd0967b82a1a3cdb6e2613..b53d024cf65e307aa4b5dc5c167d370aed990723 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache; +import java.util.*; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO; @@ -76,4 +77,24 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork return null; } } + + @Override public List loadLastUpdate(long lastUpdateTime) { + List addressInventories = new ArrayList<>(); + + try { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.rangeQuery(NetworkAddressInventory.LAST_UPDATE_TIME).gte(lastUpdateTime)); + searchSourceBuilder.size(500); + + SearchResponse response = getClient().search(NetworkAddressInventory.INDEX_NAME, searchSourceBuilder); + + for (SearchHit searchHit : response.getHits().getHits()) { + addressInventories.add(this.builder.map2Data(searchHit.getSourceAsMap())); + } + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + + return addressInventories; + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java index f1f4de3f8b6d42b6933cdf1609000b725904cb69..395a98bae162d258f5847179fda424754970b8f6 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java @@ -88,7 +88,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento } } - @Override public List loadLastMappingUpdate() { + @Override public List loadLastUpdate(long lastUpdateTime) { List serviceInventories = new ArrayList<>(); try { @@ -96,10 +96,10 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); boolQuery.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.TRUE)); - boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.MAPPING_LAST_UPDATE_TIME).gte(System.currentTimeMillis() - 30 * 60 * 1000)); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.LAST_UPDATE_TIME).gte(lastUpdateTime)); searchSourceBuilder.query(boolQuery); - searchSourceBuilder.size(50); + searchSourceBuilder.size(500); SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java index d390088ac858eadc5e2c77fb43b51f80bbef26c0..d30df1327f93ca980797dd48083179a262c71b43 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java @@ -19,11 +19,12 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; -import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory; +import java.sql.*; +import java.util.*; +import org.apache.skywalking.oap.server.core.register.*; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author wusheng @@ -49,4 +50,32 @@ public class H2NetworkAddressInventoryCacheDAO extends H2SQLExecutor implements return null; } } + + @Override public List loadLastUpdate(long lastUpdateTime) { + List addressInventories = new ArrayList<>(); + + try { + StringBuilder sql = new StringBuilder("select * from "); + sql.append(NetworkAddressInventory.INDEX_NAME); + sql.append(" where ").append(NetworkAddressInventory.LAST_UPDATE_TIME).append(">?"); + + try (Connection connection = h2Client.getConnection()) { + try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), lastUpdateTime)) { + NetworkAddressInventory addressInventory; + do { + addressInventory = (NetworkAddressInventory)toStorageData(resultSet, NetworkAddressInventory.INDEX_NAME, new ServiceInventory.Builder()); + if (addressInventory != null) { + addressInventories.add(addressInventory); + } + } + while (addressInventory != null); + } + } catch (SQLException e) { + throw new IOException(e); + } + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + return addressInventories; + } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java index a52e2160fc8bb457ba2e28a78c47753afe5d483e..bf8bd5f56c7400877f55a15b97f134e80971fab6 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java @@ -57,17 +57,17 @@ public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServic } } - @Override public List loadLastMappingUpdate() { + @Override public List loadLastUpdate(long lastUpdateTime) { List serviceInventories = new ArrayList<>(); try { StringBuilder sql = new StringBuilder("select * from "); sql.append(ServiceInventory.INDEX_NAME); sql.append(" where ").append(ServiceInventory.IS_ADDRESS).append("=? "); - sql.append(" and ").append(ServiceInventory.MAPPING_LAST_UPDATE_TIME).append(">?"); + sql.append(" and ").append(ServiceInventory.LAST_UPDATE_TIME).append(">?"); try (Connection connection = h2Client.getConnection()) { - try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, System.currentTimeMillis() - 30 * 60 * 1000)) { + try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, lastUpdateTime)) { ServiceInventory serviceInventory; do { serviceInventory = (ServiceInventory)toStorageData(resultSet, ServiceInventory.INDEX_NAME, new ServiceInventory.Builder());