提交 be32ab22 编写于 作者: wu-sheng's avatar wu-sheng

Finish codes about sync dictionary of Application and ServiceName.

上级 046102ef
......@@ -4,6 +4,10 @@ import io.netty.util.internal.ConcurrentSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
/**
* Map of application id to application code, which is from the collector side.
......@@ -13,15 +17,29 @@ import java.util.concurrent.ConcurrentHashMap;
public enum ApplicationDictionary {
INSTANCE;
private Map<String, Integer> applicationDictionary = new ConcurrentHashMap<String, Integer>();
private Set<String> unRegisterApplication = new ConcurrentSet<String>();
private Set<String> unRegisterApplications = new ConcurrentSet<String>();
public PossibleFound find(String applicationCode) {
Integer applicationId = applicationDictionary.get(applicationCode);
if (applicationId != null) {
return new Found(applicationId);
} else {
unRegisterApplication.add(applicationCode);
unRegisterApplications.add(applicationCode);
return new NotFound();
}
}
public void syncRemoteDictionary(
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addAllApplicationCode(unRegisterApplications).build());
if (applicationMapping.getApplicationCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationList()) {
unRegisterApplications.remove(keyWithIntegerValue.getKey());
applicationDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
}
}
}
}
......@@ -4,6 +4,11 @@ import io.netty.util.internal.ConcurrentSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
/**
* @author wusheng
......@@ -11,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
public enum OperationNameDictionary {
INSTANCE;
private Map<OperationNameKey, Integer> operationNameDictionary = new ConcurrentHashMap<OperationNameKey, Integer>();
private Set<OperationNameKey> unRegisterOperationName = new ConcurrentSet<OperationNameKey>();
private Set<OperationNameKey> unRegisterOperationNames = new ConcurrentSet<OperationNameKey>();
public PossibleFound find(int applicationId, String operationName) {
OperationNameKey key = new OperationNameKey(applicationId, operationName);
......@@ -19,11 +24,35 @@ public enum OperationNameDictionary {
if (operationId != null) {
return new Found(applicationId);
} else {
unRegisterOperationName.add(key);
unRegisterOperationNames.add(key);
return new NotFound();
}
}
public void syncRemoteDictionary(
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub) {
if (unRegisterOperationNames.size() > 0) {
ServiceNameCollection.Builder builder = ServiceNameCollection.newBuilder();
for (OperationNameKey operationNameKey : unRegisterOperationNames) {
ServiceNameElement serviceNameElement = ServiceNameElement.newBuilder()
.setApplicationId(operationNameKey.getApplicationId())
.setServiceName(operationNameKey.getOperationName())
.build();
builder.addElements(serviceNameElement);
}
ServiceNameMappingCollection serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(builder.build());
if (serviceNameMappingCollection.getElementsCount() > 0) {
for (ServiceNameMappingElement serviceNameMappingElement : serviceNameMappingCollection.getElementsList()) {
OperationNameKey key = new OperationNameKey(
serviceNameMappingElement.getElement().getApplicationId(),
serviceNameMappingElement.getElement().getServiceName());
unRegisterOperationNames.remove(key);
operationNameDictionary.put(key, serviceNameMappingElement.getServiceId());
}
}
}
}
private class OperationNameKey {
private int applicationId;
private String operationName;
......@@ -33,6 +62,14 @@ public enum OperationNameDictionary {
this.operationName = operationName;
}
public int getApplicationId() {
return applicationId;
}
public String getOperationName() {
return operationName;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
......
......@@ -11,7 +11,11 @@ import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.ApplicationDictionary;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
......@@ -20,16 +24,20 @@ import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/**
* @author wusheng
*/
public class ApplicationRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile boolean needRegisterRecover = false;
private volatile long lastSegmentTime = -1;
......@@ -38,16 +46,16 @@ public class ApplicationRegisterClient implements BootService, GRPCChannelListen
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
} else {
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()) {
needRegisterRecover = true;
}
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID != DictionaryUtil.nullValue()) {
needRegisterRecover = true;
}
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
} else {
applicationRegisterServiceBlockingStub = null;
instanceDiscoveryServiceBlockingStub = null;
serviceNameDiscoveryServiceBlockingStub = null;
}
this.status = status;
}
......@@ -72,42 +80,51 @@ public class ApplicationRegisterClient implements BootService, GRPCChannelListen
@Override
public void run() {
if (CONNECTED.equals(status)) {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication(0).getValue();
}
}
} else {
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
if (instanceDiscoveryServiceBlockingStub != null) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.register(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication(0).getValue();
}
}
} else {
if (needRegisterRecover) {
instanceDiscoveryServiceBlockingStub.registerRecover(ApplicationInstanceRecover.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
if (instanceDiscoveryServiceBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.register(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
}
} else {
if (needRegisterRecover) {
instanceDiscoveryServiceBlockingStub.registerRecover(ApplicationInstanceRecover.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setRegisterTime(System.currentTimeMillis())
.build());
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
}
}
ApplicationDictionary.INSTANCE.syncRemoteDictionary(applicationRegisterServiceBlockingStub);
OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
}
}
}
} catch (Throwable t) {
logger.error(t, "AppAndServiceRegisterClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
......
......@@ -10,6 +10,9 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.logging.ILog;
......@@ -21,11 +24,12 @@ import org.skywalking.apm.logging.LogManager;
public class GRPCChannelManager implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(DiscoveryRestServiceClient.class);
private volatile Thread channelManagerThread = null;
private volatile ManagedChannel managedChannel = null;
private volatile long nextStartTime = 0;
private volatile ScheduledFuture<?> connectCheckFuture;
private volatile boolean reconnect = true;
private Random random = new Random();
private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());
private final int retryCycle = 30;
@Override
public void beforeBoot() throws Throwable {
......@@ -34,7 +38,9 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
this.connectInBackground(false);
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, 0, retryCycle, TimeUnit.SECONDS);
}
@Override
......@@ -42,38 +48,9 @@ public class GRPCChannelManager implements BootService, Runnable {
}
private void connectInBackground(boolean forceStart) {
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
synchronized (this) {
if (forceStart) {
/**
* The startup has invoked in 30 seconds before, don't invoke again.
*/
if (System.currentTimeMillis() < nextStartTime) {
return;
}
}
resetNextStartTime();
if (channelManagerThread == null || !channelManagerThread.isAlive()) {
if (forceStart || managedChannel == null || managedChannel.isTerminated() || managedChannel.isShutdown()) {
if (managedChannel != null) {
managedChannel.shutdownNow();
notify(GRPCChannelStatus.DISCONNECT);
}
Thread channelManagerThread = new Thread(this, "ChannelManagerThread");
channelManagerThread.setDaemon(true);
channelManagerThread.start();
}
}
}
}
}
@Override
public void run() {
while (true) {
resetNextStartTime();
if (reconnect) {
if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) {
int index = random.nextInt() % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
String server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
......@@ -85,17 +62,15 @@ public class GRPCChannelManager implements BootService, Runnable {
.maxInboundMessageSize(1024 * 1024 * 50)
.usePlaintext(true);
managedChannel = channelBuilder.build();
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
break;
return;
} catch (Throwable t) {
logger.error(t, "Create channel to {} fail.", server);
}
}
resetNextStartTime();
int waitTime = 5 * 1000;
logger.debug("Selected collector grpc service is not available. Wait {} millis to try", waitTime);
try2Sleep(waitTime);
logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", retryCycle);
}
}
......@@ -114,13 +89,17 @@ public class GRPCChannelManager implements BootService, Runnable {
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
this.connectInBackground(true);
reconnect = true;
}
}
private void notify(GRPCChannelStatus status) {
for (GRPCChannelListener listener : listeners) {
listener.statusChanged(status);
try {
listener.statusChanged(status);
} catch (Throwable t) {
logger.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
}
}
}
......@@ -146,21 +125,4 @@ public class GRPCChannelManager implements BootService, Runnable {
}
return false;
}
private void resetNextStartTime() {
nextStartTime = System.currentTimeMillis() + 20 * 1000;
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
......@@ -4,4 +4,4 @@ org.skywalking.apm.agent.core.remote.CollectorDiscoveryService
org.skywalking.apm.agent.core.sampling.SamplingService
org.skywalking.apm.agent.core.remote.GRPCChannelManager
org.skywalking.apm.agent.core.jvm.JVMService
org.skywalking.apm.agent.core.remote.ApplicationRegisterClient
org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册