diff --git a/ttl-integrations/ttl-vertx/pom.xml b/ttl-integrations/ttl-vertx/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..1e6f7fa17ea19a633dab83969a4bccc4fdac1628 --- /dev/null +++ b/ttl-integrations/ttl-vertx/pom.xml @@ -0,0 +1,130 @@ + + 4.0.0 + + com.alibaba + ttl-vertx + 1.0.0-SNAPSHOT + jar + ${project.artifactId} + + + 1.8 + 1.8 + UTF-8 + + 2.13.0-SNAPSHOT + 1.4.21-2 + 1.4.2 + + + + + io.vertx + vertx-core + 4.0.0 + + + io.vertx + vertx-web-client + 4.0.0 + test + + + + com.github.spotbugs + spotbugs-annotations + 4.2.0 + true + + + com.alibaba + transmittable-thread-local + ${ttl.version} + + + junit + junit + 4.13.1 + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + + org.apache.maven.plugins + maven-deploy-plugin + 2.8.2 + + true + + + + + + + + enable-TtlAgent-forTest + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + ${surefire.verbose.class} + -javaagent:${com.alibaba:transmittable-thread-local:jar}=ttl.agent.logger:STDOUT + ${surefire.ttl.agent.log.class.transform} + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + initialize + + properties + + + + + + + + + enable-LogTransform-forTest + + -Dttl.agent.log.class.transform + + + + + enable-verboseClass-forTest + + -verbose:class + + + + diff --git a/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/TtlVertxHandler.java b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/TtlVertxHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..6af8c9bcc420d006f01689d479b60f82ef569666 --- /dev/null +++ b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/TtlVertxHandler.java @@ -0,0 +1,274 @@ +package com.alibaba.ttl.agent.extension_transformlet.vertx; + +import com.alibaba.ttl.spi.TtlAttachments; +import com.alibaba.ttl.spi.TtlAttachmentsDelegate; +import com.alibaba.ttl.spi.TtlEnhanced; +import com.alibaba.ttl.spi.TtlWrapper; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import io.vertx.core.Handler; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static com.alibaba.ttl.TransmittableThreadLocal.Transmitter.*; + +/** + * {@link TtlVertxHandler} decorate {@link Handler}, so as to get {@link com.alibaba.ttl.TransmittableThreadLocal} + * and transmit it to the time of {@link Handler} execution, + * needed when use {@link Handler} to {@link io.vertx.core.Future}. + *

+ * we will capture ttl value in another thread by modify {@link io.netty.util.concurrent.SingleThreadEventExecutor#execute(Runnable)}, + * but we can not capture the ttl value which we expect in callback of identical thread. + * the reason of above issue is some async io callback was invoked by the + * {@link io.netty.channel.nio.NioEventLoop#run()} rather than the {@link com.alibaba.ttl.TtlRunnable#run()} + * + * @author tk (305809299 at qq dot com) + * @see io.netty.channel.nio.NioEventLoop#run() + * @see io.netty.channel.nio.NioEventLoop#processSelectedKeys() + * @see io.vertx.core.Future + * @see com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore(Object) + */ +public class TtlVertxHandler implements Handler, TtlWrapper>, TtlEnhanced, TtlAttachments { + private final AtomicReference capturedRef; + private final Handler handler; + private final boolean releaseTtlValueReferenceAfterRun; + + private TtlVertxHandler(@NonNull Handler handler, boolean releaseTtlValueReferenceAfterRun) { + this.capturedRef = new AtomicReference<>(capture()); + this.handler = handler; + this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; + } + + /** + * wrap method {@link Handler#handle(E)}. + */ + @Override + public void handle(E event) { + final Object captured = capturedRef.get(); + if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { + throw new IllegalStateException("TTL value reference is released after run!"); + } + + final Object backup = replay(captured); + try { + handler.handle(event); + } finally { + restore(backup); + } + } + + /** + * return original/unwrapped {@link Handler}. + */ + @NonNull + public Handler getHandler() { + return unwrap(); + } + + /** + * unwrap to original/unwrapped {@link Handler}. + * + * @see com.alibaba.ttl.TtlUnwrap#unwrap(Object) + */ + @NonNull + @Override + public Handler unwrap() { + return handler; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + @SuppressWarnings("unchecked") + TtlVertxHandler that = (TtlVertxHandler) o; + + return handler.equals(that.handler); + } + + @Override + public int hashCode() { + return handler.hashCode(); + } + + @Override + public String toString() { + return this.getClass().getName() + " - " + handler.toString(); + } + + /** + * Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}. + * + * @param handler input {@link Handler}. if input is {@code null}, return {@code null}. + * @return Wrapped {@link Handler} + * @throws IllegalStateException when input is {@link TtlVertxHandler} already. + */ + @Nullable + public static TtlVertxHandler get(@Nullable Handler handler) { + return get(handler, false, false); + } + + /** + * Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}. + * + * @param handler input {@link Handler}. if input is {@code null}, return {@code null}. + * @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred. + * @return Wrapped {@link Handler} + * @throws IllegalStateException when input is {@link TtlVertxHandler} already. + */ + @Nullable + public static TtlVertxHandler get(@Nullable Handler handler, boolean releaseTtlValueReferenceAfterRun) { + return get(handler, releaseTtlValueReferenceAfterRun, false); + } + + /** + * Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}. + * + * @param handler input {@link Handler}. if input is {@code null}, return {@code null}. + * @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred. + * @param idempotent is idempotent mode or not. if {@code true}, just return input {@link Handler} when it's {@link TtlVertxHandler}, + * otherwise throw {@link IllegalStateException}. + * Caution: {@code true} will cover up bugs! DO NOT set, only when you know why. + * @return Wrapped {@link Handler} + * @throws IllegalStateException when input is {@link TtlVertxHandler} already and not idempotent. + */ + @Nullable + public static TtlVertxHandler get(@Nullable Handler handler, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { + if (null == handler) { + return null; + } + + if (handler instanceof TtlEnhanced) { + // avoid redundant decoration, and ensure idempotency + if (idempotent) { + return (TtlVertxHandler) handler; + } else { + throw new IllegalStateException("Already TtlVertxHandler!"); + } + } + return new TtlVertxHandler<>(handler, releaseTtlValueReferenceAfterRun); + } + + /** + * wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection. + * + * @param tasks task to be wrapped. if input is {@code null}, return {@code null}. + * @return wrapped tasks + * @throws IllegalStateException when input is {@link TtlVertxHandler} already. + */ + @NonNull + public static List> gets(@Nullable Collection> tasks) { + return gets(tasks, false, false); + } + + /** + * wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection. + * + * @param tasks task to be wrapped. if input is {@code null}, return {@code null}. + * @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred. + * @return wrapped tasks + * @throws IllegalStateException when input is {@link TtlVertxHandler} already. + */ + @NonNull + public static List> gets(@Nullable Collection> tasks, boolean releaseTtlValueReferenceAfterRun) { + return gets(tasks, releaseTtlValueReferenceAfterRun, false); + } + + /** + * wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection. + * + * @param tasks task to be wrapped. if input is {@code null}, return {@code null}. + * @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred. + * @param idempotent is idempotent mode or not. if {@code true}, just return input {@link Handler} when it's {@link TtlVertxHandler}, + * otherwise throw {@link IllegalStateException}. + * Caution: {@code true} will cover up bugs! DO NOT set, only when you know why. + * @return wrapped tasks + * @throws IllegalStateException when input is {@link TtlVertxHandler} already and not idempotent. + */ + @NonNull + public static List> gets(@Nullable Collection> tasks, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { + if (null == tasks) { + return Collections.emptyList(); + } + + List> copy = new ArrayList<>(); + for (Handler task : tasks) { + copy.add(TtlVertxHandler.get(task, releaseTtlValueReferenceAfterRun, idempotent)); + } + return copy; + } + + /** + * Unwrap {@link TtlVertxHandler} to the original/underneath one. + *

+ * this method is {@code null}-safe, when input {@code Function} parameter is {@code null}, return {@code null}; + * if input {@code Function} parameter is not a {@link TtlVertxHandler} just return input {@code Function}. + *

+ * so {@code TtlVertxHandler.unwrap(TtlVertxHandler.get(function))} will always return the same input {@code function} object. + * + * @see #handle(Object) + * @see com.alibaba.ttl.TtlUnwrap#unwrap(Object) + */ + @Nullable + public static Handler unwrap(@Nullable Handler handler) { + if (!(handler instanceof TtlVertxHandler)) { + return handler; + } else { + return ((TtlVertxHandler) handler).getHandler(); + } + } + + /** + * Unwrap {@link TtlVertxHandler} to the original/underneath one for collection. + *

+ * Invoke {@link #unwrap(Handler)} for each element in input collection. + *

+ * This method is {@code null}-safe, when input {@code Handler} parameter collection is {@code null}, return a empty list. + * + * @see #gets(Collection) + * @see #unwrap(Handler) + */ + @NonNull + public static List> unwraps(@Nullable Collection> tasks) { + if (null == tasks) { + return Collections.emptyList(); + } + + List> copy = new ArrayList<>(); + for (Handler task : tasks) { + if (!(task instanceof TtlVertxHandler)) { + copy.add(task); + } else { + copy.add(((TtlVertxHandler) task).getHandler()); + } + } + return copy; + } + + private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate(); + + /** + * see {@link TtlAttachments#setTtlAttachment(String, Object)} + */ + @Override + public void setTtlAttachment(@NonNull String key, Object value) { + ttlAttachment.setTtlAttachment(key, value); + } + + /** + * see {@link TtlAttachments#getTtlAttachment(String)} + */ + @Override + public T getTtlAttachment(@NonNull String key) { + return ttlAttachment.getTtlAttachment(key); + } +} diff --git a/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/NettySingleThreadEventExecutorTtlTransformlet.java b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/NettySingleThreadEventExecutorTtlTransformlet.java new file mode 100644 index 0000000000000000000000000000000000000000..dfd198f6b6542c9a020741df12b9a57de857c07d --- /dev/null +++ b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/NettySingleThreadEventExecutorTtlTransformlet.java @@ -0,0 +1,33 @@ +package com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet; + +import com.alibaba.ttl.threadpool.agent.transformlet.helper.AbstractExecutorTtlTransformlet; + +import java.util.HashSet; +import java.util.Set; + +/** + * {@link com.alibaba.ttl.threadpool.agent.transformlet.TtlTransformlet} + * for {@link io.netty.util.concurrent.SingleThreadEventExecutor}. + * + * @see io.netty.util.concurrent.SingleThreadEventExecutor + * @see io.vertx.core.eventbus.EventBus + * @see io.vertx.core.impl.EventLoopContext + * @see io.vertx.core.eventbus.Message + */ +public final class NettySingleThreadEventExecutorTtlTransformlet extends AbstractExecutorTtlTransformlet { + private static final Set EXECUTOR_CLASS_NAMES = new HashSet(); + + static { + EXECUTOR_CLASS_NAMES.add("io.netty.util.concurrent.SingleThreadEventExecutor"); + } + + private static final String THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ThreadFactory"; + + public NettySingleThreadEventExecutorTtlTransformlet(boolean disableInheritableForThreadPool) { + super(EXECUTOR_CLASS_NAMES, disableInheritableForThreadPool); + System.out.println("================================================"); + } + public NettySingleThreadEventExecutorTtlTransformlet() { + super(EXECUTOR_CLASS_NAMES, false); + } +} diff --git a/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/VertxFutureTtlTransformlet.java b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/VertxFutureTtlTransformlet.java new file mode 100644 index 0000000000000000000000000000000000000000..542704d7d1cccad5b8feaa597967855ebf06cd2e --- /dev/null +++ b/ttl-integrations/ttl-vertx/src/main/java/com/alibaba/ttl/agent/extension_transformlet/vertx/transformlet/VertxFutureTtlTransformlet.java @@ -0,0 +1,83 @@ +package com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet; + +import com.alibaba.ttl.threadpool.agent.logging.Logger; +import com.alibaba.ttl.threadpool.agent.transformlet.ClassInfo; +import com.alibaba.ttl.threadpool.agent.transformlet.TtlTransformlet; +import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CannotCompileException; +import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CtClass; +import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CtMethod; +import com.alibaba.ttl.threadpool.agent.transformlet.javassist.NotFoundException; +import edu.umd.cs.findbugs.annotations.NonNull; + +import java.io.IOException; +import java.lang.reflect.Modifier; +import java.net.URLClassLoader; +import java.util.HashSet; +import java.util.Set; + +import static com.alibaba.ttl.threadpool.agent.transformlet.helper.TtlTransformletHelper.signatureOfMethod; + +/** + * {@link TtlTransformlet} for {@link io.vertx.core.Future}. + * + * @author tk (305809299 at qq dot com) + * @see com.alibaba.ttl.integration.vertx4.TtlVertxHandler + * @see io.vertx.core.Future + * @see io.vertx.core.Handler + * @see sun.misc.Launcher.AppClassLoader + * @see URLClassLoader#findClass(String) + */ +public class VertxFutureTtlTransformlet implements TtlTransformlet { + private static final Logger logger = Logger.getLogger(VertxFutureTtlTransformlet.class); + + private static final String HANDLER_CLASS_NAME = "io.vertx.core.Handler"; + private static final String TTL_HANDLER_CLASS_NAME = "com.alibaba.ttl.agent.extension_transformlet.vertx.TtlVertxHandler"; + private static final String FUTURE_CLASS_NAME = "io.vertx.core.Future"; + private static final String FUTURE_IMPL_CLASS_NAME = "io.vertx.core.impl.future.FutureImpl"; + + private static final Set TO_BE_TRANSFORMED_CLASS_NAMES = new HashSet<>(); + + static { + TO_BE_TRANSFORMED_CLASS_NAMES.add(FUTURE_CLASS_NAME); + TO_BE_TRANSFORMED_CLASS_NAMES.add(FUTURE_IMPL_CLASS_NAME); + } + + @Override + public void doTransform(@NonNull ClassInfo classInfo) throws CannotCompileException, NotFoundException, IOException { + final CtClass clazz = classInfo.getCtClass(); + if (TO_BE_TRANSFORMED_CLASS_NAMES.contains(classInfo.getClassName())) { + for (CtMethod method : clazz.getDeclaredMethods()) { + updateSetHandlerMethodsOfFutureClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method); + } + classInfo.setModified(); + } + } + + private void updateSetHandlerMethodsOfFutureClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(CtMethod method) throws NotFoundException, CannotCompileException { + final int modifiers = method.getModifiers(); + if (!checkMethodNeedToBeDecorated(modifiers)) { + return; + } + + CtClass[] parameterTypes = method.getParameterTypes(); + StringBuilder insertCode = new StringBuilder(); + for (int i = 0; i < parameterTypes.length; i++) { + final String paramTypeName = parameterTypes[i].getName(); + if (HANDLER_CLASS_NAME.equals(paramTypeName)) { + String code = String.format( + // decorate to TTL wrapper, + // and then set AutoWrapper attachment/Tag + "$%d = %s.get($%1$d, false, true);" + + "\n com.alibaba.ttl.spi.TtlAttachmentsDelegate.setAutoWrapperAttachment($%1$d);", + i + 1, TTL_HANDLER_CLASS_NAME); + logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ":\n" + code); + insertCode.append(code); + } + } + if (insertCode.length() > 0) method.insertBefore(insertCode.toString()); + } + + private boolean checkMethodNeedToBeDecorated(int modifiers) { + return Modifier.isPublic(modifiers) || !Modifier.isStatic(modifiers) || !Modifier.isAbstract(modifiers); + } +} diff --git a/ttl-integrations/ttl-vertx/src/main/resources/META-INF/ttl.agent.transformlets b/ttl-integrations/ttl-vertx/src/main/resources/META-INF/ttl.agent.transformlets new file mode 100644 index 0000000000000000000000000000000000000000..d6faf5e0ea7da65d06197b48e81db39469bc5d37 --- /dev/null +++ b/ttl-integrations/ttl-vertx/src/main/resources/META-INF/ttl.agent.transformlets @@ -0,0 +1,2 @@ +com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet.NettySingleThreadEventExecutorTtlTransformlet +com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet.VertxFutureTtlTransformlet diff --git a/ttl-integrations/ttl-vertx/src/test/java/com/alibaba/ttl/agent/extension_transformlet/vertx/VertxTransformletTest.java b/ttl-integrations/ttl-vertx/src/test/java/com/alibaba/ttl/agent/extension_transformlet/vertx/VertxTransformletTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6134afa69213c750142141d5f9b09ec2871b30b1 --- /dev/null +++ b/ttl-integrations/ttl-vertx/src/test/java/com/alibaba/ttl/agent/extension_transformlet/vertx/VertxTransformletTest.java @@ -0,0 +1,84 @@ +package com.alibaba.ttl.agent.extension_transformlet.vertx; + +import com.alibaba.ttl.TransmittableThreadLocal; +import com.alibaba.ttl.threadpool.agent.TtlAgent; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author: tk (soulmate.tangk at gmail dot com) + * @date: 2021/2/2 + */ +public class VertxTransformletTest { + @Test + public void testTransmitThreadLocalInEventbus() { + TransmittableThreadLocal transmittableThreadLocal = new TransmittableThreadLocal(); + InheritableThreadLocal inheritableThreadLocal = new InheritableThreadLocal(); + String transmittedData = "hahahahaha"; + + Vertx vertx = Vertx.vertx(); + vertx.eventBus().consumer("consumer", message -> { + //there will be execute in netty event loop thread + System.out.println("========================================"); + + if (TtlAgent.isTtlAgentLoaded()) { + System.out.println("Test **WITH** TTL Agent"); + Assert.assertEquals(transmittedData, transmittableThreadLocal.get()); + } else { + System.out.println("Test WITHOUT TTL Agent"); + Assert.assertNull(transmittableThreadLocal.get()); + } + + //it is always null + Assert.assertNull(inheritableThreadLocal.get()); + + System.out.println("========================================"); + }); + + transmittableThreadLocal.set(transmittedData); + inheritableThreadLocal.set("gagagagaga"); + + //delivery message + vertx.eventBus().request("consumer", "asdfsd"); + } + + /** + * @see + * @throws InterruptedException + */ + @Test + public void testCallback() throws InterruptedException { + TransmittableThreadLocal transmittableThreadLocal = new TransmittableThreadLocal(); + InheritableThreadLocal inheritableThreadLocal = new InheritableThreadLocal(); + String transmittedData = "hahahahaha"; + + Vertx vertx = Vertx.vertx(); + //here will bind eventLoop to client and create a new Thread for eventLoop + WebClient client = WebClient.create(vertx); + + //set value after eventLoop thread was created + transmittableThreadLocal.set(transmittedData); + inheritableThreadLocal.set("gagagagaga"); + + client + .get(80, "baidu.com", "/") + .send() + .onSuccess(response -> { + System.out.println("===================callback====================="); + + if (TtlAgent.isTtlAgentLoaded()) { + System.out.println("Test **WITH** TTL Agent"); + Assert.assertEquals(transmittedData, transmittableThreadLocal.get()); + } else { + System.out.println("Test WITHOUT TTL Agent"); + Assert.assertNull(transmittableThreadLocal.get()); + } + + System.out.println("===================callback====================="); + }); + + Thread.sleep(10000); + } +}