提交 bf8c023a 编写于 作者: K kun 提交者: oldratlee

feature: vert.x-extension

上级 ccf95259
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
property `com.alibaba:transmittable-thread-local:jar` is generated by maven-dependency-plugin
How to get path to dependency jar with maven
Apache Maven Dependency Plugin – Introduction
package com.alibaba.ttl.agent.extension_transformlet.vertx;
import com.alibaba.ttl.spi.TtlAttachments;
import com.alibaba.ttl.spi.TtlAttachmentsDelegate;
import com.alibaba.ttl.spi.TtlEnhanced;
import com.alibaba.ttl.spi.TtlWrapper;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.vertx.core.Handler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static com.alibaba.ttl.TransmittableThreadLocal.Transmitter.*;
* {@link TtlVertxHandler} decorate {@link Handler}, so as to get {@link com.alibaba.ttl.TransmittableThreadLocal}
* and transmit it to the time of {@link Handler} execution,
* needed when use {@link Handler} to {@link io.vertx.core.Future}.
* <p>
* we will capture ttl value in another thread by modify {@link io.netty.util.concurrent.SingleThreadEventExecutor#execute(Runnable)},
* but we can not capture the ttl value which we expect in callback of identical thread.
* the reason of above issue is some async io callback was invoked by the
* {@link io.netty.channel.nio.NioEventLoop#run()} rather than the {@link com.alibaba.ttl.TtlRunnable#run()}
* @author tk (305809299 at qq dot com)
* @see io.netty.channel.nio.NioEventLoop#run()
* @see io.netty.channel.nio.NioEventLoop#processSelectedKeys()
* @see io.vertx.core.Future
* @see com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore(Object)
public class TtlVertxHandler<E> implements Handler<E>, TtlWrapper<Handler<E>>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef;
private final Handler<E> handler;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlVertxHandler(@NonNull Handler<E> handler, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<>(capture());
this.handler = handler;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
* wrap method {@link Handler#handle(E)}.
public void handle(E event) {
final Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
final Object backup = replay(captured);
try {
} finally {
* return original/unwrapped {@link Handler}.
public Handler<E> getHandler() {
return unwrap();
* unwrap to original/unwrapped {@link Handler<E>}.
* @see com.alibaba.ttl.TtlUnwrap#unwrap(Object)
public Handler<E> unwrap() {
return handler;
public boolean equals(Object o) {
if (this == o) {
return true;
if (o == null || getClass() != o.getClass()) {
return false;
TtlVertxHandler<E> that = (TtlVertxHandler<E>) o;
return handler.equals(that.handler);
public int hashCode() {
return handler.hashCode();
public String toString() {
return this.getClass().getName() + " - " + handler.toString();
* Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}.
* @param handler input {@link Handler}. if input is {@code null}, return {@code null}.
* @return Wrapped {@link Handler}
* @throws IllegalStateException when input is {@link TtlVertxHandler} already.
public static <T> TtlVertxHandler<T> get(@Nullable Handler<T> handler) {
return get(handler, false, false);
* Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}.
* @param handler input {@link Handler}. if input is {@code null}, return {@code null}.
* @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred.
* @return Wrapped {@link Handler}
* @throws IllegalStateException when input is {@link TtlVertxHandler} already.
public static <T> TtlVertxHandler<T> get(@Nullable Handler<T> handler, boolean releaseTtlValueReferenceAfterRun) {
return get(handler, releaseTtlValueReferenceAfterRun, false);
* Factory method, wrap input {@link Handler} to {@link TtlVertxHandler}.
* @param handler input {@link Handler}. if input is {@code null}, return {@code null}.
* @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred.
* @param idempotent is idempotent mode or not. if {@code true}, just return input {@link Handler} when it's {@link TtlVertxHandler},
* otherwise throw {@link IllegalStateException}.
* <B><I>Caution</I></B>: {@code true} will cover up bugs! <b>DO NOT</b> set, only when you know why.
* @return Wrapped {@link Handler}
* @throws IllegalStateException when input is {@link TtlVertxHandler} already and not idempotent.
public static <T> TtlVertxHandler<T> get(@Nullable Handler<T> handler, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == handler) {
return null;
if (handler instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) {
return (TtlVertxHandler<T>) handler;
} else {
throw new IllegalStateException("Already TtlVertxHandler!");
return new TtlVertxHandler<>(handler, releaseTtlValueReferenceAfterRun);
* wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection.
* @param tasks task to be wrapped. if input is {@code null}, return {@code null}.
* @return wrapped tasks
* @throws IllegalStateException when input is {@link TtlVertxHandler} already.
public static <T> List<TtlVertxHandler<T>> gets(@Nullable Collection<? extends Handler<T>> tasks) {
return gets(tasks, false, false);
* wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection.
* @param tasks task to be wrapped. if input is {@code null}, return {@code null}.
* @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred.
* @return wrapped tasks
* @throws IllegalStateException when input is {@link TtlVertxHandler} already.
public static <T> List<TtlVertxHandler<T>> gets(@Nullable Collection<? extends Handler<T>> tasks, boolean releaseTtlValueReferenceAfterRun) {
return gets(tasks, releaseTtlValueReferenceAfterRun, false);
* wrap input {@link Handler} Collection to {@link TtlVertxHandler} Collection.
* @param tasks task to be wrapped. if input is {@code null}, return {@code null}.
* @param releaseTtlValueReferenceAfterRun release TTL value reference after run, avoid memory leak even if {@link TtlVertxHandler} is referred.
* @param idempotent is idempotent mode or not. if {@code true}, just return input {@link Handler} when it's {@link TtlVertxHandler},
* otherwise throw {@link IllegalStateException}.
* <B><I>Caution</I></B>: {@code true} will cover up bugs! <b>DO NOT</b> set, only when you know why.
* @return wrapped tasks
* @throws IllegalStateException when input is {@link TtlVertxHandler} already and not idempotent.
public static <T> List<TtlVertxHandler<T>> gets(@Nullable Collection<? extends Handler<T>> tasks, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == tasks) {
return Collections.emptyList();
List<TtlVertxHandler<T>> copy = new ArrayList<>();
for (Handler<T> task : tasks) {
copy.add(TtlVertxHandler.get(task, releaseTtlValueReferenceAfterRun, idempotent));
return copy;
* Unwrap {@link TtlVertxHandler} to the original/underneath one.
* <p>
* this method is {@code null}-safe, when input {@code Function} parameter is {@code null}, return {@code null};
* if input {@code Function} parameter is not a {@link TtlVertxHandler} just return input {@code Function}.
* <p>
* so {@code TtlVertxHandler.unwrap(TtlVertxHandler.get(function))} will always return the same input {@code function} object.
* @see #handle(Object)
* @see com.alibaba.ttl.TtlUnwrap#unwrap(Object)
public static <T> Handler<T> unwrap(@Nullable Handler<T> handler) {
if (!(handler instanceof TtlVertxHandler)) {
return handler;
} else {
return ((TtlVertxHandler<T>) handler).getHandler();
* Unwrap {@link TtlVertxHandler} to the original/underneath one for collection.
* <p>
* Invoke {@link #unwrap(Handler)} for each element in input collection.
* <p>
* This method is {@code null}-safe, when input {@code Handler} parameter collection is {@code null}, return a empty list.
* @see #gets(Collection)
* @see #unwrap(Handler)
public static <T> List<Handler<T>> unwraps(@Nullable Collection<? extends Handler<T>> tasks) {
if (null == tasks) {
return Collections.emptyList();
List<Handler<T>> copy = new ArrayList<>();
for (Handler<T> task : tasks) {
if (!(task instanceof TtlVertxHandler)) {
} else {
copy.add(((TtlVertxHandler<T>) task).getHandler());
return copy;
private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate();
* see {@link TtlAttachments#setTtlAttachment(String, Object)}
public void setTtlAttachment(@NonNull String key, Object value) {
ttlAttachment.setTtlAttachment(key, value);
* see {@link TtlAttachments#getTtlAttachment(String)}
public <T> T getTtlAttachment(@NonNull String key) {
return ttlAttachment.getTtlAttachment(key);
package com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet;
import com.alibaba.ttl.threadpool.agent.transformlet.helper.AbstractExecutorTtlTransformlet;
import java.util.HashSet;
import java.util.Set;
* {@link com.alibaba.ttl.threadpool.agent.transformlet.TtlTransformlet}
* for {@link io.netty.util.concurrent.SingleThreadEventExecutor}.
* @see io.netty.util.concurrent.SingleThreadEventExecutor
* @see io.vertx.core.eventbus.EventBus
* @see io.vertx.core.impl.EventLoopContext
* @see io.vertx.core.eventbus.Message
public final class NettySingleThreadEventExecutorTtlTransformlet extends AbstractExecutorTtlTransformlet {
private static final Set<String> EXECUTOR_CLASS_NAMES = new HashSet<String>();
static {
private static final String THREAD_FACTORY_CLASS_NAME = "java.util.concurrent.ThreadFactory";
public NettySingleThreadEventExecutorTtlTransformlet(boolean disableInheritableForThreadPool) {
super(EXECUTOR_CLASS_NAMES, disableInheritableForThreadPool);
public NettySingleThreadEventExecutorTtlTransformlet() {
package com.alibaba.ttl.agent.extension_transformlet.vertx.transformlet;
import com.alibaba.ttl.threadpool.agent.logging.Logger;
import com.alibaba.ttl.threadpool.agent.transformlet.ClassInfo;
import com.alibaba.ttl.threadpool.agent.transformlet.TtlTransformlet;
import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CannotCompileException;
import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CtClass;
import com.alibaba.ttl.threadpool.agent.transformlet.javassist.CtMethod;
import com.alibaba.ttl.threadpool.agent.transformlet.javassist.NotFoundException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URLClassLoader;
import java.util.HashSet;
import java.util.Set;
import static com.alibaba.ttl.threadpool.agent.transformlet.helper.TtlTransformletHelper.signatureOfMethod;
* {@link TtlTransformlet} for {@link io.vertx.core.Future}.
* @author tk (305809299 at qq dot com)
* @see com.alibaba.ttl.integration.vertx4.TtlVertxHandler
* @see io.vertx.core.Future
* @see io.vertx.core.Handler
* @see sun.misc.Launcher.AppClassLoader
* @see URLClassLoader#findClass(String)
public class VertxFutureTtlTransformlet implements TtlTransformlet {
private static final Logger logger = Logger.getLogger(VertxFutureTtlTransformlet.class);
private static final String HANDLER_CLASS_NAME = "io.vertx.core.Handler";
private static final String TTL_HANDLER_CLASS_NAME = "com.alibaba.ttl.agent.extension_transformlet.vertx.TtlVertxHandler";
private static final String FUTURE_CLASS_NAME = "io.vertx.core.Future";
private static final String FUTURE_IMPL_CLASS_NAME = "io.vertx.core.impl.future.FutureImpl";
private static final Set<String> TO_BE_TRANSFORMED_CLASS_NAMES = new HashSet<>();
static {
public void doTransform(@NonNull ClassInfo classInfo) throws CannotCompileException, NotFoundException, IOException {
final CtClass clazz = classInfo.getCtClass();
if (TO_BE_TRANSFORMED_CLASS_NAMES.contains(classInfo.getClassName())) {
for (CtMethod method : clazz.getDeclaredMethods()) {
private void updateSetHandlerMethodsOfFutureClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(CtMethod method) throws NotFoundException, CannotCompileException {
final int modifiers = method.getModifiers();
if (!checkMethodNeedToBeDecorated(modifiers)) {
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
final String paramTypeName = parameterTypes[i].getName();
if (HANDLER_CLASS_NAME.equals(paramTypeName)) {
String code = String.format(
// decorate to TTL wrapper,
// and then set AutoWrapper attachment/Tag
"$%d = %s.get($%1$d, false, true);"
+ "\n com.alibaba.ttl.spi.TtlAttachmentsDelegate.setAutoWrapperAttachment($%1$d);",
logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ":\n" + code);
if (insertCode.length() > 0) method.insertBefore(insertCode.toString());
private boolean checkMethodNeedToBeDecorated(int modifiers) {
return Modifier.isPublic(modifiers) || !Modifier.isStatic(modifiers) || !Modifier.isAbstract(modifiers);
package com.alibaba.ttl.agent.extension_transformlet.vertx;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.agent.TtlAgent;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
import org.junit.Assert;
import org.junit.Test;
* @author: tk (soulmate.tangk at gmail dot com)
* @date: 2021/2/2
public class VertxTransformletTest {
public void testTransmitThreadLocalInEventbus() {
TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<String>();
InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<String>();
String transmittedData = "hahahahaha";
Vertx vertx = Vertx.vertx();
vertx.eventBus().consumer("consumer", message -> {
//there will be execute in netty event loop thread
if (TtlAgent.isTtlAgentLoaded()) {
System.out.println("Test **WITH** TTL Agent");
Assert.assertEquals(transmittedData, transmittableThreadLocal.get());
} else {
System.out.println("Test WITHOUT TTL Agent");
//it is always null
//delivery message
vertx.eventBus().request("consumer", "asdfsd");
* @see
* @throws InterruptedException
public void testCallback() throws InterruptedException {
TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<String>();
InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<String>();
String transmittedData = "hahahahaha";
Vertx vertx = Vertx.vertx();
//here will bind eventLoop to client and create a new Thread for eventLoop
WebClient client = WebClient.create(vertx);
//set value after eventLoop thread was created
.get(80, "baidu.com", "/")
.onSuccess(response -> {
if (TtlAgent.isTtlAgentLoaded()) {
System.out.println("Test **WITH** TTL Agent");
Assert.assertEquals(transmittedData, transmittableThreadLocal.get());
} else {
System.out.println("Test WITHOUT TTL Agent");
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册