未验证 提交 4ea4e04e 编写于 作者: 彭勇升 pengys 提交者: GitHub

Service inventory register test success. (#1552)

* Register start up.

* Service inventory register test success.
上级 f80de428
......@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
......@@ -43,6 +45,8 @@ public class CoreModule extends ModuleDefine {
addServerInterface(classes);
addReceiverInterface(classes);
addInsideService(classes);
addRegisterService(classes);
addCacheService(classes);
return classes.toArray(new Class[] {});
}
......@@ -59,6 +63,16 @@ public class CoreModule extends ModuleDefine {
classes.add(RemoteSenderService.class);
}
private void addRegisterService(List<Class> classes) {
classes.add(IEndpointInventoryRegister.class);
classes.add(IServiceInventoryRegister.class);
}
private void addCacheService(List<Class> classes) {
classes.add(ServiceInventoryCache.class);
classes.add(EndpointInventoryCache.class);
}
private void addReceiverInterface(List<Class> classes) {
classes.add(SourceReceiver.class);
}
......
......@@ -21,8 +21,10 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
......@@ -46,6 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
......@@ -86,18 +89,27 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener);
this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager()));
this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
}
@Override public void start() throws ModuleStartException {
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
remoteClientManager.start();
try {
annotationScan.scan(() -> {
......
......@@ -31,25 +31,25 @@ import static java.util.Objects.*;
/**
* @author peng-yongsheng
*/
public class EndpointInventoryCacheService implements Service {
public class EndpointInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCacheService.class);
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCache.class);
private final ModuleManager moduleManager;
private IEndpointInventoryCacheDAO cacheDAO;
public EndpointInventoryCacheService(ModuleManager moduleManager) {
public EndpointInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private final Cache<String, Integer> idCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(100000).build();
private final Cache<String, Integer> idCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> sequenceCache = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(100000).build();
private final Cache<Integer, EndpointInventory> sequenceCache = CacheBuilder.newBuilder().initialCapacity(5000).maximumSize(100000).build();
public int get(int serviceId, String serviceName, int srcSpanType) {
String id = serviceId + Const.ID_SPLIT + serviceName + Const.ID_SPLIT + srcSpanType;
int endpointId = 0;
int endpointId = Const.NONE;
try {
endpointId = idCache.get(id, () -> getCacheDAO().get(id));
......@@ -57,9 +57,9 @@ public class EndpointInventoryCacheService implements Service {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
if (serviceId == Const.NONE) {
endpointId = getCacheDAO().get(id);
if (endpointId != 0) {
if (endpointId != Const.NONE) {
idCache.put(id, endpointId);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.cache;
import com.google.common.cache.*;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
import static java.util.Objects.*;
/**
* @author peng-yongsheng
*/
public class ServiceInventoryCache implements Service {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCache.class);
private final Cache<String, Integer> idCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, ServiceInventory> sequenceCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final Cache<Integer, Integer> addressIdCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
private final ModuleManager moduleManager;
private IServiceInventoryCacheDAO cacheDAO;
public ServiceInventoryCache(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IServiceInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
this.cacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInventoryCacheDAO.class);
}
return this.cacheDAO;
}
public int get(String serviceName) {
int serviceId = 0;
try {
serviceId = idCache.get(serviceName, () -> getCacheDAO().get(serviceName));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = getCacheDAO().get(serviceName);
if (serviceId != 0) {
idCache.put(serviceName, serviceId);
}
}
return serviceId;
}
public ServiceInventory get(int serviceId) {
ServiceInventory serviceInventory = null;
try {
serviceInventory = sequenceCache.get(serviceId, () -> getCacheDAO().get(serviceId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (isNull(serviceInventory)) {
serviceInventory = getCacheDAO().get(serviceId);
if (nonNull(serviceInventory)) {
sequenceCache.put(serviceId, serviceInventory);
}
}
return serviceInventory;
}
public int getServiceIdByAddressId(int addressId) {
int serviceId = 0;
try {
serviceId = addressIdCache.get(addressId, () -> getCacheDAO().getServiceIdByAddressId(addressId));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (serviceId == 0) {
serviceId = getCacheDAO().getServiceIdByAddressId(addressId);
if (serviceId != 0) {
addressIdCache.put(addressId, serviceId);
}
}
return serviceId;
}
}
......@@ -33,9 +33,11 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
*/
@InventoryType(scope = Scope.Endpoint)
@StreamData
@StorageEntity(name = "endpoint_inventory", builder = EndpointInventory.Builder.class)
@StorageEntity(name = EndpointInventory.MODEL_NAME, builder = EndpointInventory.Builder.class)
public class EndpointInventory extends RegisterSource {
public static final String MODEL_NAME = "endpoint_inventory";
private static final String SERVICE_ID = "service_id";
private static final String NAME = "name";
private static final String SRC_SPAN_TYPE = "src_span_type";
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* @author peng-yongsheng
*/
@InventoryType(scope = Scope.Service)
@StreamData
@StorageEntity(name = ServiceInventory.MODEL_NAME, builder = ServiceInventory.Builder.class)
public class ServiceInventory extends RegisterSource {
public static final String MODEL_NAME = "service_inventory";
private static final String NAME = "name";
private static final String IS_ADDRESS = "is_address";
private static final String ADDRESS_ID = "address_id";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
@Override public String id() {
return name + Const.ID_SPLIT + String.valueOf(isAddress) + Const.ID_SPLIT + String.valueOf(addressId);
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + name.hashCode();
result = 31 * result + isAddress;
result = 31 * result + addressId;
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceInventory source = (ServiceInventory)obj;
if (name.equals(source.getName()))
return false;
if (isAddress != source.getIsAddress())
return false;
if (addressId != source.getAddressId())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSequence());
remoteBuilder.setDataIntegers(1, isAddress);
remoteBuilder.setDataIntegers(2, addressId);
remoteBuilder.setDataLongs(0, getRegisterTime());
remoteBuilder.setDataLongs(1, getHeartbeatTime());
remoteBuilder.setDataStrings(0, name);
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setSequence(remoteData.getDataIntegers(0));
setIsAddress(remoteData.getDataIntegers(1));
setAddressId(remoteData.getDataIntegers(2));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
setName(remoteData.getDataStrings(1));
}
public static class Builder implements StorageBuilder<ServiceInventory> {
@Override public ServiceInventory map2Data(Map<String, Object> dbMap) {
ServiceInventory endpointInventory = new ServiceInventory();
endpointInventory.setSequence((Integer)dbMap.get(SEQUENCE));
endpointInventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
endpointInventory.setName((String)dbMap.get(NAME));
endpointInventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
endpointInventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
endpointInventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return endpointInventory;
}
@Override public Map<String, Object> data2Map(ServiceInventory storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SEQUENCE, storageData.getSequence());
map.put(IS_ADDRESS, storageData.getIsAddress());
map.put(NAME, storageData.getName());
map.put(ADDRESS_ID, storageData.getAddressId());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class EndpointInventoryRegister implements IEndpointInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryRegister.class);
private final ModuleManager moduleManager;
private EndpointInventoryCache cacheService;
public EndpointInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private EndpointInventoryCache getCacheService() {
if (isNull(cacheService)) {
cacheService = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
}
return cacheService;
}
@Override public int getOrCreate(int serviceId, String endpointName, int srcSpanType) {
int endpointId = getCacheService().get(serviceId, endpointName, srcSpanType);
if (endpointId == Const.NONE) {
EndpointInventory endpointInventory = new EndpointInventory();
endpointInventory.setServiceId(serviceId);
endpointInventory.setName(endpointName);
endpointInventory.setSrcSpanType(srcSpanType);
long now = System.currentTimeMillis();
endpointInventory.setRegisterTime(now);
endpointInventory.setHeartbeatTime(now);
}
return endpointId;
}
@Override public int get(int serviceId, String endpointName, int srcSpanType) {
return getCacheService().get(serviceId, endpointName, srcSpanType);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface IEndpointInventoryRegister extends Service {
int getOrCreate(int serviceId, String endpointName, int srcSpanType);
int get(int serviceId, String endpointName, int srcSpanType);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface IServiceInventoryRegister extends Service {
int getOrCreateByServiceName(String serviceName);
// int getOrCreateByAddressId(int addressId, String networkAddress);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
public class ServiceInventoryRegister implements IServiceInventoryRegister {
private final ModuleManager moduleManager;
private ServiceInventoryCache serviceInventoryCache;
public ServiceInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (isNull(serviceInventoryCache)) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
}
return serviceInventoryCache;
}
@Override public int getOrCreateByServiceName(String serviceName) {
int serviceId = getServiceInventoryCache().get(serviceName);
if (serviceId == Const.NONE) {
ServiceInventory serviceInventory = new ServiceInventory();
serviceInventory.setName(serviceName);
serviceInventory.setAddressId(Const.NONE);
serviceInventory.setIsAddress(BooleanUtils.FALSE);
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
InventoryProcess.INSTANCE.in(serviceInventory);
}
return serviceId;
}
}
......@@ -55,6 +55,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
messageNum++;
if (!sources.containsKey(source)) {
sources.put(source, source);
} else {
sources.get(source).combine(source);
}
......
......@@ -53,23 +53,24 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
if (!sources.containsKey(registerSource)) {
sources.put(registerSource, registerSource);
}
if (registerSource.getEndOfBatchContext().isEndOfBatch()) {
if (registerLockDAO.tryLock(scope)) {
try {
sources.values().forEach(source -> {
try {
RegisterSource newSource = registerDAO.get(modelName, registerSource.id());
RegisterSource newSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(newSource)) {
newSource.combine(newSource);
int sequence = registerDAO.max(modelName);
newSource.setSequence(sequence);
registerDAO.forceInsert(modelName, newSource);
} else {
registerDAO.forceUpdate(modelName, newSource);
} else {
int sequence = registerDAO.max(modelName);
source.setSequence(sequence);
registerDAO.forceInsert(modelName, source);
}
} catch (Throwable t) {
logger.error(t.getMessage());
logger.error(t.getMessage(), t);
}
});
} finally {
......
......@@ -49,7 +49,7 @@ public class StreamDataAnnotationContainer implements StreamDataClassGetter {
}
}
public int findIdByClass(Class streamDataClass) {
@Override public int findIdByClass(Class streamDataClass) {
return classMap.get(streamDataClass);
}
......
......@@ -26,5 +26,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface StreamDataClassGetter extends Service {
int findIdByClass(Class streamDataClass);
Class<StreamData> findClassById(int id);
}
......@@ -23,9 +23,9 @@ import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.*;
......@@ -39,11 +39,11 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
private final GRPCClient client;
private final DataCarrier<RemoteMessage> carrier;
private final StreamDataAnnotationContainer streamDataMapper;
private final StreamDataClassGetter streamDataClassGetter;
public GRPCRemoteClient(StreamDataAnnotationContainer streamDataMapper, RemoteInstance remoteInstance, int channelSize,
public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
int bufferSize) {
this.streamDataMapper = streamDataMapper;
this.streamDataClassGetter = streamDataClassGetter;
this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
this.carrier = new DataCarrier<>(channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
......@@ -51,7 +51,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
@Override public void push(int nextWorkerId, StreamData streamData) {
int streamDataId = streamDataMapper.findIdByClass(streamData.getClass());
int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setNextWorkerId(nextWorkerId);
builder.setStreamDataId(streamDataId);
......
......@@ -20,8 +20,9 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
......@@ -33,7 +34,7 @@ public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleManager moduleManager;
private StreamDataAnnotationContainer indicatorMapper;
private StreamDataClassGetter streamDataClassGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
......@@ -48,11 +49,14 @@ public class RemoteClientManager implements Service {
public void start() {
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
this.indicatorMapper = moduleManager.find(ClusterModule.NAME).getService(StreamDataAnnotationContainer.class);
this.streamDataClassGetter = moduleManager.find(CoreModule.NAME).getService(StreamDataClassGetter.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS);
}
private void refresh() {
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
Collections.sort(instanceList);
......@@ -98,7 +102,7 @@ public class RemoteClientManager implements Service {
if (remoteInstance.isSelf()) {
client = new SelfRemoteClient(remoteInstance.getHost(), remoteInstance.getPort());
} else {
client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000);
client = new GRPCRemoteClient(streamDataClassGetter, remoteInstance, 1, 3000);
}
}
getFreeClients().add(client);
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
......@@ -32,6 +33,6 @@ public class StorageModule extends ModuleDefine {
}
@Override public Class[] services() {
return new Class[] {IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class};
return new Class[] {IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class, IEndpointInventoryCacheDAO.class, IServiceInventoryCacheDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.cache;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
public interface IServiceInventoryCacheDAO extends DAO {
int get(String id);
ServiceInventory get(int sequence);
int getServiceIdByAddressId(int addressId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
/**
* @author peng-yongsheng
*/
public class BooleanUtils {
public static final int TRUE = 1;
public static final int FALSE = 0;
public static boolean valueToBoolean(int value) {
if (TRUE == value) {
return true;
} else if (FALSE == value) {
return false;
} else {
throw new RuntimeException("Boolean value error, must be 0 or 1");
}
}
public static int booleanToValue(Boolean booleanValue) {
if (booleanValue) {
return TRUE;
} else {
return FALSE;
}
}
}
......@@ -34,9 +34,15 @@
<module>zipkin-receiver-plugin</module>
<module>skywalking-mesh-receiver-plugin</module>
<module>skywalking-istio-telemetry-receiver-plugin</module>
<module>skywalking-register-receiver-plugin</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-module</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author peng-yongsheng
*/
public class RegisterModule extends ModuleDefine {
@Override public String name() {
return "receiver-register";
}
@Override public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.register.module.RegisterModule;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.*;
/**
* @author peng-yongsheng
*/
public class RegisterModuleProvider extends ModuleProvider {
@Override public String name() {
return "default";
}
@Override public Class<? extends ModuleDefine> module() {
return RegisterModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override public void prepare() {
}
@Override public void start() {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
}
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterHandler.class);
private final IServiceInventoryRegister serviceInventoryRegister;
public ApplicationRegisterHandler(ModuleManager moduleManager) {
serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
@Override
public void applicationCodeRegister(Application request, StreamObserver<ApplicationMapping> responseObserver) {
if (logger.isDebugEnabled()) {
logger.debug("register application");
}
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
String serviceName = request.getApplicationCode();
int serviceId = serviceInventoryRegister.getOrCreateByServiceName(serviceName);
if (serviceId != Const.NONE) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(serviceName).setValue(serviceId).build();
builder.setApplication(value);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryHandler extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryHandler.class);
private final IEndpointInventoryRegister inventoryService;
public ServiceNameDiscoveryHandler(ModuleManager moduleManager) {
this.inventoryService = moduleManager.find(CoreModule.NAME).getService(IEndpointInventoryRegister.class);
}
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
List<ServiceNameElement> serviceNameElementList = request.getElementsList();
ServiceNameMappingCollection.Builder builder = ServiceNameMappingCollection.newBuilder();
for (ServiceNameElement serviceNameElement : serviceNameElementList) {
int serviceId = serviceNameElement.getApplicationId();
String endpointName = serviceNameElement.getServiceName();
int srcSpanType = serviceNameElement.getSrcSpanTypeValue();
int endpointId = inventoryService.get(serviceId, endpointName, srcSpanType);
if (endpointId != Const.NONE) {
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
mappingElement.setServiceId(endpointId);
mappingElement.setElement(serviceNameElement);
builder.addElements(mappingElement);
}
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.register.module.RegisterModule
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.register.provider.RegisterModuleProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.*;
import org.apache.skywalking.apm.network.language.agent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterHandlerTestCase {
private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterHandlerTestCase.class);
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application.Builder application = Application.newBuilder();
application.setApplicationCode("dubbox-consumer");
ApplicationMapping applicationMapping = stub.applicationCodeRegister(application.build());
logger.debug("application id: {}", applicationMapping.getApplication().getValue());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.*;
import org.apache.skywalking.apm.network.language.agent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryHandlerTestCase {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryHandlerTestCase.class);
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub stub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
serviceNameElement.setApplicationId(1);
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
serviceNameElement.setSrcSpanType(SpanType.Entry);
serviceNameCollection.addElements(serviceNameElement);
ServiceNameMappingCollection collection = stub.discovery(serviceNameCollection.build());
for (ServiceNameMappingElement element : collection.getElementsList()) {
logger.debug("service id: {}", element.getServiceId());
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -69,6 +69,11 @@
<artifactId>skywalking-istio-telemetry-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- storage module -->
......
......@@ -49,6 +49,8 @@ storage:
hourMetricDataTTL: 36 # Unit is hour
dayMetricDataTTL: 45 # Unit is day
monthMetricDataTTL: 18 # Unit is month
receiver-register:
default:
service-mesh:
default:
istio-telemetry:
......
......@@ -29,6 +29,7 @@
<logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
......
......@@ -19,10 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*;
import org.slf4j.*;
......@@ -65,6 +67,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient, 1000));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
}
@Override
......
......@@ -91,7 +91,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
return 1;
} else {
return id;
}
......
......@@ -18,31 +18,34 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCacheEsDAO.class);
public EndpointInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public int get(String id) {
try {
GetResponse response = getClient().get("", id);
GetResponse response = getClient().get(EndpointInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return 0;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return 0;
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.GetResponse;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCacheEsDAO.class);
public ServiceInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public int get(String id) {
try {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, id);
if (response.isExists()) {
return response.getField(RegisterSource.SEQUENCE).getValue();
} else {
return 0;
}
} catch (Throwable e) {
logger.error(e.getMessage());
return 0;
}
}
@Override public ServiceInventory get(int sequence) {
return null;
}
@Override public int getServiceIdByAddressId(int addressId) {
return 0;
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
......@@ -47,8 +48,10 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
try {
GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
if (response.isExists()) {
long expire = response.getField(RegisterLockIndex.COLUMN_EXPIRE).getValue();
boolean lockable = response.getField(RegisterLockIndex.COLUMN_LOCKABLE).getValue();
Map<String, Object> source = response.getSource();
long expire = (long)source.get(RegisterLockIndex.COLUMN_EXPIRE);
boolean lockable = (boolean)source.get(RegisterLockIndex.COLUMN_LOCKABLE);
long version = response.getVersion();
if (lockable) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册