提交 3fa77d5c 编写于 作者: P peng-yongsheng

The core implementation of register and cache for scope inventory.

上级 4ea4e04e
......@@ -64,13 +64,17 @@ public class CoreModule extends ModuleDefine {
}
private void addRegisterService(List<Class> classes) {
classes.add(IEndpointInventoryRegister.class);
classes.add(IServiceInventoryRegister.class);
classes.add(IServiceInstanceInventoryRegister.class);
classes.add(IEndpointInventoryRegister.class);
classes.add(INetworkAddressInventoryRegister.class);
}
private void addCacheService(List<Class> classes) {
classes.add(ServiceInventoryCache.class);
classes.add(ServiceInstanceInventoryCache.class);
classes.add(EndpointInventoryCache.class);
classes.add(NetworkAddressInventoryCache.class);
}
private void addReceiverInterface(List<Class> classes) {
......
......@@ -95,9 +95,15 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager()));
this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager()));
this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
......
......@@ -42,25 +42,31 @@ public class EndpointInventoryCache implements Service {
this.moduleManager = moduleManager;
}
private final Cache<String, Integer> idCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<String, Integer> endpointNameCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> sequenceCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> endpointIdCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
public int get(int serviceId, String serviceName, int srcSpanType) {
String id = serviceId + Const.ID_SPLIT + serviceName + Const.ID_SPLIT + srcSpanType;
private IEndpointInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
cacheDAO = moduleManager.find(StorageModule.NAME).getService(IEndpointInventoryCacheDAO.class);
}
return cacheDAO;
}
public int getEndpointId(int serviceId, String endpointName) {
String id = EndpointInventory.buildId(serviceId, endpointName);
int endpointId = Const.NONE;
try {
endpointId = idCache.get(id, () -> getCacheDAO().get(id));
endpointId = endpointNameCache.get(EndpointInventory.buildId(serviceId, endpointName), () -> getCacheDAO().getEndpointId(serviceId, endpointName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == Const.NONE) {
endpointId = getCacheDAO().get(id);
endpointId = getCacheDAO().getEndpointId(serviceId, endpointName);
if (endpointId != Const.NONE) {
idCache.put(id, endpointId);
endpointNameCache.put(id, endpointId);
}
}
return endpointId;
......@@ -69,7 +75,7 @@ public class EndpointInventoryCache implements Service {
public EndpointInventory get(int endpointId) {
EndpointInventory endpointInventory = null;
try {
endpointInventory = sequenceCache.get(endpointId, () -> getCacheDAO().get(endpointId));
endpointInventory = endpointIdCache.get(endpointId, () -> getCacheDAO().get(endpointId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
......@@ -77,7 +83,7 @@ public class EndpointInventoryCache implements Service {
if (isNull(endpointInventory)) {
endpointInventory = getCacheDAO().get(endpointId);
if (nonNull(endpointInventory)) {
sequenceCache.put(endpointId, endpointInventory);
endpointIdCache.put(endpointId, endpointInventory);
} else {
logger.warn("EndpointInventory id {} is not in cache and persistent storage.", endpointId);
}
......@@ -85,11 +91,4 @@ public class EndpointInventoryCache implements Service {
return endpointInventory;
}
private IEndpointInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
cacheDAO = moduleManager.find(StorageModule.NAME).getService(IEndpointInventoryCacheDAO.class);
}
return cacheDAO;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.StringUtils;
import org.slf4j.*;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class NetworkAddressInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryCache.class);
private final Cache<String, Integer> networkAddressCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(5000).build();
private final Cache<Integer, NetworkAddressInventory> addressIdCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(5000).build();
private final ModuleManager moduleManager;
private INetworkAddressInventoryCacheDAO cacheDAO;
public NetworkAddressInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private INetworkAddressInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
this.cacheDAO = moduleManager.find(StorageModule.NAME).getService(INetworkAddressInventoryCacheDAO.class);
}
return this.cacheDAO;
}
public int getAddressId(String networkAddress) {
int addressId = Const.NONE;
try {
addressId = networkAddressCache.get(NetworkAddressInventory.buildId(networkAddress), () -> getCacheDAO().getAddressId(networkAddress));
if (addressId == Const.NONE) {
addressId = getCacheDAO().getAddressId(networkAddress);
if (addressId != Const.NONE) {
networkAddressCache.put(NetworkAddressInventory.buildId(networkAddress), addressId);
}
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
return addressId;
}
public NetworkAddressInventory get(int addressId) {
NetworkAddressInventory networkAddress = null;
try {
networkAddress = addressIdCache.get(addressId, () -> getCacheDAO().get(addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (isNull(networkAddress)) {
networkAddress = getCacheDAO().get(addressId);
if (StringUtils.isNotEmpty(networkAddress)) {
addressIdCache.put(addressId, networkAddress);
}
}
return networkAddress;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class ServiceInstanceInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryCache.class);
private final Cache<Integer, Integer> serviceInstanceIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> serviceInstanceNameCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> addressIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final ModuleManager moduleManager;
private IServiceInstanceInventoryCacheDAO cacheDAO;
public ServiceInstanceInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IServiceInstanceInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
this.cacheDAO = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
}
return this.cacheDAO;
}
public int getServiceId(int serviceInstanceId) {
int serviceId = Const.NONE;
try {
serviceId = serviceInstanceIdCache.get(serviceInstanceId, () -> getCacheDAO().getServiceId(serviceInstanceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == Const.NONE) {
serviceId = getCacheDAO().getServiceId(serviceInstanceId);
if (serviceId != Const.NONE) {
serviceInstanceIdCache.put(serviceInstanceId, serviceId);
}
}
return serviceId;
}
public int getServiceInstanceId(int serviceId, String serviceInstanceName) {
int serviceInstanceId = Const.NONE;
try {
serviceInstanceId = serviceInstanceNameCache.get(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), () -> getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceInstanceId == Const.NONE) {
serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName);
if (serviceId != Const.NONE) {
serviceInstanceNameCache.put(ServiceInstanceInventory.buildId(serviceId, serviceInstanceName), serviceInstanceId);
}
}
return serviceInstanceId;
}
public int getServiceInstanceId(int serviceId, int addressId) {
int serviceInstanceId = Const.NONE;
try {
serviceInstanceId = addressIdCache.get(ServiceInstanceInventory.buildId(serviceId, addressId), () -> getCacheDAO().getServiceInstanceId(serviceId, addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceInstanceId == Const.NONE) {
serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, addressId);
if (serviceId != Const.NONE) {
addressIdCache.put(ServiceInstanceInventory.buildId(serviceId, addressId), serviceInstanceId);
}
}
return serviceInstanceId;
}
}
......@@ -34,9 +34,9 @@ public class ServiceInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCache.class);
private final Cache<String, Integer> idCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, ServiceInventory> sequenceCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, Integer> addressIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<String, Integer> serviceNameCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<String, Integer> addressIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, ServiceInventory> serviceIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final ModuleManager moduleManager;
private IServiceInventoryCacheDAO cacheDAO;
......@@ -52,54 +52,54 @@ public class ServiceInventoryCache implements Service {
return this.cacheDAO;
}
public int get(String serviceName) {
public int getServiceId(String serviceName) {
int serviceId = 0;
try {
serviceId = idCache.get(serviceName, () -> getCacheDAO().get(serviceName));
serviceId = serviceNameCache.get(ServiceInventory.buildId(serviceName), () -> getCacheDAO().getServiceId(serviceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = getCacheDAO().get(serviceName);
serviceId = getCacheDAO().getServiceId(serviceName);
if (serviceId != 0) {
idCache.put(serviceName, serviceId);
serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
}
}
return serviceId;
}
public ServiceInventory get(int serviceId) {
ServiceInventory serviceInventory = null;
public int getServiceId(int addressId) {
int serviceId = 0;
try {
serviceInventory = sequenceCache.get(serviceId, () -> getCacheDAO().get(serviceId));
serviceId = addressIdCache.get(ServiceInventory.buildId(addressId), () -> getCacheDAO().getServiceId(addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (isNull(serviceInventory)) {
serviceInventory = getCacheDAO().get(serviceId);
if (nonNull(serviceInventory)) {
sequenceCache.put(serviceId, serviceInventory);
if (serviceId == 0) {
serviceId = getCacheDAO().getServiceId(addressId);
if (serviceId != 0) {
addressIdCache.put(ServiceInventory.buildId(addressId), serviceId);
}
}
return serviceInventory;
return serviceId;
}
public int getServiceIdByAddressId(int addressId) {
int serviceId = 0;
public ServiceInventory get(int serviceId) {
ServiceInventory serviceInventory = null;
try {
serviceId = addressIdCache.get(addressId, () -> getCacheDAO().getServiceIdByAddressId(addressId));
serviceInventory = serviceIdCache.get(serviceId, () -> getCacheDAO().get(serviceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = getCacheDAO().getServiceIdByAddressId(addressId);
if (serviceId != 0) {
addressIdCache.put(addressId, serviceId);
if (isNull(serviceInventory)) {
serviceInventory = getCacheDAO().get(serviceId);
if (nonNull(serviceInventory)) {
serviceIdCache.put(serviceId, serviceInventory);
}
}
return serviceId;
return serviceInventory;
}
}
......@@ -43,18 +43,21 @@ public class EndpointInventory extends RegisterSource {
private static final String SRC_SPAN_TYPE = "src_span_type";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name;
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = SRC_SPAN_TYPE) private int srcSpanType;
public static String buildId(int serviceId, String endpointName) {
return serviceId + Const.ID_SPLIT + endpointName;
}
@Override public String id() {
return String.valueOf(serviceId) + Const.ID_SPLIT + name + Const.ID_SPLIT + String.valueOf(srcSpanType);
return buildId(serviceId, name);
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + serviceId;
result = 31 * result + name.hashCode();
result = 31 * result + srcSpanType;
return result;
}
......@@ -71,8 +74,6 @@ public class EndpointInventory extends RegisterSource {
return false;
if (name.equals(source.getName()))
return false;
if (srcSpanType != source.getSrcSpanType())
return false;
return true;
}
......@@ -98,20 +99,20 @@ public class EndpointInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setName(remoteData.getDataStrings(1));
setName(remoteData.getDataStrings(0));
}
public static class Builder implements StorageBuilder<EndpointInventory> {
@Override public EndpointInventory map2Data(Map<String, Object> dbMap) {
EndpointInventory endpointInventory = new EndpointInventory();
endpointInventory.setSequence((Integer)dbMap.get(SEQUENCE));
endpointInventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
endpointInventory.setName((String)dbMap.get(NAME));
endpointInventory.setSrcSpanType((Integer)dbMap.get(SRC_SPAN_TYPE));
endpointInventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
endpointInventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return endpointInventory;
EndpointInventory inventory = new EndpointInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
inventory.setName((String)dbMap.get(NAME));
inventory.setSrcSpanType((Integer)dbMap.get(SRC_SPAN_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return inventory;
}
@Override public Map<String, Object> data2Map(EndpointInventory storageData) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* @author peng-yongsheng
*/
@InventoryType(scope = Scope.NetworkAddress)
@StreamData
@StorageEntity(name = NetworkAddressInventory.MODEL_NAME, builder = NetworkAddressInventory.Builder.class)
public class NetworkAddressInventory extends RegisterSource {
public static final String MODEL_NAME = "network_address_inventory";
private static final String NAME = "name";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
public static String buildId(String networkAddress) {
return networkAddress;
}
@Override public String id() {
return buildId(name);
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + name.hashCode();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
NetworkAddressInventory source = (NetworkAddressInventory)obj;
if (name.equals(source.getName()))
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSequence());
remoteBuilder.setDataLongs(0, getRegisterTime());
remoteBuilder.setDataLongs(1, getHeartbeatTime());
remoteBuilder.setDataStrings(0, name);
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setSequence(remoteData.getDataIntegers(0));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setName(remoteData.getDataStrings(0));
}
public static class Builder implements StorageBuilder<NetworkAddressInventory> {
@Override public NetworkAddressInventory map2Data(Map<String, Object> dbMap) {
NetworkAddressInventory inventory = new NetworkAddressInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setName((String)dbMap.get(NAME));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return inventory;
}
@Override public Map<String, Object> data2Map(NetworkAddressInventory storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SEQUENCE, storageData.getSequence());
map.put(NAME, storageData.getName());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
/**
* @author peng-yongsheng
*/
@InventoryType(scope = Scope.ServiceInstance)
@StreamData
@StorageEntity(name = ServiceInstanceInventory.MODEL_NAME, builder = ServiceInstanceInventory.Builder.class)
public class ServiceInstanceInventory extends RegisterSource {
public static final String MODEL_NAME = "service_instance_inventory";
public static final String NAME = "name";
public static final String SERVICE_ID = "service_id";
public static final String IS_ADDRESS = "is_address";
public static final String ADDRESS_ID = "address_id";
private static final String OS_NAME = "os_name";
private static final String HOST_NAME = "host_name";
private static final String PROCESS_NO = "process_no";
private static final String IPV4 = "ipv4";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
@Setter @Getter @Column(columnName = OS_NAME) private String osName;
@Setter @Getter @Column(columnName = HOST_NAME) private String hostName;
@Setter @Getter @Column(columnName = PROCESS_NO) private int processNo;
@Setter @Getter @Column(columnName = IPV4) private String ipv4;
public static String buildId(int serviceId, String serviceInstanceName) {
return serviceId + Const.ID_SPLIT + serviceInstanceName + Const.ID_SPLIT + BooleanUtils.FALSE + Const.ID_SPLIT + Const.NONE;
}
public static String buildId(int serviceId, int addressId) {
return serviceId + Const.ID_SPLIT + BooleanUtils.TRUE + Const.ID_SPLIT + addressId;
}
@Override public String id() {
if (BooleanUtils.TRUE == isAddress) {
return buildId(serviceId, addressId);
} else {
return buildId(serviceId, name);
}
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + serviceId;
result = 31 * result + name.hashCode();
result = 31 * result + isAddress;
result = 31 * result + addressId;
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceInstanceInventory source = (ServiceInstanceInventory)obj;
if (serviceId != source.getServiceId())
return false;
if (name.equals(source.getName()))
return false;
if (isAddress != source.getIsAddress())
return false;
if (addressId != source.getAddressId())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSequence());
remoteBuilder.setDataIntegers(1, serviceId);
remoteBuilder.setDataIntegers(2, isAddress);
remoteBuilder.setDataIntegers(3, addressId);
remoteBuilder.setDataIntegers(4, processNo);
remoteBuilder.setDataLongs(0, getRegisterTime());
remoteBuilder.setDataLongs(1, getHeartbeatTime());
remoteBuilder.setDataStrings(0, name);
remoteBuilder.setDataStrings(1, osName);
remoteBuilder.setDataStrings(2, hostName);
remoteBuilder.setDataStrings(3, ipv4);
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setSequence(remoteData.getDataIntegers(0));
setServiceId(remoteData.getDataIntegers(1));
setIsAddress(remoteData.getDataIntegers(2));
setAddressId(remoteData.getDataIntegers(3));
setProcessNo(remoteData.getDataIntegers(4));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setName(remoteData.getDataStrings(0));
setOsName(remoteData.getDataStrings(1));
setHostName(remoteData.getDataStrings(2));
setIpv4(remoteData.getDataStrings(3));
}
public static class Builder implements StorageBuilder<ServiceInstanceInventory> {
@Override public ServiceInstanceInventory map2Data(Map<String, Object> dbMap) {
ServiceInstanceInventory inventory = new ServiceInstanceInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
inventory.setProcessNo((Integer)dbMap.get(PROCESS_NO));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
inventory.setName((String)dbMap.get(NAME));
inventory.setOsName((String)dbMap.get(OS_NAME));
inventory.setHostName((String)dbMap.get(HOST_NAME));
inventory.setIpv4((String)dbMap.get(IPV4));
return inventory;
}
@Override public Map<String, Object> data2Map(ServiceInstanceInventory storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SEQUENCE, storageData.getSequence());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(IS_ADDRESS, storageData.getIsAddress());
map.put(ADDRESS_ID, storageData.getAddressId());
map.put(PROCESS_NO, storageData.getProcessNo());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
map.put(NAME, storageData.getName());
map.put(OS_NAME, storageData.getOsName());
map.put(HOST_NAME, storageData.getHostName());
map.put(IPV4, storageData.getIpv4());
return map;
}
}
}
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
/**
* @author peng-yongsheng
......@@ -42,12 +43,24 @@ public class ServiceInventory extends RegisterSource {
private static final String IS_ADDRESS = "is_address";
private static final String ADDRESS_ID = "address_id";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name;
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
public static String buildId(String serviceName) {
return serviceName + Const.ID_SPLIT + BooleanUtils.FALSE + Const.ID_SPLIT + Const.NONE;
}
public static String buildId(int addressId) {
return BooleanUtils.TRUE + Const.ID_SPLIT + addressId;
}
@Override public String id() {
return name + Const.ID_SPLIT + String.valueOf(isAddress) + Const.ID_SPLIT + String.valueOf(addressId);
if (BooleanUtils.TRUE == isAddress) {
return buildId(addressId);
} else {
return buildId(name);
}
}
@Override public int hashCode() {
......@@ -98,20 +111,20 @@ public class ServiceInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setName(remoteData.getDataStrings(1));
setName(remoteData.getDataStrings(0));
}
public static class Builder implements StorageBuilder<ServiceInventory> {
@Override public ServiceInventory map2Data(Map<String, Object> dbMap) {
ServiceInventory endpointInventory = new ServiceInventory();
endpointInventory.setSequence((Integer)dbMap.get(SEQUENCE));
endpointInventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
endpointInventory.setName((String)dbMap.get(NAME));
endpointInventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
endpointInventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
endpointInventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return endpointInventory;
ServiceInventory inventory = new ServiceInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
inventory.setName((String)dbMap.get(NAME));
inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return inventory;
}
@Override public Map<String, Object> data2Map(ServiceInventory storageData) {
......
......@@ -48,7 +48,7 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
}
@Override public int getOrCreate(int serviceId, String endpointName, int srcSpanType) {
int endpointId = getCacheService().get(serviceId, endpointName, srcSpanType);
int endpointId = getCacheService().getEndpointId(serviceId, endpointName);
if (endpointId == Const.NONE) {
EndpointInventory endpointInventory = new EndpointInventory();
......@@ -63,7 +63,7 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
return endpointId;
}
@Override public int get(int serviceId, String endpointName, int srcSpanType) {
return getCacheService().get(serviceId, endpointName, srcSpanType);
@Override public int get(int serviceId, String endpointName) {
return getCacheService().getEndpointId(serviceId, endpointName);
}
}
......@@ -27,5 +27,5 @@ public interface IEndpointInventoryRegister extends Service {
int getOrCreate(int serviceId, String endpointName, int srcSpanType);
int get(int serviceId, String endpointName, int srcSpanType);
int get(int serviceId, String endpointName);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface INetworkAddressInventoryRegister extends Service {
int getOrCreate(String networkAddress);
int get(String networkAddress);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface IServiceInstanceInventoryRegister extends Service {
int getOrCreate(int serviceId, String serviceInstanceName, long registerTime);
int getOrCreate(int serviceId, int addressId, long registerTime);
}
......@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface IServiceInventoryRegister extends Service {
int getOrCreateByServiceName(String serviceName);
int getOrCreate(String serviceName);
// int getOrCreateByAddressId(int addressId, String networkAddress);
int getOrCreate(int addressId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class NetworkAddressInventoryRegister implements INetworkAddressInventoryRegister {
private final ModuleManager moduleManager;
private NetworkAddressInventoryCache networkAddressInventoryCache;
private IServiceInventoryRegister serviceInventoryRegister;
private IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
public NetworkAddressInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private NetworkAddressInventoryCache getNetworkAddressInventoryCache() {
if (isNull(networkAddressInventoryCache)) {
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).getService(NetworkAddressInventoryCache.class);
}
return this.networkAddressInventoryCache;
}
private IServiceInventoryRegister getServiceInventoryRegister() {
if (isNull(serviceInventoryRegister)) {
this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
return this.serviceInventoryRegister;
}
private IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister() {
if (isNull(serviceInstanceInventoryRegister)) {
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
}
return this.serviceInstanceInventoryRegister;
}
@Override public int getOrCreate(String networkAddress) {
int addressId = getNetworkAddressInventoryCache().getAddressId(networkAddress);
if (addressId != Const.NONE) {
int serviceId = getServiceInventoryRegister().getOrCreate(addressId);
if (serviceId != Const.NONE) {
int serviceInstanceId = getServiceInstanceInventoryRegister().getOrCreate(serviceId, addressId, System.currentTimeMillis());
if (serviceInstanceId != Const.NONE) {
return addressId;
}
}
} else {
NetworkAddressInventory newNetworkAddress = new NetworkAddressInventory();
newNetworkAddress.setName(networkAddress);
long now = System.currentTimeMillis();
newNetworkAddress.setRegisterTime(now);
newNetworkAddress.setHeartbeatTime(now);
InventoryProcess.INSTANCE.in(newNetworkAddress);
}
return Const.NONE;
}
@Override public int get(String networkAddress) {
return getNetworkAddressInventoryCache().getAddressId(networkAddress);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class ServiceInstanceInventoryRegister implements IServiceInstanceInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryRegister.class);
private final ModuleManager moduleManager;
private IServiceInstanceInventoryCacheDAO cacheDAO;
public ServiceInstanceInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IServiceInstanceInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
cacheDAO = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
}
return cacheDAO;
}
@Override public int getOrCreate(int serviceId, String serviceInstanceName, long registerTime) {
if (logger.isDebugEnabled()) {
logger.debug("Get or create service instance by service instance name, service id: {}, service instance name: {}, registerTime: {}", serviceId, serviceInstanceName, registerTime);
}
int serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName);
if (serviceInstanceId == Const.NONE) {
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
serviceInstanceInventory.setServiceId(serviceId);
serviceInstanceInventory.setName(serviceInstanceName);
serviceInstanceInventory.setIsAddress(BooleanUtils.FALSE);
serviceInstanceInventory.setAddressId(Const.NONE);
serviceInstanceInventory.setRegisterTime(registerTime);
serviceInstanceInventory.setHeartbeatTime(registerTime);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
}
return serviceInstanceId;
}
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
if (logger.isDebugEnabled()) {
logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
}
int serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, addressId);
if (serviceInstanceId == Const.NONE) {
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
serviceInstanceInventory.setServiceId(serviceId);
serviceInstanceInventory.setName(Const.EMPTY_STRING);
serviceInstanceInventory.setIsAddress(BooleanUtils.TRUE);
serviceInstanceInventory.setAddressId(addressId);
serviceInstanceInventory.setRegisterTime(registerTime);
serviceInstanceInventory.setHeartbeatTime(registerTime);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
}
return serviceInstanceId;
}
}
......@@ -46,8 +46,8 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
return serviceInventoryCache;
}
@Override public int getOrCreateByServiceName(String serviceName) {
int serviceId = getServiceInventoryCache().get(serviceName);
@Override public int getOrCreate(String serviceName) {
int serviceId = getServiceInventoryCache().getServiceId(serviceName);
if (serviceId == Const.NONE) {
ServiceInventory serviceInventory = new ServiceInventory();
......@@ -63,4 +63,22 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
}
return serviceId;
}
@Override public int getOrCreate(int addressId) {
int serviceId = getServiceInventoryCache().getServiceId(addressId);
if (serviceId == Const.NONE) {
ServiceInventory serviceInventory = new ServiceInventory();
serviceInventory.setName(Const.EMPTY_STRING);
serviceInventory.setAddressId(addressId);
serviceInventory.setIsAddress(BooleanUtils.TRUE);
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
InventoryProcess.INSTANCE.in(serviceInventory);
}
return serviceId;
}
}
......@@ -22,5 +22,5 @@ package org.apache.skywalking.oap.server.core.source;
* @author peng-yongsheng
*/
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress
}
......@@ -33,6 +33,9 @@ public class StorageModule extends ModuleDefine {
}
@Override public Class[] services() {
return new Class[] {IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class, IEndpointInventoryCacheDAO.class, IServiceInventoryCacheDAO.class};
return new Class[] {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class};
}
}
......@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.DAO;
*/
public interface IEndpointInventoryCacheDAO extends DAO {
int get(String id);
int getEndpointId(int serviceId, String endpointName);
EndpointInventory get(int sequence);
EndpointInventory get(int endpointId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.cache;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
public interface INetworkAddressInventoryCacheDAO extends DAO {
int getAddressId(String networkAddress);
NetworkAddressInventory get(int addressId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.cache;
import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
public interface IServiceInstanceInventoryCacheDAO extends DAO {
int getServiceId(int serviceInstanceId);
int getServiceInstanceId(int serviceId, String serviceInstanceName);
int getServiceInstanceId(int serviceId, int addressId);
}
......@@ -26,9 +26,9 @@ import org.apache.skywalking.oap.server.core.storage.DAO;
*/
public interface IServiceInventoryCacheDAO extends DAO {
int get(String id);
int getServiceId(String serviceName);
ServiceInventory get(int sequence);
int getServiceId(int addressId);
int getServiceIdByAddressId(int addressId);
ServiceInventory get(int serviceId);
}
......@@ -47,7 +47,7 @@ public class ApplicationRegisterHandler extends ApplicationRegisterServiceGrpc.A
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
String serviceName = request.getApplicationCode();
int serviceId = serviceInventoryRegister.getOrCreateByServiceName(serviceName);
int serviceId = serviceInventoryRegister.getOrCreate(serviceName);
if (serviceId != Const.NONE) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(serviceName).setValue(serviceId).build();
......
......@@ -49,7 +49,7 @@ public class ServiceNameDiscoveryHandler extends ServiceNameDiscoveryServiceGrpc
int serviceId = serviceNameElement.getApplicationId();
String endpointName = serviceNameElement.getServiceName();
int srcSpanType = serviceNameElement.getSrcSpanTypeValue();
int endpointId = inventoryService.get(serviceId, endpointName, srcSpanType);
int endpointId = inventoryService.getOrCreate(serviceId, endpointName, srcSpanType);
if (endpointId != Const.NONE) {
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
......
......@@ -69,7 +69,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient, 1000));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient));
}
@Override
......
......@@ -18,11 +18,16 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
......@@ -32,25 +37,43 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCacheEsDAO.class);
private final EndpointInventory.Builder builder = new EndpointInventory.Builder();
public EndpointInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public int get(String id) {
@Override public int getEndpointId(int serviceId, String endpointName) {
try {
String id = EndpointInventory.buildId(serviceId, endpointName);
GetResponse response = getClient().get(EndpointInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return 0;
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return 0;
return Const.NONE;
}
}
@Override public EndpointInventory get(int sequence) {
return null;
@Override public EndpointInventory get(int endpointId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(EndpointInventory.SEQUENCE, endpointId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(EndpointInventory.MODEL_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetworkAddressInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryCacheEsDAO.class);
private final NetworkAddressInventory.Builder builder = new NetworkAddressInventory.Builder();
public NetworkAddressInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getAddressId(String networkAddress) {
try {
String id = NetworkAddressInventory.buildId(networkAddress);
GetResponse response = getClient().get(NetworkAddressInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return Const.NONE;
}
}
@Override public NetworkAddressInventory get(int addressId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(NetworkAddressInventory.SEQUENCE, addressId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(NetworkAddressInventory.MODEL_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceInstanceInventoryCacheDAO extends EsDAO implements IServiceInstanceInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryCacheDAO.class);
private final ServiceInstanceInventory.Builder builder = new ServiceInstanceInventory.Builder();
public ServiceInstanceInventoryCacheDAO(ElasticSearchClient client) {
super(client);
}
@Override public int getServiceId(int serviceInstanceId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInstanceInventory.SEQUENCE, serviceInstanceId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceInstanceInventory.MODEL_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap()).getServiceId();
} else {
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return Const.NONE;
}
}
@Override public int getServiceInstanceId(int serviceId, String serviceInstanceName) {
String id = ServiceInstanceInventory.buildId(serviceId, serviceInstanceName);
return get(id);
}
@Override public int getServiceInstanceId(int serviceId, int addressId) {
String id = ServiceInstanceInventory.buildId(serviceId, addressId);
return get(id);
}
private int get(String id) {
try {
GetResponse response = getClient().get(ServiceInstanceInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return Const.NONE;
}
}
}
......@@ -18,11 +18,16 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
......@@ -32,29 +37,52 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCacheEsDAO.class);
private final ServiceInventory.Builder builder = new ServiceInventory.Builder();
public ServiceInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public int get(String id) {
@Override public int getServiceId(String serviceName) {
String id = ServiceInventory.buildId(serviceName);
return get(id);
}
@Override public int getServiceId(int addressId) {
String id = ServiceInventory.buildId(addressId);
return get(id);
}
private int get(String id) {
try {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return 0;
return Const.NONE;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return 0;
return Const.NONE;
}
}
@Override public ServiceInventory get(int sequence) {
return null;
}
@Override public ServiceInventory get(int serviceId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.SEQUENCE, serviceId));
searchSourceBuilder.size(1);
@Override public int getServiceIdByAddressId(int addressId) {
return 0;
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册