提交 ef63bc63 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

The heartbeat implementation for all the scopes. (#1742)

上级 b6431900
......@@ -18,14 +18,17 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.List;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import java.util.*;
import lombok.*;
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PUBLIC)
public class Rules {
private List<AlarmRule> rules;
private List<String> webhooks;
public Rules() {
this.rules = new ArrayList<>();
this.webhooks = new ArrayList<>();
}
}
......@@ -18,11 +18,8 @@
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.io.*;
import java.util.*;
import org.yaml.snakeyaml.Yaml;
/**
......@@ -46,39 +43,42 @@ public class RulesReader {
public Rules readRules() {
Rules rules = new Rules();
Map rulesData = (Map)yamlData.get("rules");
if (rulesData != null) {
rules.setRules(new ArrayList<>());
rulesData.forEach((k, v) -> {
if (((String)k).endsWith("_rule")) {
AlarmRule alarmRule = new AlarmRule();
alarmRule.setAlarmRuleName((String)k);
Map settings = (Map)v;
Object indicatorName = settings.get("indicator-name");
if (indicatorName == null) {
throw new IllegalArgumentException("indicator-name can't be null");
}
if (Objects.nonNull(yamlData)) {
Map rulesData = (Map)yamlData.get("rules");
if (rulesData != null) {
rules.setRules(new ArrayList<>());
rulesData.forEach((k, v) -> {
if (((String)k).endsWith("_rule")) {
AlarmRule alarmRule = new AlarmRule();
alarmRule.setAlarmRuleName((String)k);
Map settings = (Map)v;
Object indicatorName = settings.get("indicator-name");
if (indicatorName == null) {
throw new IllegalArgumentException("indicator-name can't be null");
}
alarmRule.setIndicatorName((String)indicatorName);
alarmRule.setIncludeNames((ArrayList)settings.getOrDefault("include-names", new ArrayList(0)));
alarmRule.setThreshold(settings.get("threshold").toString());
alarmRule.setOp((String)settings.get("op"));
alarmRule.setPeriod((Integer)settings.getOrDefault("period", 1));
alarmRule.setCount((Integer)settings.getOrDefault("count", 1));
alarmRule.setSilencePeriod((Integer)settings.getOrDefault("silence-period", alarmRule.getPeriod()));
alarmRule.setMessage((String)settings.getOrDefault("message", "Alarm caused by Rule " + alarmRule.getAlarmRuleName()));
alarmRule.setIndicatorName((String)indicatorName);
alarmRule.setIncludeNames((ArrayList)settings.getOrDefault("include-names", new ArrayList(0)));
alarmRule.setThreshold(settings.get("threshold").toString());
alarmRule.setOp((String)settings.get("op"));
alarmRule.setPeriod((Integer)settings.getOrDefault("period", 1));
alarmRule.setCount((Integer)settings.getOrDefault("count", 1));
alarmRule.setSilencePeriod((Integer)settings.getOrDefault("silence-period", alarmRule.getPeriod()));
alarmRule.setMessage((String)settings.getOrDefault("message", "Alarm caused by Rule " + alarmRule.getAlarmRuleName()));
rules.getRules().add(alarmRule);
}
});
}
List webhooks = (List)yamlData.get("webhooks");
if (webhooks != null) {
rules.setWebhooks(new ArrayList<>());
webhooks.forEach(url -> {
rules.getWebhooks().add((String)url);
});
rules.getRules().add(alarmRule);
}
});
}
List webhooks = (List)yamlData.get("webhooks");
if (webhooks != null) {
rules.setWebhooks(new ArrayList<>());
webhooks.forEach(url -> {
rules.getWebhooks().add((String)url);
});
}
}
return rules;
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.register.service;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
......@@ -70,4 +71,15 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
@Override public int get(int serviceId, String endpointName) {
return getCacheService().getEndpointId(serviceId, endpointName);
}
@Override public void heartbeat(int endpointId, long heartBeatTime) {
EndpointInventory endpointInventory = getCacheService().get(endpointId);
if (Objects.nonNull(endpointInventory)) {
endpointInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(endpointInventory);
} else {
logger.warn("Endpoint {} heartbeat, but not found in storage.");
}
}
}
......@@ -29,4 +29,6 @@ public interface IEndpointInventoryRegister extends Service {
int getOrCreate(int serviceId, String endpointName, DetectPoint detectPoint);
int get(int serviceId, String endpointName);
void heartbeat(int endpointId, long heartBeatTime);
}
......@@ -29,4 +29,6 @@ public interface INetworkAddressInventoryRegister extends Service {
int get(String networkAddress);
void update(int addressId, int srcLayer, int serverType);
void heartbeat(int addressId, long heartBeatTime);
}
......@@ -30,4 +30,6 @@ public interface IServiceInstanceInventoryRegister extends Service {
ServiceInstanceInventory.AgentOsInfo osInfo);
int getOrCreate(int serviceId, int addressId, long registerTime);
void heartbeat(int serviceInstanceId, long heartBeatTime);
}
......@@ -28,4 +28,6 @@ public interface IServiceInventoryRegister extends Service {
int getOrCreate(String serviceName);
int getOrCreate(int addressId);
void heartbeat(int serviceId, long heartBeatTime);
}
......@@ -18,11 +18,13 @@
package org.apache.skywalking.oap.server.core.register.service;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
import static java.util.Objects.*;
......@@ -31,6 +33,8 @@ import static java.util.Objects.*;
*/
public class NetworkAddressInventoryRegister implements INetworkAddressInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryRegister.class);
private final ModuleManager moduleManager;
private NetworkAddressInventoryCache networkAddressInventoryCache;
private IServiceInventoryRegister serviceInventoryRegister;
......@@ -111,4 +115,15 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
}
return true;
}
@Override public void heartbeat(int addressId, long heartBeatTime) {
NetworkAddressInventory networkAddress = getNetworkAddressInventoryCache().get(addressId);
if (Objects.nonNull(networkAddress)) {
networkAddress.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(networkAddress);
} else {
logger.warn("Network address {} heartbeat, but not found in storage.");
}
}
}
......@@ -18,11 +18,11 @@
package org.apache.skywalking.oap.server.core.register.service;
import org.apache.skywalking.oap.server.core.Const;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
......@@ -37,17 +37,17 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryRegister.class);
private final ModuleManager moduleManager;
private IServiceInstanceInventoryCacheDAO cacheDAO;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
public ServiceInstanceInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IServiceInstanceInventoryCacheDAO getCacheDAO() {
if (isNull(cacheDAO)) {
cacheDAO = moduleManager.find(StorageModule.NAME).getService(IServiceInstanceInventoryCacheDAO.class);
private ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
if (isNull(serviceInstanceInventoryCache)) {
serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
}
return cacheDAO;
return serviceInstanceInventoryCache;
}
@Override public int getOrCreate(int serviceId, String serviceInstanceName, long registerTime,
......@@ -56,7 +56,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
logger.debug("Get or create service instance by service instance name, service id: {}, service instance name: {}, registerTime: {}", serviceId, serviceInstanceName, registerTime);
}
int serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, serviceInstanceName);
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, serviceInstanceName);
if (serviceInstanceId == Const.NONE) {
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
......@@ -83,7 +83,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
}
int serviceInstanceId = getCacheDAO().getServiceInstanceId(serviceId, addressId);
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, addressId);
if (serviceInstanceId == Const.NONE) {
ServiceInstanceInventory serviceInstanceInventory = new ServiceInstanceInventory();
......@@ -99,4 +99,14 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
}
return serviceInstanceId;
}
@Override public void heartbeat(int serviceInstanceId, long heartBeatTime) {
ServiceInstanceInventory serviceInstanceInventory = getServiceInstanceInventoryCache().get(serviceInstanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInstanceInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
} else {
logger.warn("Service instance {} heartbeat, but not found in storage.");
}
}
}
......@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.core.register.service;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import static java.util.Objects.isNull;
......@@ -32,6 +34,8 @@ import static java.util.Objects.isNull;
*/
public class ServiceInventoryRegister implements IServiceInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryRegister.class);
private final ModuleManager moduleManager;
private ServiceInventoryCache serviceInventoryCache;
......@@ -81,4 +85,15 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
}
return serviceId;
}
@Override public void heartbeat(int serviceId, long heartBeatTime) {
ServiceInventory serviceInventory = getServiceInventoryCache().get(serviceId);
if (Objects.nonNull(serviceInventory)) {
serviceInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(serviceInventory);
} else {
logger.warn("Service {} heartbeat, but not found in storage.");
}
}
}
......@@ -62,7 +62,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
try {
RegisterSource newSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(newSource)) {
newSource.combine(newSource);
newSource.combine(source);
registerDAO.forceUpdate(modelName, newSource);
} else {
int sequence = registerDAO.max(modelName);
......
......@@ -35,11 +35,9 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
// private final IInstanceHeartBeatService instanceHeartBeatService;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
// this.instanceHeartBeatService = moduleManager.find(CoreModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override
......@@ -61,10 +59,10 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
}
@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
// int instanceId = request.getApplicationInstanceId();
// long heartBeatTime = request.getHeartbeatTime();
// this.instanceHeartBeatService.heartBeat(instanceId, heartBeatTime);
// responseObserver.onNext(Downstream.getDefaultInstance());
// responseObserver.onCompleted();
int serviceInstanceId = request.getApplicationInstanceId();
long heartBeatTime = request.getHeartbeatTime();
serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5;
import io.grpc.*;
import org.apache.skywalking.apm.network.language.agent.*;
/**
* @author peng-yongsheng
*/
public class InstanceHeartBeatTestCase {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub stub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
ApplicationInstanceHeartbeat.Builder builder = ApplicationInstanceHeartbeat.newBuilder();
builder.setApplicationInstanceId(2);
builder.setHeartbeatTime(System.currentTimeMillis() + 5 * 1000 * 60);
Downstream heartbeat = stub.heartbeat(builder.build());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册