Commit 13d12327 authored by gaohongtao

Fix #381 Sharding-JDBC 1.5.x plugin

Parent fb4e88f9
......@@ -47,6 +47,8 @@ public class ComponentsDefine {
public static final OfficialComponent MEMCACHE = new OfficialComponent(20, "Memcache");
public static final OfficialComponent SHARDING_JDBC = new OfficialComponent(21, "ShardingJDBC");
private static ComponentsDefine instance = new ComponentsDefine();
private String[] components;
......@@ -56,7 +58,7 @@ public class ComponentsDefine {
}
public ComponentsDefine() {
components = new String[21];
components = new String[22];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
......@@ -77,6 +79,7 @@ public class ComponentsDefine {
addComponent(JETTY_CLIENT);
addComponent(JETTY_SERVER);
addComponent(MEMCACHE);
addComponent(SHARDING_JDBC);
}
private void addComponent(OfficialComponent component) {
......
......@@ -35,6 +35,11 @@ public final class Tags {
*/
public static final StringTag DB_STATEMENT = new StringTag("db.statement");
/**
* DB_BIND_VARIABLES records the bind variables of the SQL statement.
*/
public static final StringTag DB_BIND_VARIABLES = new StringTag("db.bind_vars");
public static final class HTTP {
public static final StringTag METHOD = new StringTag("http.method");
}
......
......@@ -130,7 +130,11 @@
<artifactId>apm-spymemcached-2.x-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-sharding-jdbc-1.5.x-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- activation -->
<dependency>
<groupId>org.skywalking</groupId>
......
......@@ -27,6 +27,7 @@
<module>nutz-plugins</module>
<module>jetty-plugin</module>
<module>spymemcached-2.x-plugin</module>
<module>sharding-jdbc-1.5.x-plugin</module>
</modules>
<packaging>pom</packaging>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.3-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-sharding-jdbc-1.5.x-plugin</artifactId>
<packaging>jar</packaging>
<name>sharding-jdbc-1.5.x-plugin</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>apm-jdbc-plugin</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>[2.0.14,6.0.6]</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>sharding-jdbc-core</artifactId>
<version>[1.5.0,2.0.0)</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>
<plugin>
<!-- Source plugin -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- Automatically attach the sources jar when releasing -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package org.skywalking.apm.plugin.sjdbc;
import com.dangdang.ddframe.rdb.sharding.executor.event.AbstractExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap;
import com.dangdang.ddframe.rdb.sharding.util.EventBusInstance;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import java.util.stream.Collectors;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.ContextSnapshot;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor;
/**
* Sharding-JDBC provides {@link EventBusInstance} so that external systems can receive events about SQL execution.
* {@link ExecuteEventListener} subscribes to the statement start and end events and turns them into DB spans.
*
* @author gaohongtao
*/
public class ExecuteEventListener {
public static void init() {
EventBusInstance.getInstance().register(new ExecuteEventListener());
}
@Subscribe
@AllowConcurrentEvents
public void listenDML(DMLExecutionEvent event) {
handle(event, "MODIFY");
}
@Subscribe
@AllowConcurrentEvents
public void listenDQL(DQLExecutionEvent event) {
handle(event, "QUERY");
}
private void handle(AbstractExecutionEvent event, String operation) {
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
AbstractSpan span = ContextManager.createExitSpan("/SJDBC/BRANCH/" + operation, event.getDataSource());
if (ExecutorDataMap.getDataMap().containsKey(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY)) {
ContextManager.continued((ContextSnapshot)ExecutorDataMap.getDataMap().get(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY));
}
Tags.DB_TYPE.set(span, "sql");
Tags.DB_INSTANCE.set(span, event.getDataSource());
Tags.DB_STATEMENT.set(span, event.getSql());
if (!event.getParameters().isEmpty()) {
Tags.DB_BIND_VARIABLES.set(span, event.getParameters().stream().map(Object::toString).collect(Collectors.joining(",")));
}
span.setComponent(ComponentsDefine.SHARDING_JDBC);
SpanLayer.asDB(span);
break;
case EXECUTE_FAILURE:
span = ContextManager.activeSpan();
span.errorOccurred();
if (event.getException().isPresent()) {
span.log(event.getException().get());
}
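                // intentional fall-through: a failed execution still needs to stop the active span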
case EXECUTE_SUCCESS:
ContextManager.stopSpan();
}
}
}
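For reference, a condensed sketch (not part of this commit) of the event pair the listener above reacts to. Sharding-JDBC's ExecutorEngine posts these events around each routed statement; the datasource name, SQL and parameters below are illustrative and mirror the test helper sendEvent(...) further down.
    // A freshly created event is handled as BEFORE_EXECUTE, which opens the exit span.
    DQLExecutionEvent event = new DQLExecutionEvent("ds_0", "select * from t_order_0", Arrays.asList("1", 100));
    EventBusInstance.getInstance().post(event);
    // Re-posting the same event with EXECUTE_SUCCESS (or EXECUTE_FAILURE) closes it.
    event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(event);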
package org.skywalking.apm.plugin.sjdbc.define;
import com.dangdang.ddframe.rdb.sharding.constant.SQLType;
import com.dangdang.ddframe.rdb.sharding.executor.ExecuteCallback;
import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.ContextSnapshot;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
/**
* {@link AsyncExecuteInterceptor} enhances {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#asyncExecute(SQLType, Collection, List, ExecuteCallback)}
* so that the SQL executor can obtain a {@link ContextSnapshot} of the main thread when statements are executed asynchronously.
*
* @author gaohongtao
*/
public class AsyncExecuteInterceptor implements InstanceMethodsAroundInterceptor {
public static final String SNAPSHOT_DATA_KEY = "APM_SKYWALKING_SNAPSHOT_DATA";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ExecutorDataMap.getDataMap().put(SNAPSHOT_DATA_KEY, ContextManager.capture());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Map<String, Object> oldMap = ExecutorDataMap.getDataMap();
Map<String, Object> newMap = new HashMap<>(oldMap.size() - 1);
for (Map.Entry<String, Object> each : oldMap.entrySet()) {
if (!each.getKey().equals(SNAPSHOT_DATA_KEY)) {
newMap.put(each.getKey(), each.getValue());
}
}
ExecutorDataMap.setDataMap(newMap);
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
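The snapshot stored in beforeMethod above is consumed on the worker thread by ExecuteEventListener's BEFORE_EXECUTE branch; the plugin relies on Sharding-JDBC propagating ExecutorDataMap to its worker threads (the test below simulates this with ExecutorDataMap.setDataMap). A condensed sketch of the hand-off, using only calls that already appear in this plugin:
    // Caller thread (AsyncExecuteInterceptor#beforeMethod): capture the current tracing context.
    ExecutorDataMap.getDataMap().put(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY, ContextManager.capture());

    // Worker thread (ExecuteEventListener#handle, BEFORE_EXECUTE): restore it so the branch
    // span joins the caller's trace segment instead of starting a new one.
    Object snapshot = ExecutorDataMap.getDataMap().get(AsyncExecuteInterceptor.SNAPSHOT_DATA_KEY);
    if (snapshot != null) {
        ContextManager.continued((ContextSnapshot) snapshot);
    }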
package org.skywalking.apm.plugin.sjdbc.define;
import com.dangdang.ddframe.rdb.sharding.constant.SQLType;
import com.dangdang.ddframe.rdb.sharding.executor.ExecuteCallback;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* {@link ExecuteInterceptor} enhances {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine#execute(SQLType, Collection, List, ExecuteCallback)},
* creating a local span that records the overall SQL execution.
*
* @author gaohongtao
*/
public class ExecuteInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
SQLType sqlType = (SQLType)allArguments[0];
ContextManager.createLocalSpan("/SJDBC/TRUNK/" + sqlType.name()).setComponent(ComponentsDefine.SHARDING_JDBC);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
package org.skywalking.apm.plugin.sjdbc.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.skywalking.apm.plugin.sjdbc.ExecuteEventListener;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link ExecutorInstrumentation} defines how SkyWalking intercepts {@link com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine}.
*
* @author gaohongtao
*/
public class ExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.dangdang.ddframe.rdb.sharding.executor.ExecutorEngine";
private static final String EXECUTE_INTERCEPTOR_CLASS = "org.skywalking.apm.plugin.sjdbc.define.ExecuteInterceptor";
private static final String ASYNC_EXECUTE_INTERCEPTOR_CLASS = "org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute");
}
@Override
public String getMethodsInterceptor() {
return EXECUTE_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("asyncExecute");
}
@Override
public String getMethodsInterceptor() {
return ASYNC_EXECUTE_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
ExecuteEventListener.init();
return byName(ENHANCE_CLASS);
}
}
sharding-jdbc-1.5.x=org.skywalking.apm.plugin.sjdbc.define.ExecutorInstrumentation
\ No newline at end of file
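A minimal usage sketch of what the plugin produces at runtime, assuming the SkyWalking agent (with this plugin on its plugin path) is attached and dataSource is an already-configured Sharding-JDBC 1.5.x ShardingDataSource; the table, column and values are illustrative.
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import javax.sql.DataSource;

    public final class ShardingQueryExample {
        static void query(DataSource dataSource) throws Exception {
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement ps = conn.prepareStatement("SELECT * FROM t_order WHERE order_id = ?")) {
                ps.setInt(1, 1);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        // ExecutorEngine#execute(...) yields a local span "/SJDBC/TRUNK/DQL";
                        // each routed statement yields an exit span "/SJDBC/BRANCH/QUERY"
                        // tagged with db.type, db.instance, db.statement and db.bind_vars.
                    }
                }
            }
        }
    }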
package org.skywalking.apm.plugin.sjdbc;
import com.dangdang.ddframe.rdb.sharding.constant.SQLType;
import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.threadlocal.ExecutorDataMap;
import com.dangdang.ddframe.rdb.sharding.util.EventBusInstance;
import com.google.common.base.Optional;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.test.helper.SegmentHelper;
import org.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.test.tools.SegmentStorage;
import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.sjdbc.define.AsyncExecuteInterceptor;
import org.skywalking.apm.plugin.sjdbc.define.ExecuteInterceptor;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.skywalking.apm.agent.test.tools.SpanAssert.assertComponent;
import static org.skywalking.apm.agent.test.tools.SpanAssert.assertLayer;
import static org.skywalking.apm.agent.test.tools.SpanAssert.assertOccurException;
import static org.skywalking.apm.agent.test.tools.SpanAssert.assertTag;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class InterceptorTest {
private static ExecutorService ES;
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private ExecuteInterceptor executeInterceptor;
private AsyncExecuteInterceptor asyncExecuteInterceptor;
private Object[] allArguments;
@BeforeClass
public static void init() {
ExecuteEventListener.init();
ES = Executors.newSingleThreadExecutor();
}
@AfterClass
public static void finish() {
ES.shutdown();
}
@Before
public void setUp() throws SQLException {
executeInterceptor = new ExecuteInterceptor();
asyncExecuteInterceptor = new AsyncExecuteInterceptor();
allArguments = new Object[]{SQLType.DQL, null};
}
@Test
public void assertSyncExecute() throws Throwable {
executeInterceptor.beforeMethod(null, null, allArguments, null, null);
sendEvent("ds_0", "select * from t_order_0");
executeInterceptor.afterMethod(null, null, allArguments, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment segment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertNotNull(spans);
assertThat(spans.size(), is(2));
assertSpan(spans.get(0), 0);
assertThat(spans.get(1).getOperationName(), is("/SJDBC/TRUNK/DQL"));
}
@Test
public void assertAsyncExecute() throws Throwable {
executeInterceptor.beforeMethod(null, null, allArguments, null, null);
asyncExecuteInterceptor.beforeMethod(null, null, null, null, null);
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
ES.submit(() -> {
ExecutorDataMap.setDataMap(dataMap);
sendEvent("ds_1", "select * from t_order_1");
}).get();
asyncExecuteInterceptor.afterMethod(null, null, null, null, null);
sendEvent("ds_0", "select * from t_order_0");
executeInterceptor.afterMethod(null, null, allArguments, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(2));
TraceSegment segment0 = segmentStorage.getTraceSegments().get(0);
TraceSegment segment1 = segmentStorage.getTraceSegments().get(1);
assertThat(segment0.getRefs().size(), is(1));
List<AbstractTracingSpan> spans0 = SegmentHelper.getSpans(segment0);
assertNotNull(spans0);
assertThat(spans0.size(), is(1));
assertSpan(spans0.get(0), 1);
List<AbstractTracingSpan> spans1 = SegmentHelper.getSpans(segment1);
assertNotNull(spans1);
assertThat(spans1.size(), is(2));
assertSpan(spans1.get(0), 0);
assertThat(spans1.get(1).getOperationName(), is("/SJDBC/TRUNK/DQL"));
}
@Test
public void assertAsyncContextHold() throws Throwable {
ExecutorDataMap.getDataMap().put("FOO_KEY", "FOO_VALUE");
executeInterceptor.beforeMethod(null, null, allArguments, null, null);
asyncExecuteInterceptor.beforeMethod(null, null, null, null, null);
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
ES.submit(() -> {
ExecutorDataMap.setDataMap(dataMap);
sendEvent("ds_1", "select * from t_order_1");
}).get();
asyncExecuteInterceptor.afterMethod(null, null, null, null, null);
executeInterceptor.afterMethod(null, null, allArguments, null, null);
assertThat(ExecutorDataMap.getDataMap().size(), is(1));
assertThat(ExecutorDataMap.getDataMap().get("FOO_KEY"), Is.<Object>is("FOO_VALUE"));
}
@Test
public void assertExecuteError() throws Throwable {
executeInterceptor.beforeMethod(null, null, allArguments, null, null);
asyncExecuteInterceptor.beforeMethod(null, null, null, null, null);
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
ES.submit(() -> {
ExecutorDataMap.setDataMap(dataMap);
sendError();
}).get();
asyncExecuteInterceptor.handleMethodException(null, null, null, null, new SQLException("test"));
asyncExecuteInterceptor.afterMethod(null, null, null, null, null);
sendEvent("ds_0", "select * from t_order_0");
executeInterceptor.handleMethodException(null, null, allArguments, null, new SQLException("Test"));
executeInterceptor.afterMethod(null, null, allArguments, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(2));
TraceSegment segment0 = segmentStorage.getTraceSegments().get(0);
TraceSegment segment1 = segmentStorage.getTraceSegments().get(1);
List<AbstractTracingSpan> spans0 = SegmentHelper.getSpans(segment0);
assertNotNull(spans0);
assertThat(spans0.size(), is(1));
assertErrorSpan(spans0.get(0));
List<AbstractTracingSpan> spans1 = SegmentHelper.getSpans(segment1);
assertNotNull(spans1);
assertThat(spans1.size(), is(2));
assertSpan(spans1.get(0), 0);
assertErrorSpan(spans1.get(1));
}
private void assertSpan(AbstractTracingSpan span, int index) {
assertComponent(span, ComponentsDefine.SHARDING_JDBC);
assertLayer(span, SpanLayer.DB);
assertTag(span, 0, "sql");
assertTag(span, 1, "ds_" + index);
assertTag(span, 2, "select * from t_order_" + index);
assertThat(span.isExit(), is(true));
assertThat(span.getOperationName(), is("/SJDBC/BRANCH/QUERY"));
}
private void assertErrorSpan(AbstractTracingSpan span) {
assertOccurException(span, true);
}
private void sendEvent(String datasource, String sql) {
DQLExecutionEvent event = new DQLExecutionEvent(datasource, sql, Arrays.asList("1", 100));
EventBusInstance.getInstance().post(event);
event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(event);
}
private void sendError() {
DMLExecutionEvent event = new DMLExecutionEvent("", "", Collections.emptyList());
EventBusInstance.getInstance().post(event);
event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
event.setException(Optional.of(new SQLException("Test")));
EventBusInstance.getInstance().post(event);
}
}