diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java index cde3e4c69ef4fa02df4244cc7e55efef38a7d68f..2b20ce3911893748318fc03fa28e342571a5791f 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/InstanceDiscoveryServiceHandler.java @@ -66,6 +66,8 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp int instanceId = request.getApplicationInstanceId(); long heartBeatTime = request.getHeartbeatTime(); this.instanceHeartBeatService.heartBeat(instanceId, heartBeatTime); + responseObserver.onNext(Downstream.getDefaultInstance()); + responseObserver.onCompleted(); } private String buildOsInfo(OSInfo osinfo) { diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java index 6bf5d16909d47233e990eb882d776c3fd6c8a792..11c7f070aaada95c646503d161f78b9b44ce29dd 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java @@ -20,8 +20,12 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler.mock; import io.grpc.ManagedChannel; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.skywalking.apm.network.proto.Application; import org.apache.skywalking.apm.network.proto.ApplicationInstance; +import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat; +import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping; import org.apache.skywalking.apm.network.proto.ApplicationMapping; import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc; import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc; @@ -29,18 +33,24 @@ import org.apache.skywalking.apm.network.proto.OSInfo; import org.apache.skywalking.apm.network.proto.ServiceNameCollection; import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc; import org.apache.skywalking.apm.network.proto.ServiceNameElement; +import org.apache.skywalking.apm.network.proto.ServiceNameMappingCollection; +import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ class RegisterMock { + private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class); + private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub; private InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub; private ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub; - void mock(ManagedChannel channel) { + void mock(ManagedChannel channel) throws InterruptedException { applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel); instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel); serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel); @@ -48,10 +58,17 @@ class RegisterMock { registerProvider(); } - private void registerConsumer() { + private void registerConsumer() throws InterruptedException { Application.Builder application = Application.newBuilder(); application.setApplicationCode("dubbox-consumer"); - ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build()); + + ApplicationMapping applicationMapping; + do { + applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build()); + logger.debug("application id: {}", applicationMapping.getApplication().getValue()); + Thread.sleep(20); + } + while (applicationMapping.getApplication().getValue() == 0); ApplicationInstance.Builder instance = ApplicationInstance.newBuilder(); instance.setApplicationId(applicationMapping.getApplication().getValue()); @@ -65,20 +82,37 @@ class RegisterMock { osInfo.addIpv4S("10.0.0.3"); osInfo.addIpv4S("10.0.0.4"); instance.setOsinfo(osInfo); - instanceDiscoveryServiceBlockingStub.registerInstance(instance.build()); + + ApplicationInstanceMapping instanceMapping; + do { + instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build()); + logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId()); + Thread.sleep(20); + } + while (instanceMapping.getApplicationInstanceId() == 0); ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder(); ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder(); serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue()); serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"); serviceNameCollection.addElements(serviceNameElement); - serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build()); + + registerServiceName(serviceNameCollection); + + heartBeatScheduled(instanceMapping.getApplicationInstanceId()); } - private void registerProvider() { + private void registerProvider() throws InterruptedException { Application.Builder application = Application.newBuilder(); application.setApplicationCode("dubbox-provider"); - ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build()); + + ApplicationMapping applicationMapping; + do { + applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(application.build()); + logger.debug("application id: {}", applicationMapping.getApplication().getValue()); + Thread.sleep(20); + } + while (applicationMapping.getApplication().getValue() == 0); ApplicationInstance.Builder instance = ApplicationInstance.newBuilder(); instance.setApplicationId(applicationMapping.getApplication().getValue()); @@ -92,13 +126,51 @@ class RegisterMock { osInfo.addIpv4S("10.0.0.1"); osInfo.addIpv4S("10.0.0.2"); instance.setOsinfo(osInfo); - instanceDiscoveryServiceBlockingStub.registerInstance(instance.build()); + + ApplicationInstanceMapping instanceMapping; + do { + instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(instance.build()); + logger.debug("instance id: {}", instanceMapping.getApplicationInstanceId()); + Thread.sleep(20); + } + while (instanceMapping.getApplicationInstanceId() == 0); ServiceNameCollection.Builder serviceNameCollection = ServiceNameCollection.newBuilder(); ServiceNameElement.Builder serviceNameElement = ServiceNameElement.newBuilder(); serviceNameElement.setApplicationId(applicationMapping.getApplication().getValue()); serviceNameElement.setServiceName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"); serviceNameCollection.addElements(serviceNameElement); - serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build()); + + registerServiceName(serviceNameCollection); + + heartBeatScheduled(instanceMapping.getApplicationInstanceId()); + } + + private void registerServiceName(ServiceNameCollection.Builder serviceNameCollection) throws InterruptedException { + ServiceNameMappingCollection serviceNameMappingCollection; + do { + serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.discovery(serviceNameCollection.build()); + logger.debug("service name mapping collection size: {}", serviceNameMappingCollection.getElementsCount()); + if (serviceNameMappingCollection.getElementsCount() > 0) { + logger.debug("service id: {}", serviceNameMappingCollection.getElements(0).getServiceId()); + } + Thread.sleep(20); + } + while (serviceNameMappingCollection.getElementsCount() == 0 || serviceNameMappingCollection.getElements(0).getServiceId() == 0); + } + + private void heartBeatScheduled(int instanceId) { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + new RunnableWithExceptionProtection(() -> heartBeat(instanceId), + t -> logger.error("instance heart beat scheduled error.", t)), 4, 1, TimeUnit.SECONDS); + } + + private void heartBeat(int instanceId) { + long now = System.currentTimeMillis(); + logger.debug("instance heart beat, instance id: {}, time: {}", instanceId, now); + ApplicationInstanceHeartbeat.Builder heartbeat = ApplicationInstanceHeartbeat.newBuilder(); + heartbeat.setApplicationInstanceId(instanceId); + heartbeat.setHeartbeatTime(now); + instanceDiscoveryServiceBlockingStub.heartbeat(heartbeat.build()); } } diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java index 8fe9ee82a095d6826412b246572cf44948b3677c..c290bd30ad26727745fe6f5dc23d5fb37439d784 100644 --- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java +++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/TraceSegmentMock.java @@ -83,6 +83,8 @@ public class TraceSegmentMock { while (sleeping.getValue()) { Thread.sleep(200); } + + Thread.sleep(200000); } static class Sleeping {