提交 b11d4793 编写于 作者: 刘新元 Liu XinYuan 提交者: wu-sheng

Add service and instance reset function (#1790)

* Manually trigger agent registration and write registration status to file

* Delete unused variables

* Code optimization, encapsulating some methods

* add clear DataCarrier function

* revert SkyWalkingAgent

* Optimize code and logic, including updating applicatin_id in unRegisterOperationNames

* Server end metadata loss notification

* Server end metadata loss notification

* fix file stream close and other problem

* revert to old

* fix some bug
Signed-off-by: NLiu-XinYuan <879928098@qq.com>

* fix some bugs in agent reset

* add close inputStream and fileChannel and fix some bug

* fixed some bugs
上级 072ee397
......@@ -152,4 +152,10 @@ public class DataCarrier<T> {
consumerPool.close();
}
}
public void clear() {
for (int i = 0; i < channels.getChannelSize(); i++) {
channels.getBuffer(i).clear();
}
}
}
......@@ -91,4 +91,10 @@ public class Buffer<T> {
return result;
}
public void clear() {
for (Object obj : buffer) {
obj = null;
}
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.conf;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
......@@ -70,6 +69,21 @@ public class Config {
* Skywalking team may ask for these files in order to resolve compatible problem.
*/
public static boolean IS_OPEN_DEBUGGING_CLASS = false;
/**
* Specify register.status dir ,This is an option, the default is AGENT_HOME/option/reset.status.
*/
public static String REGISTER_STATUS_DIR = "skywalking-agent/option";
/**
* Specify instance_uuid to ensure that the whole show is unique, for example: applicationName_ip_12
*/
public static String INSTANCE_UUID = "";
/**
* enabled means that the reset function is enabled, and disabled means that the reset function is not enabled. A reset can be triggered by modifying the configuration file only if the reset feature is enabled.
*/
public static String RESETER_LISTENER = "disabled";
}
public static class Collector {
......@@ -80,7 +94,7 @@ public class Config {
/**
* application and service registry check interval
*/
public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
public static long SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL = 3;
/**
* Collector skywalking trace receiver service addresses.
*/
......
......@@ -41,24 +41,23 @@ import org.apache.skywalking.apm.util.StringUtil;
*/
public class SnifferConfigInitializer {
private static final ILog logger = LogManager.getLogger(SnifferConfigInitializer.class);
private static String SPECIFIED_CONFIG_PATH = "skywalking_config";
private static String DEFAULT_CONFIG_FILE_NAME = "/config/agent.config";
private static String ENV_KEY_PREFIX = "skywalking.";
private static final String SPECIFIED_CONFIG_PATH = "skywalking_config";
private static final String DEFAULT_CONFIG_FILE_NAME = "/config/agent.config";
private static final String ENV_KEY_PREFIX = "skywalking.";
private static final String INSTANCE_UUID_NAME = "agent.instance_uuid";
private static final String REGISTER_STATUS_DIR = "agent.register_status_dir";
private static boolean IS_INIT_COMPLETED = false;
/**
* If the specified agent config path is set, the agent will try to locate the specified agent config.
* If the specified agent config path is not set , the agent will try to locate `agent.config`, which should be in the /config dictionary of agent package.
* <p>
* Also try to override the config by system.env and system.properties. All the keys in these two places should
* start with {@link #ENV_KEY_PREFIX}. e.g. in env `skywalking.agent.application_code=yourAppName` to override
* `agent.application_code` in config file.
* <p>
* At the end, `agent.application_code` and `collector.servers` must be not blank.
* If the specified agent config path is set, the agent will try to locate the specified agent config. If the
* specified agent config path is not set , the agent will try to locate `agent.config`, which should be in the
* /config dictionary of agent package. <p> Also try to override the config by system.env and system.properties. All
* the keys in these two places should start with {@link #ENV_KEY_PREFIX}. e.g. in env
* `skywalking.agent.application_code=yourAppName` to override `agent.application_code` in config file. <p> At the
* end, `agent.application_code` and `collector.servers` must be not blank.
*/
public static void initialize() throws ConfigNotFoundException, AgentPackageNotFoundException {
InputStreamReader configFileStream;
try {
configFileStream = loadConfig();
Properties properties = new Properties();
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.dictionary;
import io.netty.util.internal.ConcurrentSet;
......@@ -49,6 +48,12 @@ public enum NetworkAddressDictionary {
}
}
public void clearApplicationDictionary() {
unRegisterApplications.clear();
applicationDictionary.clear();
}
public void syncRemoteDictionary(
NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
......
......@@ -61,6 +61,13 @@ public enum OperationNameDictionary {
}
}
public void clearOperationNameDictionary() {
unRegisterOperationNames.clear();
operationNameDictionary.clear();
}
public void syncRemoteDictionary(
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub) {
if (unRegisterOperationNames.size() > 0) {
......@@ -106,6 +113,10 @@ public enum OperationNameDictionary {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
public String getOperationName() {
return operationName;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.listener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
* @author liu-xinyuan
**/
@DefaultImplementor
public class ResetConfListener implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(ResetConfListener.class);
private File configFile = null;
@Override public void prepare() throws Throwable {
}
@Override public void boot() {
if ("enabled".equals(Config.Agent.RESETER_LISTENER)) {
Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ResetConfListener"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
} else {
logger.info("Since the agent.register_status variable is not set correctly, the reset service is not started -> Agent reset service is inactive.");
}
}
@Override public void onComplete() throws Throwable {
}
@Override public void shutdown() throws Throwable {
}
@Override public void run() {
logger.debug("ResetConfListener running.");
try {
if (Reseter.INSTANCE.predicateReset())
Reseter.INSTANCE.setStatus(ResetStatus.DONE).clearID().reportToRegisterFile();
} catch (SecurityException e) {
logger.warn(e, "Denise read access to the file {}", configFile);
} catch (FileNotFoundException e) {
logger.warn(e, "not found file {}", configFile);
} catch (IOException e) {
logger.warn(e.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.listener;
/**
* @author liu-xinyuan
**/
public enum ResetStatus {
OFF("OFF"), ON("ON"), DONE("DONE");
private String label;
ResetStatus(String label) {
this.label = label;
}
public String value() {
return this.label;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.listener;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Properties;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
/**
* @author liu-xinyuan
**/
public enum Reseter {
INSTANCE;
private static final ILog logger = LogManager.getLogger(Reseter.class);
private static final String APPLICATION_ID_NAM = "application_id";
private static final String INSTANCE_ID_NAME = "instance_id";
private static final String STATUS_NAME = "status";
private static final String RESET_CHILD_DIR = "/reset.status";
private static final String COMMENT = "#Status has three values: ON (trigger reset), DONE(reset complete), OFF(agent fist boot).\n" +
"Application_id: application_id of the current agent.\n" +
"Instance_id: the instanceid of the current agent.";
private volatile Properties properties = new Properties();
private String resetPath;
private ResetStatus status = ResetStatus.OFF;
private int isFirstRun = 0;
private int detectDuration = 5;
public Reseter setStatus(ResetStatus status) {
this.status = status;
return this;
}
public String getResetPath() throws IOException {
if (isFirstRun == 0) {
File statusDir = new File(Config.Agent.REGISTER_STATUS_DIR);
if (statusDir.exists() && statusDir.isDirectory()) {
resetPath = statusDir.getAbsolutePath() + RESET_CHILD_DIR;
} else {
statusDir.mkdir();
}
init();
isFirstRun = 1;
}
return resetPath;
}
public void reportToRegisterFile() throws IOException {
FileOutputStream outputStream = null;
try {
File configFile = new File(resetPath);
properties.setProperty(APPLICATION_ID_NAM, RemoteDownstreamConfig.Agent.APPLICATION_ID + "");
properties.setProperty(INSTANCE_ID_NAME, RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID + "");
properties.setProperty(STATUS_NAME, status.value());
outputStream = new FileOutputStream(configFile);
properties.store(outputStream, COMMENT);
} finally {
closeFileStream(outputStream);
}
}
public Reseter clearID() {
RemoteDownstreamConfig.Agent.APPLICATION_ID = DictionaryUtil.nullValue();
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue();
OperationNameDictionary.INSTANCE.clearOperationNameDictionary();
NetworkAddressDictionary.INSTANCE.clearApplicationDictionary();
ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class).clearCache();
status = ResetStatus.DONE;
logger.info("clear id successfully,begin trigger reset.");
return this;
}
public Boolean predicateReset() throws IOException {
File resetFile = new File(getResetPath());
FileInputStream inputStream = null;
FileLock fileLock = null;
FileChannel fileChannel = null;
if (System.currentTimeMillis() - resetFile.lastModified() < detectDuration * 1000) {
try {
logger.info("The file reset.status was detected to have been modified in the last {} seconds.", detectDuration);
inputStream = new FileInputStream(resetFile);
fileChannel = inputStream.getChannel();
fileLock = fileChannel.tryLock(0, resetFile.length(), true);
if (fileLock == null) {
return false;
}
properties.clear();
properties.load(inputStream);
} finally {
fileLock.release();
fileChannel.close();
closeFileStream(inputStream);
}
if (properties.get(STATUS_NAME) != null && properties.getProperty(STATUS_NAME).equals(ResetStatus.ON.value())) {
return true;
}
}
return false;
}
public void init() throws IOException {
FileOutputStream outputStream = null;
try {
properties.setProperty(APPLICATION_ID_NAM, RemoteDownstreamConfig.Agent.APPLICATION_ID + "");
properties.setProperty(INSTANCE_ID_NAME, RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID + "");
properties.setProperty(STATUS_NAME, status.value());
File file = new File(resetPath);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdir();
}
outputStream = new FileOutputStream(file);
properties.store(outputStream, COMMENT);
} finally {
closeFileStream(outputStream);
}
}
public void closeFileStream(Closeable stream) throws IOException {
if (stream != null) {
try {
stream.close();
} catch (IOException e) {
throw new IOException("file close failed.", e);
}
} else {
throw new IOException("create file outputstream failed");
}
}
}
......@@ -31,16 +31,21 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.listener.ResetStatus;
import org.apache.skywalking.apm.agent.core.listener.Reseter;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.register.ServiceInstancePingGrpc;
import org.apache.skywalking.apm.network.register.ServiceInstancePingPkg;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.util.StringUtil;
/**
* @author wusheng
......@@ -48,13 +53,13 @@ import java.util.concurrent.TimeUnit;
@DefaultImplementor
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
private volatile ServiceInstancePingGrpc.ServiceInstancePingBlockingStub serviceInstancePingBlockingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile long lastSegmentTime = -1;
......@@ -66,6 +71,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel);
serviceInstancePingBlockingStub = ServiceInstancePingGrpc.newBlockingStub(channel);
} else {
applicationRegisterServiceBlockingStub = null;
instanceDiscoveryServiceBlockingStub = null;
......@@ -82,13 +88,13 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.SERVICE_AND_ENDPOINT_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......@@ -105,15 +111,17 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
public void run() {
logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
boolean shouldTry = true;
String instanceUUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString().replaceAll("-", "") : Config.Agent.INSTANCE_UUID;
while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping != null) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
Reseter.INSTANCE.reportToRegisterFile();
shouldTry = true;
}
}
......@@ -122,21 +130,24 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(PROCESS_UUID)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(OSUtil.buildOSInfo())
.build());
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(instanceUUID)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(OSUtil.buildOSInfo())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
= instanceMapping.getApplicationInstanceId();
Reseter.INSTANCE.setStatus(ResetStatus.OFF).reportToRegisterFile();
}
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
serviceInstancePingBlockingStub.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setServiceInstanceUUID(instanceUUID)
.setTime(System.currentTimeMillis())
.build());
}
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
......
......@@ -25,9 +25,11 @@ import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
......@@ -155,7 +157,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
if (traceSegment.isIgnore() || RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue() || RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
return;
}
if (!carrier.produce(traceSegment)) {
......@@ -173,4 +175,9 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
this.status = status;
}
public void clearCache() {
carrier.clear();
}
}
......@@ -23,3 +23,4 @@ org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.listener.ResetConfListener
\ No newline at end of file
......@@ -55,7 +55,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(7));
assertThat(registryService.size(), is(8));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
......
......@@ -43,3 +43,12 @@ collector.backend_service=127.0.0.1:10800
# Logging level
logging.level=DEBUG
#Specify register.status dir,if dir not exists or it is a file then default AGENT_HOME/option
#agent.register_status_dir=register_dir
#Specify instance_uuid to ensure that the whole show is unique, for example: applicationName_ip_12
#agent.instance_uuid = applicationName_ip_1
#enabled means that the reset function is enabled, and disabled means that the reset function is not enabled. A reset can be triggered by modifying the configuration file only if the reset feature is enabled.
#agent.reseter_listener = disabled
......@@ -109,4 +109,4 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
logger.warn("Service instance {} heartbeat, but not found in storage.");
}
}
}
}
\ No newline at end of file
Subproject commit 1122e97b5604ae96447bd58ecdb248d7e02952aa
Subproject commit 3a83be79a9c23aad6576ed2a4a04b82de6d7a829
......@@ -22,7 +22,6 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.register.module.RegisterModule;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.grpc.*;
import org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.rest.*;
/**
......@@ -46,12 +45,6 @@ public class RegisterModuleProvider extends ModuleProvider {
}
@Override public void start() {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.register.ServiceInstancePingGrpc;
import org.apache.skywalking.apm.network.register.ServiceInstancePingPkg;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServiceInstancePingPkgHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingPkgHandler.class);
private final IServiceInstanceInventoryCacheDAO instanceInventoryCacheDAO;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
public ServiceInstancePingPkgHandler(ModuleManager moduleManager) {
this.instanceInventoryCacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
}
@Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCacheDAO.get(request.getServiceInstanceId());
if (request.getServiceInstanceUUID().equals(serviceInstanceInventory.getName()) || serviceInstanceInventory.getServiceId() == Const.NONE) {
logger.error("Your metadata loss,please set the status in reset.status in the agent {} to ON to trigger a reset!", request.getServiceInstanceUUID());
} else {
serviceInstanceInventoryRegister.heartbeat(request.getServiceInstanceId(), request.getTime());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册