提交 a23f8ee1 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #276 from ascrutae/zhangxin/feature/support-agent-core-testcase

support agent core testcase
...@@ -17,12 +17,13 @@ public class SkywalkingSpan implements Span { ...@@ -17,12 +17,13 @@ public class SkywalkingSpan implements Span {
/** /**
* Create a shell span for {@link SkywalkingTracer#activeSpan()} * Create a shell span for {@link SkywalkingTracer#activeSpan()}
*
* @param tracer * @param tracer
*/ */
@NeedSnifferActivation( @NeedSnifferActivation(
"1. set the span reference to the dynamic field of enhanced SkywalkingSpan" "1. set the span reference to the dynamic field of enhanced SkywalkingSpan"
) )
public SkywalkingSpan(SkywalkingTracer tracer){ public SkywalkingSpan(SkywalkingTracer tracer) {
} }
......
...@@ -77,6 +77,24 @@ ...@@ -77,6 +77,24 @@
<version>${jetty.version}</version> <version>${jetty.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.4.0</version>
<exclusions>
<exclusion>
<artifactId>mockito-core</artifactId>
<groupId>org.mockito</groupId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<extensions> <extensions>
......
...@@ -34,6 +34,7 @@ public class Config { ...@@ -34,6 +34,7 @@ public class Config {
} }
public static class Collector { public static class Collector {
public static long DISCOVERY_CHECK_INTERVAL = 60 * 1000;
/** /**
* Collector REST-Service address. * Collector REST-Service address.
* e.g. * e.g.
......
...@@ -5,7 +5,6 @@ import org.skywalking.apm.agent.core.boot.ServiceManager; ...@@ -5,7 +5,6 @@ import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config; import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.skywalking.apm.agent.core.context.trace.TraceSegment; import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.agent.core.sampling.SamplingService; import org.skywalking.apm.agent.core.sampling.SamplingService;
......
...@@ -214,14 +214,14 @@ public abstract class AbstractTracingSpan implements AbstractSpan { ...@@ -214,14 +214,14 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
spanBuilder.setParentSpanId(parentSpanId); spanBuilder.setParentSpanId(parentSpanId);
spanBuilder.setStartTime(startTime); spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(endTime); spanBuilder.setEndTime(endTime);
if (operationId == DictionaryUtil.nullValue()) { if (operationId != DictionaryUtil.nullValue()) {
spanBuilder.setOperationNameId(operationId); spanBuilder.setOperationNameId(operationId);
} else { } else {
spanBuilder.setOperationName(operationName); spanBuilder.setOperationName(operationName);
} }
spanBuilder.setSpanType(SpanType.Entry); spanBuilder.setSpanType(SpanType.Entry);
spanBuilder.setSpanLayerValue(this.layer.getCode()); spanBuilder.setSpanLayerValue(this.layer.getCode());
if (componentId == DictionaryUtil.nullValue()) { if (componentId != DictionaryUtil.nullValue()) {
spanBuilder.setComponentId(componentId); spanBuilder.setComponentId(componentId);
} else { } else {
spanBuilder.setComponent(componentName); spanBuilder.setComponent(componentName);
......
...@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.context.trace; ...@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.context.trace;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.SpanObject; import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.trace.component.Component; import org.skywalking.apm.network.trace.component.Component;
/** /**
...@@ -113,6 +114,7 @@ public class ExitSpan extends AbstractTracingSpan { ...@@ -113,6 +114,7 @@ public class ExitSpan extends AbstractTracingSpan {
} else { } else {
spanBuilder.setPeer(peer); spanBuilder.setPeer(peer);
} }
spanBuilder = spanBuilder.setSpanType(SpanType.Exit);
return spanBuilder; return spanBuilder;
} }
......
package org.skywalking.apm.agent.core.remote; package org.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService; import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.Config;
/** /**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}. * The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
...@@ -15,9 +18,9 @@ public class CollectorDiscoveryService implements BootService { ...@@ -15,9 +18,9 @@ public class CollectorDiscoveryService implements BootService {
@Override @Override
public void boot() throws Throwable { public void boot() throws Throwable {
Thread collectorClientThread = new Thread(new DiscoveryRestServiceClient(), "collectorClientThread"); Executors.newSingleThreadScheduledExecutor()
collectorClientThread.setDaemon(true); .scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
collectorClientThread.start(); Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
} }
@Override @Override
......
...@@ -31,22 +31,25 @@ public class DiscoveryRestServiceClient implements Runnable { ...@@ -31,22 +31,25 @@ public class DiscoveryRestServiceClient implements Runnable {
private volatile int selectedServer = -1; private volatile int selectedServer = -1;
public DiscoveryRestServiceClient() { public DiscoveryRestServiceClient() {
if (Config.Collector.SERVERS == null || Config.Collector.SERVERS.trim().length() == 0) {
logger.warn("Collector server not configured.");
return;
}
serverList = Config.Collector.SERVERS.split(","); serverList = Config.Collector.SERVERS.split(",");
Random r = new Random(); Random r = new Random();
if (serverList.length > 0) { if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length); selectedServer = r.nextInt(serverList.length);
} }
} }
@Override @Override
public void run() { public void run() {
while (true) { try {
try { findServerList();
try2Sleep(60 * 1000); } catch (Throwable t) {
findServerList(); logger.error(t, "Find server list fail.");
} catch (Throwable t) {
logger.error(t, "Find server list fail.");
}
} }
} }
......
...@@ -62,11 +62,16 @@ public class GRPCChannelManager implements BootService, Runnable { ...@@ -62,11 +62,16 @@ public class GRPCChannelManager implements BootService, Runnable {
.maxInboundMessageSize(1024 * 1024 * 50) .maxInboundMessageSize(1024 * 1024 * 50)
.usePlaintext(true); .usePlaintext(true);
managedChannel = channelBuilder.build(); managedChannel = channelBuilder.build();
reconnect = false; if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
notify(GRPCChannelStatus.CONNECTED); reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
} else {
notify(GRPCChannelStatus.DISCONNECT);
}
return; return;
} catch (Throwable t) { } catch (Throwable t) {
logger.error(t, "Create channel to {} fail.", server); logger.error(t, "Create channel to {} fail.", server);
notify(GRPCChannelStatus.DISCONNECT);
} }
} }
......
...@@ -26,6 +26,7 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED; ...@@ -26,6 +26,7 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
*/ */
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener { public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class); private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);
private static final int TIMEOUT = 30 * 1000;
private volatile DataCarrier<TraceSegment> carrier; private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub; private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub;
...@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe ...@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
} }
upstreamSegmentStreamObserver.onCompleted(); upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish(30 * 1000); status.wait4Finish(TIMEOUT);
if (logger.isDebugEnable()) { if (logger.isDebugEnable()) {
logger.debug("{} trace segments have been sent to collector.", data.size()); logger.debug("{} trace segments have been sent to collector.", data.size());
......
...@@ -3,7 +3,10 @@ package org.skywalking.apm.agent.core.boot; ...@@ -3,7 +3,10 @@ package org.skywalking.apm.agent.core.boot;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.context.ContextManager; import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.IgnoredTracerContext; import org.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.skywalking.apm.agent.core.context.TracingContext; import org.skywalking.apm.agent.core.context.TracingContext;
...@@ -14,6 +17,7 @@ import org.skywalking.apm.agent.core.remote.GRPCChannelListener; ...@@ -14,6 +17,7 @@ import org.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient; import org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.skywalking.apm.agent.core.sampling.SamplingService; import org.skywalking.apm.agent.core.sampling.SamplingService;
import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
...@@ -21,12 +25,15 @@ import static org.junit.Assert.assertEquals; ...@@ -21,12 +25,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
public class ServiceManagerTest { public class ServiceManagerTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
@Test @Test
public void testServiceDependencies() throws Exception { public void testServiceDependencies() throws Exception {
ServiceManager.INSTANCE.boot();
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices"); HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(6)); assertThat(registryService.size(), is(7));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)); assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class)); assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
...@@ -48,7 +55,7 @@ public class ServiceManagerTest { ...@@ -48,7 +55,7 @@ public class ServiceManagerTest {
private void assertTracingContextListener() throws Exception { private void assertTracingContextListener() throws Exception {
List<TracingContextListener> LISTENERS = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS"); List<TracingContextListener> LISTENERS = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS");
assertThat(LISTENERS.size(), is(2)); assertThat(LISTENERS.size(), is(3));
assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true)); assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true)); assertThat(LISTENERS.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true));
...@@ -62,9 +69,7 @@ public class ServiceManagerTest { ...@@ -62,9 +69,7 @@ public class ServiceManagerTest {
assertNotNull(service); assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners"); List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
assertEquals(listeners.size(), 1); assertEquals(listeners.size(), 3);
assertThat(listeners.get(0), is((GRPCChannelListener)ServiceManager.INSTANCE.
findService(TraceSegmentServiceClient.class)));
} }
private void assertSamplingService(SamplingService service) { private void assertSamplingService(SamplingService service) {
......
package org.skywalking.apm.agent.core.context; package org.skywalking.apm.agent.core.context;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List; import java.util.List;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.skywalking.apm.agent.core.boot.ServiceManager; import org.skywalking.apm.agent.core.boot.ServiceManager;
...@@ -17,11 +19,19 @@ import org.skywalking.apm.agent.core.context.trace.TraceSegment; ...@@ -17,11 +19,19 @@ import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.context.trace.TraceSegmentRef; import org.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.skywalking.apm.agent.core.context.util.AbstractTracingSpanHelper; import org.skywalking.apm.agent.core.context.util.AbstractTracingSpanHelper;
import org.skywalking.apm.agent.core.context.util.SegmentHelper; import org.skywalking.apm.agent.core.context.util.SegmentHelper;
import org.skywalking.apm.agent.core.context.util.SegmentStorage; import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.core.context.util.SegmentStoragePoint; import org.skywalking.apm.agent.core.test.tools.SegmentStorage;
import org.skywalking.apm.agent.core.test.tools.SegmentStoragePoint;
import org.skywalking.apm.agent.core.context.util.TraceSegmentRefHelper; import org.skywalking.apm.agent.core.context.util.TraceSegmentRefHelper;
import org.skywalking.apm.agent.core.context.util.TracingSegmentRunner; import org.skywalking.apm.agent.core.test.tools.TracingSegmentRunner;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine; import org.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
...@@ -35,10 +45,8 @@ public class ContextManagerTest { ...@@ -35,10 +45,8 @@ public class ContextManagerTest {
@SegmentStoragePoint @SegmentStoragePoint
private SegmentStorage tracingData; private SegmentStorage tracingData;
@BeforeClass @Rule
public static void setUpBeforeClass() { public AgentServiceRule agentServiceRule = new AgentServiceRule();
ServiceManager.INSTANCE.boot();
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -186,4 +194,75 @@ public class ContextManagerTest { ...@@ -186,4 +194,75 @@ public class ContextManagerTest {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue(); RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue();
} }
@Test
public void testTransform() throws InvalidProtocolBufferException {
ContextCarrier contextCarrier = new ContextCarrier().deserialize("S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2");
assertTrue(contextCarrier.isValid());
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testFirstEntry", contextCarrier);
firstEntrySpan.setComponent(ComponentsDefine.TOMCAT);
Tags.HTTP.METHOD.set(firstEntrySpan, "GET");
Tags.URL.set(firstEntrySpan, "127.0.0.1:8080");
SpanLayer.asHttp(firstEntrySpan);
AbstractSpan secondEntrySpan = ContextManager.createEntrySpan("/testSecondEntry", contextCarrier);
secondEntrySpan.setComponent(ComponentsDefine.DUBBO);
Tags.URL.set(firstEntrySpan, "dubbo://127.0.0.1:8080");
SpanLayer.asRPCFramework(secondEntrySpan);
ContextCarrier injectContextCarrier = new ContextCarrier();
AbstractSpan exitSpan = ContextManager.createExitSpan("/textExitSpan", injectContextCarrier, "127.0.0.1:12800");
exitSpan.errorOccurred();
exitSpan.log(new RuntimeException("exception"));
exitSpan.setComponent(ComponentsDefine.HTTPCLIENT);
SpanLayer.asHttp(exitSpan);
ContextManager.stopSpan();
ContextManager.stopSpan();
ContextManager.stopSpan();
TraceSegment actualSegment = tracingData.getTraceSegments().get(0);
UpstreamSegment upstreamSegment = actualSegment.transform();
assertThat(upstreamSegment.getGlobalTraceIdsCount(), is(1));
TraceSegmentObject traceSegmentObject = TraceSegmentObject.parseFrom(upstreamSegment.getSegment());
TraceSegmentReference reference = traceSegmentObject.getRefs(0);
assertThat(reference.getEntryServiceName(), is("/portal/"));
assertThat(reference.getNetworkAddress(), is("192.168.1.8 :18002"));
assertThat(reference.getParentSpanId(), is(3));
assertThat(traceSegmentObject.getApplicationId(), is(1));
assertThat(traceSegmentObject.getRefsCount(), is(1));
assertThat(traceSegmentObject.getSpansCount(), is(2));
SpanObject actualSpan = traceSegmentObject.getSpans(1);
assertThat(actualSpan.getComponentId(), is(3));
assertThat(actualSpan.getComponent(), is(""));
assertThat(actualSpan.getOperationName(), is("/testSecondEntry"));
assertThat(actualSpan.getParentSpanId(), is(-1));
assertThat(actualSpan.getSpanId(), is(0));
assertThat(actualSpan.getSpanType(), is(SpanType.Entry));
SpanObject exitSpanObject = traceSegmentObject.getSpans(0);
assertThat(exitSpanObject.getComponentId(), is(2));
assertThat(exitSpanObject.getComponent(), is(""));
assertThat(exitSpanObject.getSpanType(), is(SpanType.Exit));
assertThat(exitSpanObject.getOperationName(), is("/textExitSpan"));
assertThat(exitSpanObject.getParentSpanId(), is(0));
assertThat(exitSpanObject.getSpanId(), is(1));
assertThat(exitSpanObject.getLogsCount(), is(1));
LogMessage logMessage = exitSpanObject.getLogs(0);
assertThat(logMessage.getDataCount(), is(4));
List<KeyWithStringValue> values = logMessage.getDataList();
assertThat(values.get(0).getValue(), is("error"));
assertThat(values.get(1).getValue(), is(RuntimeException.class.getName()));
assertThat(values.get(2).getValue(), is("exception"));
assertTrue(values.get(2).getValue().length() <= 4000);
}
} }
...@@ -3,6 +3,7 @@ package org.skywalking.apm.agent.core.context; ...@@ -3,6 +3,7 @@ package org.skywalking.apm.agent.core.context;
import java.util.LinkedList; import java.util.LinkedList;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.skywalking.apm.agent.core.boot.ServiceManager; import org.skywalking.apm.agent.core.boot.ServiceManager;
...@@ -10,9 +11,10 @@ import org.skywalking.apm.agent.core.conf.Config; ...@@ -10,9 +11,10 @@ import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.NoopSpan; import org.skywalking.apm.agent.core.context.trace.NoopSpan;
import org.skywalking.apm.agent.core.context.util.SegmentStorage; import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.core.context.util.SegmentStoragePoint; import org.skywalking.apm.agent.core.test.tools.SegmentStorage;
import org.skywalking.apm.agent.core.context.util.TracingSegmentRunner; import org.skywalking.apm.agent.core.test.tools.SegmentStoragePoint;
import org.skywalking.apm.agent.core.test.tools.TracingSegmentRunner;
import static junit.framework.TestCase.assertNull; import static junit.framework.TestCase.assertNull;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
...@@ -24,6 +26,9 @@ public class IgnoredTracerContextTest { ...@@ -24,6 +26,9 @@ public class IgnoredTracerContextTest {
@SegmentStoragePoint @SegmentStoragePoint
private SegmentStorage storage; private SegmentStorage storage;
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
RemoteDownstreamConfig.Agent.APPLICATION_ID = 1; RemoteDownstreamConfig.Agent.APPLICATION_ID = 1;
...@@ -57,7 +62,6 @@ public class IgnoredTracerContextTest { ...@@ -57,7 +62,6 @@ public class IgnoredTracerContextTest {
@Test @Test
public void ignoredTraceContextWithExcludeOperationName() { public void ignoredTraceContextWithExcludeOperationName() {
ServiceManager.INSTANCE.boot();
AbstractSpan abstractSpan = ContextManager.createEntrySpan("test.js", null); AbstractSpan abstractSpan = ContextManager.createEntrySpan("test.js", null);
ContextManager.stopSpan(); ContextManager.stopSpan();
...@@ -68,7 +72,6 @@ public class IgnoredTracerContextTest { ...@@ -68,7 +72,6 @@ public class IgnoredTracerContextTest {
@Test @Test
public void ignoredTraceContextWithEmptyOperationName() { public void ignoredTraceContextWithEmptyOperationName() {
ServiceManager.INSTANCE.boot();
ContextCarrier contextCarrier = new ContextCarrier(); ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan abstractSpan = ContextManager.createExitSpan("", contextCarrier, "127.0.0.1:2181"); AbstractSpan abstractSpan = ContextManager.createExitSpan("", contextCarrier, "127.0.0.1:2181");
ContextManager.stopSpan(); ContextManager.stopSpan();
......
package org.skywalking.apm.agent.core.remote;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class DiscoveryRestServiceClientTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
private DiscoveryRestServiceClient client;
@Rule
public WireMockRule wireMockRule = new WireMockRule(8089);
@Before
public void setUpBeforeClass() {
Config.Collector.DISCOVERY_CHECK_INTERVAL = 1;
stubFor(get(urlEqualTo("/withoutResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("[]")));
stubFor(get(urlEqualTo("/withResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:8080','127.0.0.1:8090']")));
stubFor(get(urlEqualTo("/withSameResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:8090','127.0.0.1:8080']")));
stubFor(get(urlEqualTo("/withDifferenceResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:9090','127.0.0.1:18090']")));
stubFor(get(urlEqualTo("/with404"))
.willReturn(aResponse()
.withStatus(400)));
}
@Test
public void testWithoutCollectorServer() throws RESTResponseStatusError, IOException {
client = new DiscoveryRestServiceClient();
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(0));
}
@Test
public void testWithGRPCAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withResult";
client = new DiscoveryRestServiceClient();
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(2));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:8080"), is(true));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:8090"), is(true));
}
@Test
public void testWithoutGRPCAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withoutResult";
client = new DiscoveryRestServiceClient();
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(0));
}
@Test
public void testChangeGrpcAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withResult";
client = new DiscoveryRestServiceClient();
client.run();
Config.Collector.DISCOVERY_SERVICE_NAME = "/withDifferenceResult";
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(2));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:9090"), is(true));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:18090"), is(true));
}
@After
public void tearDown() {
Config.Collector.SERVERS = "";
Config.Collector.DISCOVERY_SERVICE_NAME = "/grpc/address";
RemoteDownstreamConfig.Collector.GRPC_SERVERS.clear();
}
}
package org.skywalking.apm.agent.core.remote;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.GrpcServerRule;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({GRPCChannelManager.class, NettyChannelBuilder.class})
public class GRPCChannelManagerTest {
@Rule
private GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Spy
private GRPCChannelManager grpcChannelManager = new GRPCChannelManager();
@Mock
private NettyChannelBuilder mock;
@Spy
private MockGRPCChannelListener listener = new MockGRPCChannelListener();
@Before
public void setUp() throws Throwable {
List<String> grpcServers = new ArrayList<String>();
grpcServers.add("127.0.0.1:2181");
RemoteDownstreamConfig.Collector.GRPC_SERVERS = grpcServers;
Whitebox.setInternalState(grpcChannelManager, "retryCycle", 1);
mockStatic(NettyChannelBuilder.class);
when(NettyChannelBuilder.forAddress(anyString(), anyInt())).thenReturn(mock);
when(mock.nameResolverFactory(any(NameResolver.Factory.class))).thenReturn(mock);
when(mock.maxInboundMessageSize(anyInt())).thenReturn(mock);
when(mock.usePlaintext(true)).thenReturn(mock);
when(mock.build()).thenReturn(grpcServerRule.getChannel());
grpcChannelManager.addChannelListener(listener);
}
@Test
public void changeStatusToConnectedWithReportError() throws Throwable {
grpcChannelManager.reportError(new StatusRuntimeException(Status.ABORTED));
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.CONNECTED);
assertThat(listener.status, is(GRPCChannelStatus.CONNECTED));
}
@Test
public void changeStatusToDisConnectedWithReportError() throws Throwable {
doThrow(new RuntimeException()).when(mock).nameResolverFactory(any(NameResolver.Factory.class));
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.DISCONNECT);
assertThat(listener.status, is(GRPCChannelStatus.DISCONNECT));
}
@Test
public void reportErrorWithoutChangeStatus() throws Throwable {
grpcChannelManager.run();
grpcChannelManager.reportError(new RuntimeException());
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.CONNECTED);
assertThat(listener.status, is(GRPCChannelStatus.CONNECTED));
}
private class MockGRPCChannelListener implements GRPCChannelListener {
private GRPCChannelStatus status;
@Override
public void statusChanged(GRPCChannelStatus status) {
this.status = status;
}
}
}
package org.skywalking.apm.agent.core.remote;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.core.test.tools.SegmentStorage;
import org.skywalking.apm.agent.core.test.tools.SegmentStoragePoint;
import org.skywalking.apm.agent.core.test.tools.TracingSegmentRunner;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.spy;
@RunWith(TracingSegmentRunner.class)
public class TraceSegmentServiceClientTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@SegmentStoragePoint
private SegmentStorage storage;
private TraceSegmentServiceClient serviceClient = new TraceSegmentServiceClient();
private List<UpstreamSegment> upstreamSegments;
private TraceSegmentServiceGrpc.TraceSegmentServiceImplBase serviceImplBase = new TraceSegmentServiceGrpc.TraceSegmentServiceImplBase() {
@Override
public StreamObserver<UpstreamSegment> collect(final StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override
public void onNext(UpstreamSegment value) {
upstreamSegments.add(value);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
};
}
};
@BeforeClass
public static void setUpBeforeClass() {
RemoteDownstreamConfig.Agent.APPLICATION_ID = 1;
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID = 1;
}
@Before
public void setUp() throws Throwable {
Whitebox.setInternalState(ServiceManager.INSTANCE.findService(GRPCChannelManager.class), "reconnect", false);
spy(serviceClient);
Whitebox.setInternalState(serviceClient, "serviceStub",
TraceSegmentServiceGrpc.newStub(grpcServerRule.getChannel()));
Whitebox.setInternalState(serviceClient, "status", GRPCChannelStatus.CONNECTED);
upstreamSegments = new ArrayList<UpstreamSegment>();
}
@Test
public void testSendTraceSegmentWithoutException() throws InvalidProtocolBufferException {
grpcServerRule.getServiceRegistry().addService(serviceImplBase);
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testFirstEntry", null);
firstEntrySpan.setComponent(ComponentsDefine.TOMCAT);
Tags.HTTP.METHOD.set(firstEntrySpan, "GET");
Tags.URL.set(firstEntrySpan, "127.0.0.1:8080");
SpanLayer.asHttp(firstEntrySpan);
ContextManager.stopSpan();
serviceClient.consume(storage.getTraceSegments());
assertThat(upstreamSegments.size(), is(1));
UpstreamSegment upstreamSegment = upstreamSegments.get(0);
assertThat(upstreamSegment.getGlobalTraceIdsCount(), is(1));
TraceSegmentObject traceSegmentObject = TraceSegmentObject.parseFrom(upstreamSegment.getSegment());
assertThat(traceSegmentObject.getRefsCount(), is(0));
assertThat(traceSegmentObject.getSpansCount(), is(1));
SpanObject spanObject = traceSegmentObject.getSpans(0);
assertThat(spanObject.getSpanType(), is(SpanType.Entry));
assertThat(spanObject.getSpanId(), is(0));
assertThat(spanObject.getParentSpanId(), is(-1));
}
@Test
public void testSendTraceSegmentWithException() throws InvalidProtocolBufferException {
grpcServerRule.getServiceRegistry().addService(serviceImplBase);
AbstractSpan firstEntrySpan = ContextManager.createEntrySpan("/testFirstEntry", null);
firstEntrySpan.setComponent(ComponentsDefine.TOMCAT);
Tags.HTTP.METHOD.set(firstEntrySpan, "GET");
Tags.URL.set(firstEntrySpan, "127.0.0.1:8080");
SpanLayer.asHttp(firstEntrySpan);
ContextManager.stopSpan();
grpcServerRule.getServer().shutdownNow();
serviceClient.consume(storage.getTraceSegments());
assertThat(upstreamSegments.size(), is(0));
boolean reconnect = Whitebox.getInternalState(ServiceManager.INSTANCE.findService(GRPCChannelManager.class), "reconnect");
assertThat(reconnect, is(true));
}
}
package org.skywalking.apm.agent.core.test.tools;
import java.util.HashMap;
import java.util.LinkedList;
import org.junit.rules.ExternalResource;
import org.powermock.reflect.Whitebox;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.skywalking.apm.agent.core.context.TracingContext;
import org.skywalking.apm.agent.core.context.TracingContextListener;
public class AgentServiceRule extends ExternalResource {
@Override
protected void after() {
super.after();
Whitebox.setInternalState(ServiceManager.INSTANCE, "bootedServices", new HashMap<Class, BootService>());
Whitebox.setInternalState(TracingContext.ListenerManager.class, "LISTENERS", new LinkedList<TracingContextListener>() );
Whitebox.setInternalState(IgnoredTracerContext.ListenerManager.class, "LISTENERS", new LinkedList<TracingContextListener>() );
}
@Override
protected void before() throws Throwable {
super.before();
ServiceManager.INSTANCE.boot();
}
}
package org.skywalking.apm.agent.core.context.util; package org.skywalking.apm.agent.core.test.tools;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -9,7 +9,7 @@ public class SegmentStorage { ...@@ -9,7 +9,7 @@ public class SegmentStorage {
private LinkedList<TraceSegment> traceSegments; private LinkedList<TraceSegment> traceSegments;
private LinkedList<IgnoredTracerContext> ignoredTracerContexts; private LinkedList<IgnoredTracerContext> ignoredTracerContexts;
SegmentStorage() { public SegmentStorage() {
traceSegments = new LinkedList<TraceSegment>(); traceSegments = new LinkedList<TraceSegment>();
ignoredTracerContexts = new LinkedList<IgnoredTracerContext>(); ignoredTracerContexts = new LinkedList<IgnoredTracerContext>();
} }
......
package org.skywalking.apm.agent.core.context.util; package org.skywalking.apm.agent.core.test.tools;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
......
package org.skywalking.apm.agent.core.context.util; package org.skywalking.apm.agent.core.test.tools;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import org.junit.runner.Description;
import org.junit.runner.notification.Failure;
import org.junit.runner.notification.RunNotifier;
import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError; import org.junit.runners.model.InitializationError;
import org.junit.runners.model.Statement;
import org.skywalking.apm.agent.core.context.IgnoreTracerContextListener; import org.skywalking.apm.agent.core.context.IgnoreTracerContextListener;
import org.skywalking.apm.agent.core.context.IgnoredTracerContext; import org.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.skywalking.apm.agent.core.context.TracingContext; import org.skywalking.apm.agent.core.context.TracingContext;
...@@ -36,51 +35,39 @@ public class TracingSegmentRunner extends BlockJUnit4ClassRunner { ...@@ -36,51 +35,39 @@ public class TracingSegmentRunner extends BlockJUnit4ClassRunner {
return targetObject; return targetObject;
} }
@Override @Override protected Statement withAfters(FrameworkMethod method, Object target, final Statement statement) {
public void run(RunNotifier notifier) { return new Statement() {
notifier.addListener(new RunListener()); @Override public void evaluate() throws Throwable {
super.run(notifier); if (field != null) {
} try {
tracingData = new SegmentStorage();
field.set(targetObject, tracingData);
} catch (IllegalAccessException e) {
}
}
tracingContextListener = new TracingContextListener() {
@Override
public void afterFinished(TraceSegment traceSegment) {
tracingData.addTraceSegment(traceSegment);
}
};
class RunListener extends org.junit.runner.notification.RunListener { ignoreTracerContextListener = new IgnoreTracerContextListener() {
@Override @Override
public void testStarted(Description description) throws Exception { public void afterFinished(IgnoredTracerContext tracerContext) {
if (field != null) { tracingData.addIgnoreTraceContext(tracerContext);
}
};
TracingContext.ListenerManager.add(tracingContextListener);
IgnoredTracerContext.ListenerManager.add(ignoreTracerContextListener);
try { try {
tracingData = new SegmentStorage(); statement.evaluate();
field.set(targetObject, tracingData); } finally {
} catch (IllegalAccessException e) { TracingContext.ListenerManager.remove(tracingContextListener);
IgnoredTracerContext.ListenerManager.remove(ignoreTracerContextListener);
} }
} }
tracingContextListener = new TracingContextListener() { };
@Override
public void afterFinished(TraceSegment traceSegment) {
tracingData.addTraceSegment(traceSegment);
}
};
ignoreTracerContextListener = new IgnoreTracerContextListener() {
@Override
public void afterFinished(IgnoredTracerContext tracerContext) {
tracingData.addIgnoreTraceContext(tracerContext);
}
};
TracingContext.ListenerManager.add(tracingContextListener);
IgnoredTracerContext.ListenerManager.add(ignoreTracerContextListener);
super.testStarted(description);
}
@Override
public void testFinished(Description description) throws Exception {
super.testFinished(description);
TracingContext.ListenerManager.remove(tracingContextListener);
IgnoredTracerContext.ListenerManager.remove(ignoreTracerContextListener);
}
@Override
public void testFailure(Failure failure) throws Exception {
super.testFailure(failure);
TracingContext.ListenerManager.remove(tracingContextListener);
}
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册