package com.alibaba.ttl; import java.util.*; import java.util.concurrent.Callable; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; /** * {@link TransmittableThreadLocal} can transmit value from the thread of submitting task to the thread of executing task. *

* Note: {@link TransmittableThreadLocal} extends {@link java.lang.InheritableThreadLocal}, * so {@link TransmittableThreadLocal} first is a {@link java.lang.InheritableThreadLocal}. * * @author Jerry Lee (oldratlee at gmail dot com) * @see TtlRunnable * @see TtlCallable * @since 0.10.0 */ public class TransmittableThreadLocal extends InheritableThreadLocal { private static final Logger logger = Logger.getLogger(TransmittableThreadLocal.class.getName()); /** * Computes the value for this transmittable thread-local variable * as a function of the source thread's value at the time the task * Object is created. This method is called from {@link TtlRunnable} or * {@link TtlCallable} when it create, before the task is started. *

* This method merely returns reference of its source thread value, and should be overridden * if a different behavior is desired. * * @since 1.0.0 */ protected T copy(T parentValue) { return parentValue; } /** * Callback method before task object({@link TtlRunnable}/{@link TtlCallable}) execute. *

* Default behavior is do nothing, and should be overridden * if a different behavior is desired. *

* Do not throw any exception, just ignored. * * @since 1.2.0 */ protected void beforeExecute() { } /** * Callback method after task object({@link TtlRunnable}/{@link TtlCallable}) execute. *

* Default behavior is do nothing, and should be overridden * if a different behavior is desired. *

* Do not throw any exception, just ignored. * * @since 1.2.0 */ protected void afterExecute() { } @Override public final T get() { T value = super.get(); if (null != value) { addValue(); } return value; } @Override public final void set(T value) { super.set(value); if (null == value) { // may set null to remove value removeValue(); } else { addValue(); } } @Override public final void remove() { removeValue(); super.remove(); } void superRemove() { super.remove(); } T copyValue() { return copy(get()); } private static InheritableThreadLocal>> holder = new InheritableThreadLocal>>() { @Override protected Set> initialValue() { return Collections.newSetFromMap(new WeakHashMap, Boolean>()); } @Override protected Set> childValue(Set> parentValue) { Set> result = Collections.newSetFromMap( new WeakHashMap, Boolean>()); result.addAll(parentValue); return result; } }; private void addValue() { if (!holder.get().contains(this)) { holder.get().add(this); } } private void removeValue() { holder.get().remove(this); } private static void doExecuteCallback(boolean isBefore) { for (TransmittableThreadLocal threadLocal : holder.get()) { try { if (isBefore) { threadLocal.beforeExecute(); } else { threadLocal.afterExecute(); } } catch (Throwable t) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t); } } } } /** * Debug only method! */ static void dump(String title) { if (title != null && title.length() > 0) { System.out.printf("Start TransmittableThreadLocal[%s] Dump...\n", title); } else { System.out.println("Start TransmittableThreadLocal Dump..."); } for (final TransmittableThreadLocal key : holder.get()) { System.out.println(key.get()); } System.out.println("TransmittableThreadLocal Dump end!"); } /** * Debug only method! */ static void dump() { dump(null); } /** * {@link Transmitter} transmit all {@link TransmittableThreadLocal} values of current thread to * other thread by static method {@link #capture()} => {@link #replay(Object)} => {@link #restore(Object)} (aka {@code CRR} operation). *

* {@link Transmitter} is internal manipulation api for framework/middleware integration; * In general, you will never use it in the biz/application code! *

* Below is the example code: * *


     * ///////////////////////////////////////////////////////////////////////////
     * // in thread A, capture all TransmittableThreadLocal values of thread A
     * ///////////////////////////////////////////////////////////////////////////
     *
     * Object captured = Transmitter.capture(); // (1)
     *
     * ///////////////////////////////////////////////////////////////////////////
     * // in thread B
     * ///////////////////////////////////////////////////////////////////////////
     *
     * // replay all TransmittableThreadLocal values from thread A
     * Object backup = Transmitter.replay(captured); // (2)
     * try {
     *     // your biz logic, run with the TransmittableThreadLocal values of thread B
     *     System.out.println("Hello");
     *     // ...
     *     return "World";
     * } finally {
     *     // restore the TransmittableThreadLocal of thread B when replay
     *     Transmitter.restore(backup); (3)
     * }
     * 
*

* see the implementation code of {@link TtlRunnable} and {@link TtlCallable} for more actual code sample. *


* Of course, {@link #replay(Object)} and {@link #restore(Object)} operation can be simplified * by util methods {@link #runCallableWithCaptured(Object, Callable)} or {@link #runSupplierWithCaptured(Object, Supplier)} * and the adorable {@code Java 8 lambda syntax}. *

* Below is the example code: * *


     * ///////////////////////////////////////////////////////////////////////////
     * // in thread A, capture all TransmittableThreadLocal values of thread A
     * ///////////////////////////////////////////////////////////////////////////
     *
     * Object captured = Transmitter.capture(); // (1)
     *
     * ///////////////////////////////////////////////////////////////////////////
     * // in thread B
     * ///////////////////////////////////////////////////////////////////////////
     *
     * String result = runSupplierWithCaptured(captured, () -> {
     *      // your biz logic, run with the TransmittableThreadLocal values of thread A
     *      System.out.println("Hello");
     *      ...
     *      return "World";
     * }); // (2) + (3)
     * 
*

* The reason of providing 2 util methods is the different {@code throws Exception} type from biz logic({@code lambda}): *

    *
  1. {@link #runCallableWithCaptured(Object, Callable)}: No {@code throws}
  2. *
  3. {@link #runSupplierWithCaptured(Object, Supplier)}: {@code throws Exception}
  4. *
*

* If you has the different {@code throws Exception}, * you can define your own util method with your own {@code throws Exception} type function interface({@code lambda}). * * @author Yang Fang (snoop dot fy at gmail dot com) * @author Jerry Lee (oldratlee at gmail dot com) * @see TtlRunnable * @see TtlCallable * @since 2.3.0 */ public static class Transmitter { /** * Capture all {@link TransmittableThreadLocal} values in current thread. * * @return the captured {@link TransmittableThreadLocal} values * @since 2.3.0 */ public static Object capture() { Map, Object> captured = new HashMap, Object>(); for (TransmittableThreadLocal threadLocal : holder.get()) { captured.put(threadLocal, threadLocal.copyValue()); } return captured; } /** * Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()}, * and return the backup {@link TransmittableThreadLocal} values in current thread before replay. * * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()} * @return the backup {@link TransmittableThreadLocal} values before replay * @see #capture() * @since 2.3.0 */ public static Object replay(Object captured) { @SuppressWarnings("unchecked") Map, Object> capturedMap = (Map, Object>) captured; Map, Object> backup = new HashMap, Object>(); for (Iterator> iterator = holder.get().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal threadLocal = iterator.next(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL value only in captured // avoid extra TTL value in captured, when run task. if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set value to captured TTL for (Map.Entry, Object> entry : capturedMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal threadLocal = (TransmittableThreadLocal) entry.getKey(); threadLocal.set(entry.getValue()); } // call beforeExecute callback doExecuteCallback(true); return backup; } /** * Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}. * * @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)} * @since 2.3.0 */ public static void restore(Object backup) { @SuppressWarnings("unchecked") Map, Object> backupMap = (Map, Object>) backup; // call afterExecute callback doExecuteCallback(false); for (Iterator> iterator = holder.get().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal threadLocal = iterator.next(); // clear the TTL value only in backup // avoid the extra value of backup after restore if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL value for (Map.Entry, Object> entry : backupMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal threadLocal = (TransmittableThreadLocal) entry.getKey(); threadLocal.set(entry.getValue()); } } /** * Util method for simplifying {@link #replay(Object)} and {@link #restore(Object)} operation. * * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()} * @param bizLogic biz logic * @param the return type of biz logic * @return the return value of biz logic * @see #capture() * @see #replay(Object) * @see #restore(Object) * @since 2.3.1 */ public static R runSupplierWithCaptured(Object captured, Supplier bizLogic) { Object backup = replay(captured); try { return bizLogic.get(); } finally { restore(backup); } } /** * Util method for simplifying {@link #replay(Object)} and {@link #restore(Object)} operation. * * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()} * @param bizLogic biz logic * @param the return type of biz logic * @return the return value of biz logic * @throws Exception exception threw by biz logic * @see #capture() * @see #replay(Object) * @see #restore(Object) * @since 2.3.1 */ public static R runCallableWithCaptured(Object captured, Callable bizLogic) throws Exception { Object backup = replay(captured); try { return bizLogic.call(); } finally { restore(backup); } } private Transmitter() { throw new InstantiationError("Must not instantiate this class"); } } }