提交 ddf7607a 编写于 作者: oldratlee's avatar oldratlee 🔥

fix idempotency #36

上级 369eb23f
......@@ -30,9 +30,9 @@ public final class MtContextCallable<V> implements Callable<V> {
private MtContextCallable(Callable<V> callable, boolean releaseMtContextAfterCall) {
this.copiedRef = new AtomicReference<Map<MtContextThreadLocal<?>, Object>>(MtContextThreadLocal.copy());
this.callable = callable;
this.releaseMtContextAfterCall = releaseMtContextAfterCall;
this.copiedRef = new AtomicReference<Map<MtContextThreadLocal<?>, Object>>(MtContextThreadLocal.copy());
}
/**
......@@ -69,6 +69,7 @@ public final class MtContextCallable<V> implements Callable<V> {
return get(callable, false);
}
/**
* Factory method, wrapper input {@link Callable} to {@link MtContextCallable}.
* <p/>
......@@ -79,12 +80,31 @@ public final class MtContextCallable<V> implements Callable<V> {
* @return Wrapped {@link Callable}
*/
public static <T> MtContextCallable<T> get(Callable<T> callable, boolean releaseMtContextAfterCall) {
return get(callable, releaseMtContextAfterCall, false);
}
/**
* Factory method, wrapper input {@link Callable} to {@link MtContextCallable}.
* <p/>
* This method is idempotent.
*
* @param callable input {@link Callable}
* @param releaseMtContextAfterCall release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bug! <b>DO NOT</b> set, only when you why.
* @return Wrapped {@link Callable}
*/
public static <T> MtContextCallable<T> get(Callable<T> callable, boolean releaseMtContextAfterCall, boolean idempotent) {
if (null == callable) {
return null;
}
if (callable instanceof MtContextCallable) { // avoid redundant decoration, and ensure idempotency
throw new IllegalStateException("Already MtContextCallable!");
if (callable instanceof MtContextCallable) {
if (idempotent) {
// avoid redundant decoration, and ensure idempotency
return (MtContextCallable<T>) callable;
} else {
throw new IllegalStateException("Already MtContextCallable!");
}
}
return new MtContextCallable<T>(callable, releaseMtContextAfterCall);
}
......@@ -96,7 +116,7 @@ public final class MtContextCallable<V> implements Callable<V> {
* @return Wrapped {@link Callable}
*/
public static <T> List<MtContextCallable<T>> gets(Collection<? extends Callable<T>> tasks) {
return gets(tasks, false);
return gets(tasks, false, false);
}
/**
......@@ -112,7 +132,26 @@ public final class MtContextCallable<V> implements Callable<V> {
}
List<MtContextCallable<T>> copy = new ArrayList<MtContextCallable<T>>();
for (Callable<T> task : tasks) {
copy.add(MtContextCallable.get(task, releaseMtContextAfterCall));
copy.add(MtContextCallable.get(task, releaseMtContextAfterCall, false));
}
return copy;
}
/**
* wrapper input {@link Callable} Collection to {@link MtContextCallable} Collection.
*
* @param tasks task to be wrapped
* @param releaseMtContextAfterCall release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bug! <b>DO NOT</b> set, only when you why.
* @return Wrapped {@link Callable}
*/
public static <T> List<MtContextCallable<T>> gets(Collection<? extends Callable<T>> tasks, boolean releaseMtContextAfterCall, boolean idempotent) {
if (null == tasks) {
return null;
}
List<MtContextCallable<T>> copy = new ArrayList<MtContextCallable<T>>();
for (Callable<T> task : tasks) {
copy.add(MtContextCallable.get(task, releaseMtContextAfterCall, idempotent));
}
return copy;
}
......
......@@ -26,9 +26,9 @@ public final class MtContextRunnable implements Runnable {
private final boolean releaseMtContextAfterRun;
private MtContextRunnable(Runnable runnable, boolean releaseMtContextAfterRun) {
this.copiedRef = new AtomicReference<Map<MtContextThreadLocal<?>, Object>>(MtContextThreadLocal.copy());
this.runnable = runnable;
this.releaseMtContextAfterRun = releaseMtContextAfterRun;
this.copiedRef = new AtomicReference<Map<MtContextThreadLocal<?>, Object>>(MtContextThreadLocal.copy());
}
/**
......@@ -62,7 +62,7 @@ public final class MtContextRunnable implements Runnable {
* @return Wrapped {@link Runnable}
*/
public static MtContextRunnable get(Runnable runnable) {
return get(runnable, false);
return get(runnable, false, false);
}
/**
......@@ -75,12 +75,31 @@ public final class MtContextRunnable implements Runnable {
* @return Wrapped {@link Runnable}
*/
public static MtContextRunnable get(Runnable runnable, boolean releaseMtContextAfterRun) {
return get(runnable, releaseMtContextAfterRun, false);
}
/**
* Factory method, wrapper input {@link Runnable} to {@link MtContextRunnable}.
* <p/>
* This method is idempotent.
*
* @param runnable input {@link Runnable}
* @param releaseMtContextAfterRun release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bug! <b>DO NOT</b> set, only when you why.
* @return Wrapped {@link Runnable}
*/
public static MtContextRunnable get(Runnable runnable, boolean releaseMtContextAfterRun, boolean idempotent) {
if (null == runnable) {
return null;
}
if (runnable instanceof MtContextRunnable) { // avoid redundant decoration, and ensure idempotency
throw new IllegalStateException("Already MtContextRunnable!");
if (runnable instanceof MtContextRunnable) {
if (idempotent) {
// avoid redundant decoration, and ensure idempotency
return (MtContextRunnable) runnable;
} else {
throw new IllegalStateException("Already MtContextRunnable!");
}
}
return new MtContextRunnable(runnable, releaseMtContextAfterRun);
}
......@@ -92,7 +111,7 @@ public final class MtContextRunnable implements Runnable {
* @return wrapped tasks
*/
public static List<MtContextRunnable> gets(Collection<? extends Runnable> tasks) {
return gets(tasks, false);
return gets(tasks, false, false);
}
/**
......@@ -108,7 +127,26 @@ public final class MtContextRunnable implements Runnable {
}
List<MtContextRunnable> copy = new ArrayList<MtContextRunnable>();
for (Runnable task : tasks) {
copy.add(MtContextRunnable.get(task, releaseMtContextAfterRun));
copy.add(MtContextRunnable.get(task, releaseMtContextAfterRun, false));
}
return copy;
}
/**
* wrapper input {@link Runnable} Collection to {@link MtContextRunnable} Collection.
*
* @param tasks task to be wrapped
* @param releaseMtContextAfterRun release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bug! <b>DO NOT</b> set, only when you why.
* @return wrapped tasks
*/
public static List<MtContextRunnable> gets(Collection<? extends Runnable> tasks, boolean releaseMtContextAfterRun, boolean idempotent) {
if (null == tasks) {
return null;
}
List<MtContextRunnable> copy = new ArrayList<MtContextRunnable>();
for (Runnable task : tasks) {
copy.add(MtContextRunnable.get(task, releaseMtContextAfterRun, idempotent));
}
return copy;
}
......
......@@ -2,6 +2,7 @@ package com.alibaba.mtc;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;
/**
* {@link MtContextTimerTask} decorate {@link TimerTask}, so as to get {@link MtContextThreadLocal}
......@@ -21,12 +22,14 @@ import java.util.TimerTask;
*/
@Deprecated
public final class MtContextTimerTask extends TimerTask {
private final Map<MtContextThreadLocal<?>, Object> copied;
private final AtomicReference<Map<MtContextThreadLocal<?>, Object>> copiedRef;
private final TimerTask timerTask;
private final boolean releaseMtContextAfterRun;
private MtContextTimerTask(TimerTask timerTask) {
copied = MtContextThreadLocal.copy();
private MtContextTimerTask(TimerTask timerTask, boolean releaseMtContextAfterRun) {
this.copiedRef = new AtomicReference<Map<MtContextThreadLocal<?>, Object>>(MtContextThreadLocal.copy());
this.timerTask = timerTask;
this.releaseMtContextAfterRun = releaseMtContextAfterRun;
}
/**
......@@ -35,7 +38,11 @@ public final class MtContextTimerTask extends TimerTask {
@Override
public void run() {
// backup MtContext
Map<MtContextThreadLocal<?>, Object> copied = copiedRef.get();
Map<MtContextThreadLocal<?>, Object> backup = MtContextThreadLocal.backupAndSet(copied);
if (copied == null || releaseMtContextAfterRun && !copiedRef.compareAndSet(copied, null)) {
throw new IllegalStateException("MtContext is released!");
}
try {
timerTask.run();
} finally {
......@@ -62,13 +69,45 @@ public final class MtContextTimerTask extends TimerTask {
* @return Wrapped {@link TimerTask}
*/
public static MtContextTimerTask get(TimerTask timerTask) {
return get(timerTask, false, false);
}
/**
* Factory method, wrapper input {@link Runnable} to {@link MtContextTimerTask}.
* <p/>
* This method is idempotent.
*
* @param timerTask input {@link TimerTask}
* @param releaseMtContextAfterRun release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @return Wrapped {@link TimerTask}
*/
public static MtContextTimerTask get(TimerTask timerTask, boolean releaseMtContextAfterRun) {
return get(timerTask, releaseMtContextAfterRun, false);
}
/**
* Factory method, wrapper input {@link Runnable} to {@link MtContextTimerTask}.
* <p/>
* This method is idempotent.
*
* @param timerTask input {@link TimerTask}
* @param releaseMtContextAfterRun release MtContext after run, avoid memory leak even if {@link MtContextRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bug! <b>DO NOT</b> set, only when you why.
* @return Wrapped {@link TimerTask}
*/
public static MtContextTimerTask get(TimerTask timerTask, boolean releaseMtContextAfterRun, boolean idempotent) {
if (null == timerTask) {
return null;
}
if (timerTask instanceof MtContextTimerTask) { // avoid redundant decoration, and ensure idempotency
throw new IllegalStateException("Already MtContextTimerTask!");
if (timerTask instanceof MtContextTimerTask) {
if (idempotent) {
// avoid redundant decoration, and ensure idempotency
return (MtContextTimerTask) timerTask;
} else {
throw new IllegalStateException("Already MtContextTimerTask!");
}
}
return new MtContextTimerTask(timerTask);
return new MtContextTimerTask(timerTask, false);
}
}
......@@ -115,11 +115,11 @@ public class MtContextTransformer implements ClassFileTransformer {
for (int i = 0; i < parameterTypes.length; i++) {
CtClass paraType = parameterTypes[i];
if (RUNNABLE_CLASS_NAME.equals(paraType.getName())) {
String code = String.format("$%d = %s.get($%d);", i + 1, MT_CONTEXT_RUNNABLE_CLASS_NAME, i + 1);
String code = String.format("$%d = %s.get($%d, false, true);", i + 1, MT_CONTEXT_RUNNABLE_CLASS_NAME, i + 1);
logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
} else if (CALLABLE_CLASS_NAME.equals(paraType.getName())) {
String code = String.format("$%d = %s.get($%d);", i + 1, MT_CONTEXT_CALLABLE_CLASS_NAME, i + 1);
String code = String.format("$%d = %s.get($%d, false, true);", i + 1, MT_CONTEXT_CALLABLE_CLASS_NAME, i + 1);
logger.info("insert code before method " + method + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
......
......@@ -206,12 +206,12 @@ public class MtContextCallableTest {
@Test
public void test_gets() throws Exception {
Call call1 = new Call("1", null);
Call call2 = new Call("1", null);
Callable<String> call1 = new Call("1", null);
Callable<String> call2 = new Call("1", null);
Callable<String> call3 = new Call("1", null);
List<MtContextCallable<String>> callList = MtContextCallable.gets(
Arrays.<Callable<String>>asList(call1, call2, null, call3));
Arrays.asList(call1, call2, null, call3));
assertEquals(4, callList.size());
assertThat(callList.get(0), instanceOf(MtContextCallable.class));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册