提交 16b82365 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

1. All the inventories add last_update_time attribute, delete the...

1. All the inventories add last_update_time attribute,  delete the mapping_last_update_time attribute in service inventory. (#3168)

2. Fixed the service inventory and net address inventory update bugs.
上级 c76fb405
......@@ -36,5 +36,6 @@ public class Const {
public static final String SEGMENT_SPAN_SPLIT = "S";
public static final String UNKNOWN = "Unknown";
public static final String EMPTY_STRING = "";
public static final String EMPTY_JSON_OBJECT_STRING = "{}";
public static final String DOMAIN_OPERATION_NAME = "{domain}";
}
......@@ -22,10 +22,10 @@ import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
......@@ -38,34 +38,60 @@ public enum CacheUpdateTimer {
private Boolean isStarted = false;
public void start(ModuleManager moduleManager) {
logger.info("Cache update timer start");
public void start(ModuleDefineHolder moduleDefineHolder) {
logger.info("Cache updateServiceInventory timer start");
final long timeInterval = 3;
final long timeInterval = 10;
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> update(moduleManager),
new RunnableWithExceptionProtection(() -> update(moduleDefineHolder),
t -> logger.error("Cache update failure.", t)), 1, timeInterval, TimeUnit.SECONDS);
this.isStarted = true;
}
}
private void update(ModuleManager moduleManager) {
IServiceInventoryCacheDAO serviceInventoryCacheDAO = moduleManager.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class);
ServiceInventoryCache serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
List<ServiceInventory> serviceInventories = serviceInventoryCacheDAO.loadLastMappingUpdate();
private void update(ModuleDefineHolder moduleDefineHolder) {
updateServiceInventory(moduleDefineHolder);
updateNetAddressInventory(moduleDefineHolder);
}
private void updateServiceInventory(ModuleDefineHolder moduleDefineHolder) {
IServiceInventoryCacheDAO serviceInventoryCacheDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class);
ServiceInventoryCache serviceInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
List<ServiceInventory> serviceInventories = serviceInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000);
serviceInventories.forEach(serviceInventory -> {
logger.info("Update mapping service id in the cache of service inventory, service id: {}, mapping service id: {}", serviceInventory.getSequence(), serviceInventory.getMappingServiceId());
ServiceInventory cache = serviceInventoryCache.get(serviceInventory.getSequence());
if (Objects.nonNull(cache)) {
cache.setMappingServiceId(serviceInventory.getMappingServiceId());
cache.setMappingLastUpdateTime(serviceInventory.getMappingLastUpdateTime());
if (cache.getMappingServiceId() != serviceInventory.getMappingServiceId()) {
cache.setMappingServiceId(serviceInventory.getMappingServiceId());
cache.setServiceNodeType(serviceInventory.getServiceNodeType());
cache.setProperties(serviceInventory.getProperties());
logger.info("Update the cache of service inventory, service id: {}", serviceInventory.getSequence());
}
} else {
logger.warn("Unable to found the id of {} in service inventory cache.", serviceInventory.getSequence());
}
});
}
private void updateNetAddressInventory(ModuleDefineHolder moduleDefineHolder) {
INetworkAddressInventoryCacheDAO addressInventoryCacheDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(INetworkAddressInventoryCacheDAO.class);
NetworkAddressInventoryCache addressInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
List<NetworkAddressInventory> addressInventories = addressInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000);
addressInventories.forEach(addressInventory -> {
NetworkAddressInventory cache = addressInventoryCache.get(addressInventory.getSequence());
if (Objects.nonNull(cache)) {
if (!cache.getNetworkAddressNodeType().equals(addressInventory.getNetworkAddressNodeType())) {
cache.setNetworkAddressNodeType(addressInventory.getNetworkAddressNodeType());
logger.info("Update the cache of net address inventory, address id: {}", addressInventory.getSequence());
}
} else {
logger.warn("Unable to found the id of {} in net address inventory cache.", addressInventory.getSequence());
}
});
}
}
\ No newline at end of file
......@@ -91,6 +91,7 @@ public class EndpointInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name);
return remoteBuilder;
......@@ -103,6 +104,7 @@ public class EndpointInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
}
......@@ -115,12 +117,13 @@ public class EndpointInventory extends RegisterSource {
@Override public EndpointInventory map2Data(Map<String, Object> dbMap) {
EndpointInventory inventory = new EndpointInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
inventory.setName((String)dbMap.get(NAME));
inventory.setDetectPoint((Integer)dbMap.get(DETECT_POINT));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setDetectPoint(((Number)dbMap.get(DETECT_POINT)).intValue());
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
return inventory;
}
......@@ -132,6 +135,7 @@ public class EndpointInventory extends RegisterSource {
map.put(DETECT_POINT, storageData.getDetectPoint());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
return map;
}
}
......
......@@ -88,6 +88,7 @@ public class NetworkAddressInventory extends RegisterSource {
inventory.setSequence(getSequence());
inventory.setRegisterTime(getRegisterTime());
inventory.setHeartbeatTime(getHeartbeatTime());
inventory.setLastUpdateTime(getLastUpdateTime());
inventory.setName(name);
inventory.setNodeType(nodeType);
......@@ -95,14 +96,14 @@ public class NetworkAddressInventory extends RegisterSource {
}
@Override public boolean combine(RegisterSource registerSource) {
boolean isCombine = super.combine(registerSource);
boolean isChanged = super.combine(registerSource);
NetworkAddressInventory inventory = (NetworkAddressInventory)registerSource;
if (nodeType != inventory.nodeType) {
setNodeType(inventory.nodeType);
if (this.nodeType != inventory.getNodeType() && inventory.getLastUpdateTime() >= this.getLastUpdateTime()) {
setNodeType(inventory.getNodeType());
return true;
} else {
return isCombine;
return isChanged;
}
}
......@@ -113,6 +114,7 @@ public class NetworkAddressInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name);
return remoteBuilder;
......@@ -124,6 +126,7 @@ public class NetworkAddressInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
}
......@@ -136,11 +139,12 @@ public class NetworkAddressInventory extends RegisterSource {
@Override public NetworkAddressInventory map2Data(Map<String, Object> dbMap) {
NetworkAddressInventory inventory = new NetworkAddressInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
inventory.setName((String)dbMap.get(NAME));
inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue());
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
return inventory;
}
......@@ -151,6 +155,7 @@ public class NetworkAddressInventory extends RegisterSource {
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
return map;
}
}
......
......@@ -31,17 +31,24 @@ public abstract class RegisterSource extends StreamData implements StorageData {
public static final String SEQUENCE = "sequence";
public static final String REGISTER_TIME = "register_time";
public static final String HEARTBEAT_TIME = "heartbeat_time";
public static final String LAST_UPDATE_TIME = "last_update_time";
@Getter @Setter @Column(columnName = SEQUENCE) private int sequence;
@Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime;
@Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime;
@Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime = 0L;
@Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime = 0L;
@Setter @Getter @Column(columnName = LAST_UPDATE_TIME) private long lastUpdateTime = 0L;
public boolean combine(RegisterSource registerSource) {
boolean isChanged = false;
if (heartbeatTime < registerSource.getHeartbeatTime()) {
heartbeatTime = registerSource.getHeartbeatTime();
return true;
} else {
return false;
isChanged = true;
}
if (lastUpdateTime < registerSource.getLastUpdateTime()) {
lastUpdateTime = registerSource.getLastUpdateTime();
isChanged = true;
}
return isChanged;
}
}
......@@ -51,8 +51,7 @@ public class ServiceInstanceInventory extends RegisterSource {
public static final String PROPERTIES = "properties";
private static final Gson GSON = new Gson();
@Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true)
private String instanceUUID = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true) private String instanceUUID = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = NAME) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
......@@ -133,6 +132,7 @@ public class ServiceInstanceInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name);
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(instanceUUID) ? Const.EMPTY_STRING : instanceUUID);
......@@ -148,6 +148,7 @@ public class ServiceInstanceInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
setInstanceUUID(remoteData.getDataStrings(1));
......@@ -162,13 +163,14 @@ public class ServiceInstanceInventory extends RegisterSource {
@Override public ServiceInstanceInventory map2Data(Map<String, Object> dbMap) {
ServiceInstanceInventory inventory = new ServiceInstanceInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue());
inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue());
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
inventory.setName((String)dbMap.get(NAME));
inventory.setInstanceUUID((String)dbMap.get(INSTANCE_UUID));
......@@ -185,6 +187,7 @@ public class ServiceInstanceInventory extends RegisterSource {
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
map.put(NAME, storageData.getName());
map.put(INSTANCE_UUID, storageData.getInstanceUUID());
......
......@@ -47,7 +47,6 @@ public class ServiceInventory extends RegisterSource {
private static final String ADDRESS_ID = "address_id";
public static final String NODE_TYPE = "node_type";
public static final String MAPPING_SERVICE_ID = "mapping_service_id";
public static final String MAPPING_LAST_UPDATE_TIME = "mapping_last_update_time";
public static final String PROPERTIES = "properties";
private static final Gson GSON = new Gson();
......@@ -56,8 +55,7 @@ public class ServiceInventory extends RegisterSource {
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
@Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE) @Column(columnName = NODE_TYPE) private int nodeType;
@Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId;
@Setter @Getter @Column(columnName = MAPPING_LAST_UPDATE_TIME) private long mappingLastUpdateTime;
@Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private String prop;
@Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private String prop = Const.EMPTY_JSON_OBJECT_STRING;
@Getter private JsonObject properties;
public NodeType getServiceNodeType() {
......@@ -119,7 +117,7 @@ public class ServiceInventory extends RegisterSource {
inventory.setIsAddress(isAddress);
inventory.setNodeType(nodeType);
inventory.setAddressId(addressId);
inventory.setMappingLastUpdateTime(mappingLastUpdateTime);
inventory.setLastUpdateTime(getLastUpdateTime());
inventory.setMappingServiceId(mappingServiceId);
inventory.setProp(prop);
......@@ -155,7 +153,7 @@ public class ServiceInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
remoteBuilder.addDataLongs(getMappingLastUpdateTime());
remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ? Const.EMPTY_STRING : name);
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(prop) ? Const.EMPTY_STRING : prop);
......@@ -171,7 +169,7 @@ public class ServiceInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setMappingLastUpdateTime(remoteData.getDataLongs(2));
setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
setProp(remoteData.getDataStrings(1));
......@@ -183,21 +181,22 @@ public class ServiceInventory extends RegisterSource {
}
@Override public boolean combine(RegisterSource registerSource) {
super.combine(registerSource);
boolean isChanged = super.combine(registerSource);
ServiceInventory serviceInventory = (ServiceInventory)registerSource;
nodeType = serviceInventory.nodeType;
setProp(serviceInventory.getProp());
if (Const.NONE != serviceInventory.getMappingServiceId() && serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime()) {
this.mappingServiceId = serviceInventory.getMappingServiceId();
this.mappingLastUpdateTime = serviceInventory.getMappingLastUpdateTime();
if (serviceInventory.getLastUpdateTime() >= this.getLastUpdateTime()) {
this.nodeType = serviceInventory.getNodeType();
setProp(serviceInventory.getProp());
if (Const.NONE != serviceInventory.getMappingServiceId()) {
this.mappingServiceId = serviceInventory.getMappingServiceId();
}
isChanged = true;
}
return true;
return isChanged;
}
public static class PropertyUtil {
public static final String DATABASE = "database";
}
......@@ -205,15 +204,15 @@ public class ServiceInventory extends RegisterSource {
@Override public ServiceInventory map2Data(Map<String, Object> dbMap) {
ServiceInventory inventory = new ServiceInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
inventory.setMappingServiceId((Integer)dbMap.get(MAPPING_SERVICE_ID));
inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue());
inventory.setMappingServiceId(((Number)dbMap.get(MAPPING_SERVICE_ID)).intValue());
inventory.setName((String)dbMap.get(NAME));
inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setMappingLastUpdateTime((Long)dbMap.get(MAPPING_LAST_UPDATE_TIME));
inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue());
inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue());
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
inventory.setProp((String)dbMap.get(PROPERTIES));
return inventory;
}
......@@ -228,7 +227,7 @@ public class ServiceInventory extends RegisterSource {
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(MAPPING_LAST_UPDATE_TIME, storageData.getMappingLastUpdateTime());
map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
map.put(PROPERTIES, storageData.getProp());
return map;
}
......
......@@ -115,7 +115,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
if (!this.compare(networkAddress, nodeType)) {
NetworkAddressInventory newNetworkAddress = networkAddress.getClone();
newNetworkAddress.setNetworkAddressNodeType(nodeType);
newNetworkAddress.setHeartbeatTime(System.currentTimeMillis());
newNetworkAddress.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(newNetworkAddress);
}
......
......@@ -64,7 +64,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingServiceId(Const.NONE);
serviceInventory.setMappingLastUpdateTime(now);
serviceInventory.setLastUpdateTime(now);
serviceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInventory);
......@@ -84,7 +84,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingLastUpdateTime(now);
serviceInventory.setLastUpdateTime(now);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
......@@ -98,7 +98,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory = serviceInventory.getClone();
serviceInventory.setServiceNodeType(nodeType);
serviceInventory.setProperties(properties);
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
serviceInventory.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
......@@ -124,7 +124,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
if (Objects.nonNull(serviceInventory)) {
serviceInventory = serviceInventory.getClone();
serviceInventory.setMappingServiceId(mappingServiceId);
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
serviceInventory.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(serviceInventory);
} else {
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.cache;
import java.util.List;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.DAO;
......@@ -29,4 +30,6 @@ public interface INetworkAddressInventoryCacheDAO extends DAO {
int getAddressId(String networkAddress);
NetworkAddressInventory get(int addressId);
List<NetworkAddressInventory> loadLastUpdate(long lastUpdateTime);
}
......@@ -33,5 +33,5 @@ public interface IServiceInventoryCacheDAO extends DAO {
ServiceInventory get(int serviceId);
List<ServiceInventory> loadLastMappingUpdate();
List<ServiceInventory> loadLastUpdate(long lastUpdateTime);
}
......@@ -116,10 +116,6 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
* Need to try to get the id by assuming the endpoint name is detected at server, local or client.
*
* If agent does the exchange, then always use endpoint id.
*
* @param serviceId
* @param endpointName
* @return
*/
private int getEndpointId(int serviceId, String endpointName) {
int endpointId = endpointInventoryRegister.get(serviceId, endpointName, DetectPoint.SERVER.ordinal());
......
......@@ -103,12 +103,11 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
NodeType nodeType = NodeType.fromSpanLayerValue(spanLayerValue);
networkAddressInventoryRegister.update(peerId, nodeType);
/**
/*
* In some case, conjecture node, such as Database node, could be registered by agents.
* At here, if the target service properties need to be updated,
* it will only be updated at the first time for now.
*/
JsonObject properties = null;
ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get(serviceInventoryCacheDAO.getServiceId(peerId));
if (SpanLayer.Database.equals(standardBuilder.getSpanLayer())) {
......
......@@ -36,7 +36,7 @@
<logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.so11y" level="DEBUG" />
<Root level="ERROR">
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
......@@ -76,4 +77,24 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork
return null;
}
}
@Override public List<NetworkAddressInventory> loadLastUpdate(long lastUpdateTime) {
List<NetworkAddressInventory> addressInventories = new ArrayList<>();
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.rangeQuery(NetworkAddressInventory.LAST_UPDATE_TIME).gte(lastUpdateTime));
searchSourceBuilder.size(500);
SearchResponse response = getClient().search(NetworkAddressInventory.INDEX_NAME, searchSourceBuilder);
for (SearchHit searchHit : response.getHits().getHits()) {
addressInventories.add(this.builder.map2Data(searchHit.getSourceAsMap()));
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
return addressInventories;
}
}
......@@ -88,7 +88,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
}
}
@Override public List<ServiceInventory> loadLastMappingUpdate() {
@Override public List<ServiceInventory> loadLastUpdate(long lastUpdateTime) {
List<ServiceInventory> serviceInventories = new ArrayList<>();
try {
......@@ -96,10 +96,10 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.TRUE));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.MAPPING_LAST_UPDATE_TIME).gte(System.currentTimeMillis() - 30 * 60 * 1000));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.LAST_UPDATE_TIME).gte(lastUpdateTime));
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.size(50);
searchSourceBuilder.size(500);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder);
......
......@@ -19,11 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author wusheng
......@@ -49,4 +50,32 @@ public class H2NetworkAddressInventoryCacheDAO extends H2SQLExecutor implements
return null;
}
}
@Override public List<NetworkAddressInventory> loadLastUpdate(long lastUpdateTime) {
List<NetworkAddressInventory> addressInventories = new ArrayList<>();
try {
StringBuilder sql = new StringBuilder("select * from ");
sql.append(NetworkAddressInventory.INDEX_NAME);
sql.append(" where ").append(NetworkAddressInventory.LAST_UPDATE_TIME).append(">?");
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), lastUpdateTime)) {
NetworkAddressInventory addressInventory;
do {
addressInventory = (NetworkAddressInventory)toStorageData(resultSet, NetworkAddressInventory.INDEX_NAME, new ServiceInventory.Builder());
if (addressInventory != null) {
addressInventories.add(addressInventory);
}
}
while (addressInventory != null);
}
} catch (SQLException e) {
throw new IOException(e);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
return addressInventories;
}
}
......@@ -57,17 +57,17 @@ public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServic
}
}
@Override public List<ServiceInventory> loadLastMappingUpdate() {
@Override public List<ServiceInventory> loadLastUpdate(long lastUpdateTime) {
List<ServiceInventory> serviceInventories = new ArrayList<>();
try {
StringBuilder sql = new StringBuilder("select * from ");
sql.append(ServiceInventory.INDEX_NAME);
sql.append(" where ").append(ServiceInventory.IS_ADDRESS).append("=? ");
sql.append(" and ").append(ServiceInventory.MAPPING_LAST_UPDATE_TIME).append(">?");
sql.append(" and ").append(ServiceInventory.LAST_UPDATE_TIME).append(">?");
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, System.currentTimeMillis() - 30 * 60 * 1000)) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, lastUpdateTime)) {
ServiceInventory serviceInventory;
do {
serviceInventory = (ServiceInventory)toStorageData(resultSet, ServiceInventory.INDEX_NAME, new ServiceInventory.Builder());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册