AppAndServiceRegisterClient.java 8.2 KB
Newer Older
1
/*
wu-sheng's avatar
wu-sheng 已提交
2 3 4 5 6 7
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
8 9 10 11 12 13 14 15 16 17 18
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

19
package org.apache.skywalking.apm.agent.core.remote;
20

21
import io.grpc.Channel;
22
import org.apache.skywalking.apm.agent.core.boot.BootService;
23
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
24 25 26
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
27 28
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
29 30 31
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
32
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
33 34 35
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
36
import org.apache.skywalking.apm.agent.core.os.OSUtil;
37
import org.apache.skywalking.apm.network.proto.*;
wu-sheng's avatar
wu-sheng 已提交
38
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
39

40 41 42 43 44
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

45 46 47
/**
 * @author wusheng
 */
48
@DefaultImplementor
49 50
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
    private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
wu-sheng's avatar
wu-sheng 已提交
51
    private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
52 53 54 55 56

    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
    private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
    private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
57
    private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
58 59 60 61 62
    private volatile ScheduledFuture<?> applicationRegisterFuture;
    private volatile long lastSegmentTime = -1;

    @Override
    public void statusChanged(GRPCChannelStatus status) {
63
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
64
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
65 66 67
            applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
            instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
            serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
68
            networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel);
69 70 71 72 73 74 75 76 77
        } else {
            applicationRegisterServiceBlockingStub = null;
            instanceDiscoveryServiceBlockingStub = null;
            serviceNameDiscoveryServiceBlockingStub = null;
        }
        this.status = status;
    }

    @Override
78
    public void prepare() throws Throwable {
79 80 81 82 83 84
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    @Override
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
85
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
wu-sheng's avatar
wu-sheng 已提交
86 87 88 89 90
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
91 92 93
    }

    @Override
94
    public void onComplete() throws Throwable {
95 96 97
        TracingContext.ListenerManager.add(this);
    }

98 99 100 101 102
    @Override
    public void shutdown() throws Throwable {
        applicationRegisterFuture.cancel(true);
    }

103 104
    @Override
    public void run() {
105
        logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
wu-sheng's avatar
wu-sheng 已提交
106
        boolean shouldTry = true;
107
        while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
wu-sheng's avatar
wu-sheng 已提交
108
            shouldTry = false;
109 110 111
            try {
                if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
                    if (applicationRegisterServiceBlockingStub != null) {
112
                        ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
wu-sheng's avatar
wu-sheng 已提交
113
                            Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
114 115
                        if (applicationMapping != null) {
                            RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
wu-sheng's avatar
wu-sheng 已提交
116
                            shouldTry = true;
117 118 119 120 121 122
                        }
                    }
                } else {
                    if (instanceDiscoveryServiceBlockingStub != null) {
                        if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {

wu-sheng's avatar
wu-sheng 已提交
123
                            ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
124
                                .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
wu-sheng's avatar
wu-sheng 已提交
125
                                .setAgentUUID(PROCESS_UUID)
126
                                .setRegisterTime(System.currentTimeMillis())
127
                                .setOsinfo(OSUtil.buildOSInfo())
128 129 130 131 132 133
                                .build());
                            if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
                                RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
                                    = instanceMapping.getApplicationInstanceId();
                            }
                        } else {
134
                            if ( System.currentTimeMillis() - lastSegmentTime > 60 * 1000) {
135
                                instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
136
                                    .setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
137
                                    .setHeartbeatTime(System.currentTimeMillis())
138 139 140
                                    .build());
                            }

141
                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
                            OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error(t, "AppAndServiceRegisterClient execute fail.");
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }

    @Override
    public void afterFinished(TraceSegment traceSegment) {
        lastSegmentTime = System.currentTimeMillis();
    }
}