/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package org.apache.skywalking.apm.agent.core.remote; import io.grpc.Channel; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; import org.apache.skywalking.apm.agent.core.boot.ServiceManager; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; import org.apache.skywalking.apm.agent.core.context.TracingContext; import org.apache.skywalking.apm.agent.core.context.TracingContextListener; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary; import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.os.OSUtil; import org.apache.skywalking.apm.network.proto.*; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * @author wusheng */ @DefaultImplementor public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener { private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class); private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", ""); private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub; private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub; private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub; private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub; private volatile ScheduledFuture applicationRegisterFuture; private volatile long lastSegmentTime = -1; @Override public void statusChanged(GRPCChannelStatus status) { if (GRPCChannelStatus.CONNECTED.equals(status)) { Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel); instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel); serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel); networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel); } else { applicationRegisterServiceBlockingStub = null; instanceDiscoveryServiceBlockingStub = null; serviceNameDiscoveryServiceBlockingStub = null; } this.status = status; } @Override public void prepare() throws Throwable { ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); } @Override public void boot() throws Throwable { applicationRegisterFuture = Executors .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient")) .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() { @Override public void handle(Throwable t) { logger.error("unexpected exception.", t); } }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS); } @Override public void onComplete() throws Throwable { TracingContext.ListenerManager.add(this); } @Override public void shutdown() throws Throwable { applicationRegisterFuture.cancel(true); } @Override public void run() { logger.debug("AppAndServiceRegisterClient running, status:{}.", status); boolean shouldTry = true; while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) { shouldTry = false; try { if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) { if (applicationRegisterServiceBlockingStub != null) { ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister( Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build()); if (applicationMapping != null) { RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue(); shouldTry = true; } } } else { if (instanceDiscoveryServiceBlockingStub != null) { if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) { ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder() .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID) .setAgentUUID(PROCESS_UUID) .setRegisterTime(System.currentTimeMillis()) .setOsinfo(OSUtil.buildOSInfo()) .build()); if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) { RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = instanceMapping.getApplicationInstanceId(); } } else { if ( System.currentTimeMillis() - lastSegmentTime > 60 * 1000) { instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder() .setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID) .setHeartbeatTime(System.currentTimeMillis()) .build()); } NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub); OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub); } } } } catch (Throwable t) { logger.error(t, "AppAndServiceRegisterClient execute fail."); ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t); } } } @Override public void afterFinished(TraceSegment traceSegment) { lastSegmentTime = System.currentTimeMillis(); } }