未验证 提交 2cef97a8 编写于 作者: 彭勇升 pengys 提交者: GitHub

Merge pull request #817 from peng-yongsheng/feature/heartbeat_test

Instance heart beat tested with elastic search. 
......@@ -66,6 +66,8 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
int instanceId = request.getApplicationInstanceId();
long heartBeatTime = request.getHeartbeatTime();
this.instanceHeartBeatService.heartBeat(instanceId, heartBeatTime);
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
private String buildOsInfo(OSInfo osinfo) {
......
......@@ -20,8 +20,12 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler.mock;
import io.grpc.ManagedChannel;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
......@@ -29,18 +33,24 @@ import org.apache.skywalking.apm.network.proto.OSInfo;
import org.apache.skywalking.apm.network.proto.ServiceNameCollection;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameElement;
import org.apache.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
class RegisterMock {
private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
private InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
private ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
void mock(ManagedChannel channel) {
void mock(ManagedChannel channel) throws InterruptedException {
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
......@@ -48,10 +58,17 @@ class RegisterMock {
registerProvider();
}
private void registerConsumer() {
private void registerConsumer() throws InterruptedException {
Application.Builder application = Application.newBuilder();
application.setApplicationCode("dubbox-consumer");
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
ApplicationMapping applicationMapping;
do {
applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
logger.debug("application id: {}", applicationMapping.getApplication().getValue());
Thread.sleep(20);
}
while (applicationMapping.getApplication().getValue() == 0);
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationMapping.getApplication().getValue());
......@@ -65,20 +82,37 @@ class RegisterMock {
osInfo.addIpv4S("10.0.0.3");
osInfo.addIpv4S("10.0.0.4");
instance.setOsinfo(osInfo);
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
ApplicationInstanceMapping instanceMapping;
do {
instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId());
Thread.sleep(20);
}
while (instanceMapping.getApplicationInstanceId() == 0);
ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
serviceNameCollection.addElements(serviceNameElement);
serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
registerServiceName(serviceNameCollection);
heartBeatScheduled(instanceMapping.getApplicationInstanceId());
}
private void registerProvider() {
private void registerProvider() throws InterruptedException {
Application.Builder application = Application.newBuilder();
application.setApplicationCode("dubbox-provider");
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
ApplicationMapping applicationMapping;
do {
applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build());
logger.debug("application id: {}", applicationMapping.getApplication().getValue());
Thread.sleep(20);
}
while (applicationMapping.getApplication().getValue() == 0);
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationMapping.getApplication().getValue());
......@@ -92,13 +126,51 @@ class RegisterMock {
osInfo.addIpv4S("10.0.0.1");
osInfo.addIpv4S("10.0.0.2");
instance.setOsinfo(osInfo);
instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
ApplicationInstanceMapping instanceMapping;
do {
instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build());
logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId());
Thread.sleep(20);
}
while (instanceMapping.getApplicationInstanceId() == 0);
ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder();
serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue());
serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
serviceNameCollection.addElements(serviceNameElement);
serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
registerServiceName(serviceNameCollection);
heartBeatScheduled(instanceMapping.getApplicationInstanceId());
}
private void registerServiceName(ServiceNameCollection.Builder serviceNameCollection) throws InterruptedException {
ServiceNameMappingCollection serviceNameMappingCollection;
do {
serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build());
logger.debug("service name mapping collection size: {}", serviceNameMappingCollection.getElementsCount());
if (serviceNameMappingCollection.getElementsCount() > 0) {
logger.debug("service id: {}", serviceNameMappingCollection.getElements(0).getServiceId());
}
Thread.sleep(20);
}
while (serviceNameMappingCollection.getElementsCount() == 0 || serviceNameMappingCollection.getElements(0).getServiceId() == 0);
}
private void heartBeatScheduled(int instanceId) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> heartBeat(instanceId),
t -> logger.error("instance heart beat scheduled error.", t)), 4, 1, TimeUnit.SECONDS);
}
private void heartBeat(int instanceId) {
long now = System.currentTimeMillis();
logger.debug("instance heart beat, instance id: {}, time: {}", instanceId, now);
ApplicationInstanceHeartbeat.Builder heartbeat = ApplicationInstanceHeartbeat.newBuilder();
heartbeat.setApplicationInstanceId(instanceId);
heartbeat.setHeartbeatTime(now);
instanceDiscoveryServiceBlockingStub.heartbeat(heartbeat.build());
}
}
......@@ -83,6 +83,8 @@ public class TraceSegmentMock {
while (sleeping.getValue()) {
Thread.sleep(200);
}
Thread.sleep(200000);
}
static class Sleeping {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册