From 13d123276b94a67501b9d57da154f4a2d430b1b6 Mon Sep 17 00:00:00 2001 From: gaohongtao Date: Tue, 10 Oct 2017 17:37:23 +0800 Subject: [PATCH] Fix #381 Sharding-JDBC 1.5.x plugin --- .../trace/component/ComponentsDefine.java | 11 +- .../apm/agent/core/context/tag/Tags.java | 5 + apm-sniffer/apm-agent/pom.xml | 6 +- apm-sniffer/apm-sdk-plugin/pom.xml | 1 + .../sharding-jdbc-1.5.x-plugin/pom.xml | 74 +++++++ .../plugin/sjdbc/ExecuteEventListener.java | 69 +++++++ .../sjdbc/define/AsyncExecuteInterceptor.java | 51 +++++ .../sjdbc/define/ExecuteInterceptor.java | 39 ++++ .../sjdbc/define/ExecutorInstrumentation.java | 75 +++++++ .../src/main/resources/skywalking-plugin.def | 1 + .../apm/plugin/sjdbc/InterceptorTest.java | 195 ++++++++++++++++++ 11 files changed, 522 insertions(+), 5 deletions(-) create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/pom.xml create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/ExecuteEventListener.java create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/AsyncExecuteInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecuteInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecutorInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/resources/skywalking-plugin.def create mode 100644 apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/test/java/org/skywalking/apm/plugin/sjdbc/InterceptorTest.java diff --git a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java index c635c2d2e1..d0398789b5 100644 --- a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java +++ b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java @@ -43,10 +43,12 @@ public class ComponentsDefine { public static final OfficialComponent JETTY_CLIENT = new OfficialComponent(18, "JettyClient"); - public static final OfficialComponent JETTY_SERVER = new OfficialComponent(19, "JettyServer"); - + public static final OfficialComponent JETTY_SERVER = new OfficialComponent(19, "JettyServer"); + public static final OfficialComponent MEMCACHE = new OfficialComponent(20, "Memcache"); - + + public static final OfficialComponent SHARDING_JDBC = new OfficialComponent(21, "ShardingJDBC"); + private static ComponentsDefine instance = new ComponentsDefine(); private String[] components; @@ -56,7 +58,7 @@ public class ComponentsDefine { } public ComponentsDefine() { - components = new String[21]; + components = new String[22]; addComponent(TOMCAT); addComponent(HTTPCLIENT); addComponent(DUBBO); @@ -77,6 +79,7 @@ public class ComponentsDefine { addComponent(JETTY_CLIENT); addComponent(JETTY_SERVER); addComponent(MEMCACHE); + addComponent(SHARDING_JDBC); } private void addComponent(OfficialComponent component) { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/tag/Tags.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/tag/Tags.java index be7543b5f6..825738ff40 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/tag/Tags.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/tag/Tags.java @@ -35,6 +35,11 @@ public final class Tags { */ public static final StringTag DB_STATEMENT = new StringTag("db.statement"); + /** + * DB_BIND_VARIABLES records the bind variables of sql statement. + */ + public static final StringTag DB_BIND_VARIABLES = new StringTag("db.bind_vars"); + public static final class HTTP { public static final StringTag METHOD = new StringTag("http.method"); } diff --git a/apm-sniffer/apm-agent/pom.xml b/apm-sniffer/apm-agent/pom.xml index 064e0917cd..9a55cadde3 100644 --- a/apm-sniffer/apm-agent/pom.xml +++ b/apm-sniffer/apm-agent/pom.xml @@ -130,7 +130,11 @@ apm-spymemcached-2.x-plugin ${project.version} - + + org.skywalking + apm-sharding-jdbc-1.5.x-plugin + ${project.version} + org.skywalking diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 1458226560..047595cf3e 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -27,6 +27,7 @@ nutz-plugins jetty-plugin spymemcached-2.x-plugin + sharding-jdbc-1.5.x-plugin pom diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/pom.xml new file mode 100644 index 0000000000..0ece48390a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/pom.xml @@ -0,0 +1,74 @@ + + + + apm-sdk-plugin + org.skywalking + 3.2.3-2017 + + 4.0.0 + + apm-sharding-jdbc-1.5.x-plugin + jar + + sharding-jdbc-1.5.x-plugin + http://maven.apache.org + + + UTF-8 + + + + + ${groupId} + apm-jdbc-plugin + ${project.version} + test + + + mysql + mysql-connector-java + [2.0.14,6.0.6] + test + + + com.dangdang + sharding-jdbc-core + [1.5.0,2.0.0) + provided + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/ExecuteEventListener.java b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/ExecuteEventListener.java new file mode 100644 index 0000000000..d187d6f367 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/ExecuteEventListener.java @@ -0,0 +1,69 @@ +package org.skywalking.apm.plugin.sjdbc; + +import com.dangdang.ddframe.rdb.sharding.executor.event.AbstractExecutionEvent; +import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent; +import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent; +import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap; +import com.dangdang.ddframe.rdb.sharding.util.EventBusInstance; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; +import java.util.stream.Collectors; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.ContextSnapshot; +import org.skywalking.apm.agent.core.context.tag.Tags; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor; + +/** + * Sharding-jdbc provides {@link EventBusInstance} to help external systems get events about sql execution. + * {@link ExecuteEventListener} can get sql statement start and end events, resulting in db span. + * + * @author gaohongtao + */ +public class ExecuteEventListener { + + public static void init() { + EventBusInstance.getInstance().register(new ExecuteEventListener()); + } + + @Subscribe + @AllowConcurrentEvents + public void listenDML(DMLExecutionEvent event) { + handle(event, "MODIFY"); + } + + @Subscribe + @AllowConcurrentEvents + public void listenDQL(DQLExecutionEvent event) { + handle(event, "QUERY"); + } + + private void handle(AbstractExecutionEvent event, String operation) { + switch (event.getEventExecutionType()) { + case BEFORE_EXECUTE: + AbstractSpan span = ContextManager.createExitSpan("/SJDBC/BRANCH/" + operation, event.getDataSource()); + if (ExecutorDataMap.getDataMap().containsKey(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY)) { + ContextManager.continued((ContextSnapshot)ExecutorDataMap.getDataMap().get(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY)); + } + Tags.DB_TYPE.set(span, "sql"); + Tags.DB_INSTANCE.set(span, event.getDataSource()); + Tags.DB_STATEMENT.set(span, event.getSql()); + if (!event.getParameters().isEmpty()) { + Tags.DB_BIND_VARIABLES.set(span, event.getParameters().stream().map(Object::toString).collect(Collectors.joining(","))); + } + span.setComponent(ComponentsDefine.SHARDING_JDBC); + SpanLayer.asDB(span); + break; + case EXECUTE_FAILURE: + span = ContextManager.activeSpan(); + span.errorOccurred(); + if (event.getException().isPresent()) { + span.log(event.getException().get()); + } + case EXECUTE_SUCCESS: + ContextManager.stopSpan(); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/AsyncExecuteInterceptor.java b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/AsyncExecuteInterceptor.java new file mode 100644 index 0000000000..a7ce9dcd80 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/AsyncExecuteInterceptor.java @@ -0,0 +1,51 @@ +package org.skywalking.apm.plugin.sjdbc.define; + +import com.dangdang.ddframe.rdb.sharding.constant.SQLType; +import com.dangdang.ddframe.rdb.sharding.executor.ExecuteCallback; +import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.ContextSnapshot; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +/** + * {@link AsyncExecuteInterceptor} enhances {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#asyncExecute(SQLType, Collection, List, ExecuteCallback)} + * so that the sql executor can get a {@link ContextSnapshot} of main thread when it is executed asynchronously. + * + * @author gaohongtao + */ +public class AsyncExecuteInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String SNAPSHOT_DATA_KEY = "APM_SKYWALKING_SNAPSHOT_DATA"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + ExecutorDataMap.getDataMap().put(SNAPSHOT_DATA_KEY, ContextManager.capture()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + Map oldMap = ExecutorDataMap.getDataMap(); + Map newMap = new HashMap<>(oldMap.size() - 1); + for (Map.Entry each : oldMap.entrySet()) { + if (!each.getKey().equals(SNAPSHOT_DATA_KEY)) { + newMap.put(each.getKey(), each.getValue()); + } + } + ExecutorDataMap.setDataMap(newMap); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecuteInterceptor.java b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecuteInterceptor.java new file mode 100644 index 0000000000..f7d7c4c079 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecuteInterceptor.java @@ -0,0 +1,39 @@ +package org.skywalking.apm.plugin.sjdbc.define; + +import com.dangdang.ddframe.rdb.sharding.constant.SQLType; +import com.dangdang.ddframe.rdb.sharding.executor.ExecuteCallback; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.List; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.skywalking.apm.network.trace.component.ComponentsDefine; + +/** + * {@link ExecuteInterceptor} enhances {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute(SQLType, Collection, List, ExecuteCallback)} + * ,creating a local span that records the overall execution of sql + * + * @author gaohongtao + */ +public class ExecuteInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SQLType sqlType = (SQLType)allArguments[0]; + ContextManager.createLocalSpan("/SJDBC/TRUNK/" + sqlType.name()).setComponent(ComponentsDefine.SHARDING_JDBC); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecutorInstrumentation.java b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecutorInstrumentation.java new file mode 100644 index 0000000000..a9d89a0f5e --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/java/org/skywalking/apm/plugin/sjdbc/define/ExecutorInstrumentation.java @@ -0,0 +1,75 @@ +package org.skywalking.apm.plugin.sjdbc.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.skywalking.apm.plugin.sjdbc.ExecuteEventListener; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * {@link ExecutorInstrumentation} presents that skywalking intercepts {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine}. + * + * @author gaohongtao + */ +public class ExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine"; + + private static final String EXECUTE_INTERCEPTOR_CLASS = "org.skywalking.apm.plugin.sjdbc.define.ExecuteInterceptor"; + + private static final String ASYNC_EXECUTE_INTERCEPTOR_CLASS = "org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor"; + + @Override + protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return null; + } + + @Override + protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("execute"); + } + + @Override + public String getMethodsInterceptor() { + return EXECUTE_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("asyncExecute"); + } + + @Override + public String getMethodsInterceptor() { + return ASYNC_EXECUTE_INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + protected ClassMatch enhanceClass() { + ExecuteEventListener.init(); + return byName(ENHANCE_CLASS); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000..c30d53853b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1 @@ +sharding-jdbc-1.5.x=org.skywalking.apm.plugin.sjdbc.define.ExecutorInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/test/java/org/skywalking/apm/plugin/sjdbc/InterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/test/java/org/skywalking/apm/plugin/sjdbc/InterceptorTest.java new file mode 100644 index 0000000000..35228ca60b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sharding-jdbc-1.5.x-plugin/src/test/java/org/skywalking/apm/plugin/sjdbc/InterceptorTest.java @@ -0,0 +1,195 @@ +package org.skywalking.apm.plugin.sjdbc; + +import com.dangdang.ddframe.rdb.sharding.constant.SQLType; +import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent; +import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent; +import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType; +import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap; +import com.dangdang.ddframe.rdb.sharding.util.EventBusInstance; +import com.google.common.base.Optional; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.hamcrest.core.Is; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.skywalking.apm.agent.test.helper.SegmentHelper; +import org.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.skywalking.apm.agent.test.tools.SegmentStorage; +import org.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor; +import org.skywalking.apm.plugin.sjdbc.define.ExecuteInterceptor; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.skywalking.apm.agent.test.tools.SpanAssert.assertComponent; +import static org.skywalking.apm.agent.test.tools.SpanAssert.assertLayer; +import static org.skywalking.apm.agent.test.tools.SpanAssert.assertOccurException; +import static org.skywalking.apm.agent.test.tools.SpanAssert.assertTag; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class InterceptorTest { + + private static ExecutorService ES; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private ExecuteInterceptor executeInterceptor; + + private AsyncExecuteInterceptor asyncExecuteInterceptor; + + private Object[] allArguments; + + @BeforeClass + public static void init() { + ExecuteEventListener.init(); + ES = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void finish() { + ES.shutdown(); + } + + @Before + public void setUp() throws SQLException { + executeInterceptor = new ExecuteInterceptor(); + asyncExecuteInterceptor = new AsyncExecuteInterceptor(); + allArguments = new Object[]{SQLType.DQL, null}; + } + + @Test + public void assertSyncExecute() throws Throwable { + executeInterceptor.beforeMethod(null, null, allArguments, null, null); + sendEvent("ds_0", "select * from t_order_0"); + executeInterceptor.afterMethod(null, null, allArguments, null, null); + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment segment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(segment); + assertNotNull(spans); + assertThat(spans.size(), is(2)); + assertSpan(spans.get(0), 0); + assertThat(spans.get(1).getOperationName(), is("/SJDBC/TRUNK/DQL")); + } + + @Test + public void assertAsyncExecute() throws Throwable { + executeInterceptor.beforeMethod(null, null, allArguments, null, null); + asyncExecuteInterceptor.beforeMethod(null, null, null, null, null); + final Map dataMap = ExecutorDataMap.getDataMap(); + ES.submit(() -> { + ExecutorDataMap.setDataMap(dataMap); + sendEvent("ds_1", "select * from t_order_1"); + }).get(); + asyncExecuteInterceptor.afterMethod(null, null, null, null, null); + sendEvent("ds_0", "select * from t_order_0"); + executeInterceptor.afterMethod(null, null, allArguments, null, null); + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment segment0 = segmentStorage.getTraceSegments().get(0); + TraceSegment segment1 = segmentStorage.getTraceSegments().get(1); + assertThat(segment0.getRefs().size(), is(1)); + List spans0 = SegmentHelper.getSpans(segment0); + assertNotNull(spans0); + assertThat(spans0.size(), is(1)); + assertSpan(spans0.get(0), 1); + List spans1 = SegmentHelper.getSpans(segment1); + assertNotNull(spans1); + assertThat(spans1.size(), is(2)); + assertSpan(spans1.get(0), 0); + assertThat(spans1.get(1).getOperationName(), is("/SJDBC/TRUNK/DQL")); + } + + @Test + public void assertAsyncContextHold() throws Throwable { + ExecutorDataMap.getDataMap().put("FOO_KEY", "FOO_VALUE"); + executeInterceptor.beforeMethod(null, null, allArguments, null, null); + asyncExecuteInterceptor.beforeMethod(null, null, null, null, null); + final Map dataMap = ExecutorDataMap.getDataMap(); + ES.submit(() -> { + ExecutorDataMap.setDataMap(dataMap); + sendEvent("ds_1", "select * from t_order_1"); + }).get(); + asyncExecuteInterceptor.afterMethod(null, null, null, null, null); + executeInterceptor.afterMethod(null, null, allArguments, null, null); + assertThat(ExecutorDataMap.getDataMap().size(), is(1)); + assertThat(ExecutorDataMap.getDataMap().get("FOO_KEY"), Is.is("FOO_VALUE")); + } + + @Test + public void assertExecuteError() throws Throwable { + executeInterceptor.beforeMethod(null, null, allArguments, null, null); + asyncExecuteInterceptor.beforeMethod(null, null, null, null, null); + final Map dataMap = ExecutorDataMap.getDataMap(); + ES.submit(() -> { + ExecutorDataMap.setDataMap(dataMap); + sendError(); + }).get(); + asyncExecuteInterceptor.handleMethodException(null, null, null, null, new SQLException("test")); + asyncExecuteInterceptor.afterMethod(null, null, null, null, null); + sendEvent("ds_0", "select * from t_order_0"); + executeInterceptor.handleMethodException(null, null, allArguments, null, new SQLException("Test")); + executeInterceptor.afterMethod(null, null, allArguments, null, null); + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment segment0 = segmentStorage.getTraceSegments().get(0); + TraceSegment segment1 = segmentStorage.getTraceSegments().get(1); + List spans0 = SegmentHelper.getSpans(segment0); + assertNotNull(spans0); + assertThat(spans0.size(), is(1)); + assertErrorSpan(spans0.get(0)); + List spans1 = SegmentHelper.getSpans(segment1); + assertNotNull(spans1); + assertThat(spans1.size(), is(2)); + assertSpan(spans1.get(0), 0); + assertErrorSpan(spans1.get(1)); + } + + private void assertSpan(AbstractTracingSpan span, int index) { + assertComponent(span, ComponentsDefine.SHARDING_JDBC); + assertLayer(span, SpanLayer.DB); + assertTag(span, 0, "sql"); + assertTag(span, 1, "ds_" + index); + assertTag(span, 2, "select * from t_order_" + index); + assertThat(span.isExit(), is(true)); + assertThat(span.getOperationName(), is("/SJDBC/BRANCH/QUERY")); + } + + private void assertErrorSpan(AbstractTracingSpan span) { + assertOccurException(span, true); + } + + private void sendEvent(String datasource, String sql) { + DQLExecutionEvent event = new DQLExecutionEvent(datasource, sql, Arrays.asList("1", 100)); + EventBusInstance.getInstance().post(event); + event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS); + EventBusInstance.getInstance().post(event); + } + + private void sendError() { + DMLExecutionEvent event = new DMLExecutionEvent("", "", Collections.emptyList()); + EventBusInstance.getInstance().post(event); + event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE); + event.setException(Optional.of(new SQLException("Test"))); + EventBusInstance.getInstance().post(event); + } +} -- GitLab