提交 fc469f04 编写于 作者: zlt2000's avatar zlt2000

优化租户和MDC信息的父子线程传递方式

上级 51ca3b6a
......@@ -39,6 +39,7 @@
<txlcn.version>5.0.2.RELEASE</txlcn.version>
<fastdfs-client.version>1.26.5</fastdfs-client.version>
<userAgent.version>1.21</userAgent.version>
<transmittable.version>2.11.0</transmittable.version>
<platform-bom>Cairo-SR3</platform-bom>
<spring-cloud-alibaba-dependencies.version>2.0.0.RELEASE</spring-cloud-alibaba-dependencies.version>
<spring-boot-dependencies.version>2.0.9.RELEASE</spring-boot-dependencies.version>
......@@ -297,6 +298,11 @@
<artifactId>UserAgentUtils</artifactId>
<version>${userAgent.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>${transmittable.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
......
package com.central.common.utils;
import cn.hutool.core.util.StrUtil;
import org.slf4j.MDC;
import com.alibaba.ttl.TtlCallable;
import com.alibaba.ttl.TtlRunnable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* 这是{@link ThreadPoolTaskExecutor}的一个简单替换,可以在每个任务之前设置子线程的租户和MDC数据
* 这是{@link ThreadPoolTaskExecutor}的一个简单替换,可搭配TransmittableThreadLocal实现父子线程之间的数据传递
*
* @author zlt
* @date 2019/8/14
*/
public class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
/**
* 把父线程的租户和MDC内容赋值给子线程
* @param runnable
*/
private static final long serialVersionUID = -5887035957049288777L;
@Override
public void execute(Runnable runnable) {
String tenantId = TenantContextHolder.getTenant();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
super.execute(() -> run(runnable, tenantId, mdcContext));
Runnable ttlRunnable = TtlRunnable.get(runnable);
super.execute(ttlRunnable);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
String tenantId = TenantContextHolder.getTenant();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return super.submit(() -> call(task, tenantId, mdcContext));
Callable ttlCallable = TtlCallable.get(task);
return super.submit(ttlCallable);
}
@Override
public Future<?> submit(Runnable task) {
String tenantId = TenantContextHolder.getTenant();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return super.submit(() -> run(task, tenantId, mdcContext));
Runnable ttlRunnable = TtlRunnable.get(task);
return super.submit(ttlRunnable);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
String tenantId = TenantContextHolder.getTenant();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return super.submitListenable(() -> run(task, tenantId, mdcContext));
Runnable ttlRunnable = TtlRunnable.get(task);
return super.submitListenable(ttlRunnable);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
String tenantId = TenantContextHolder.getTenant();
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return super.submitListenable(() -> call(task, tenantId, mdcContext));
}
/**
* 子线程委托的执行方法
* @param runnable {@link Runnable}
* @param tenantId 租户id
* @param mdcContext 父线程MDC内容
*/
private void run(Runnable runnable, String tenantId, Map<String, String> mdcContext) {
// 将父线程的租户id传给子线程
if (StrUtil.isNotEmpty(tenantId)) {
TenantContextHolder.setTenant(tenantId);
}
// 将父线程的MDC内容传给子线程
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
try {
// 执行异步操作
runnable.run();
} finally {
// 清空租户内容
TenantContextHolder.clear();
// 清空MDC内容
MDC.clear();
}
}
/**
* 子线程委托的执行方法
* @param task {@link Callable}
* @param tenantId 租户id
* @param mdcContext 父线程MDC内容
*/
private <T> T call(Callable<T> task, String tenantId, Map<String, String> mdcContext) throws Exception {
// 将父线程的租户id传给子线程
if (StrUtil.isNotEmpty(tenantId)) {
TenantContextHolder.setTenant(tenantId);
}
// 将父线程的MDC内容传给子线程
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
try {
// 执行异步操作
return task.call();
} finally {
// 清空租户内容
TenantContextHolder.clear();
// 清空MDC内容
MDC.clear();
}
Callable ttlCallable = TtlCallable.get(task);
return super.submitListenable(ttlCallable);
}
}
package com.central.common.utils;
import com.alibaba.ttl.TransmittableThreadLocal;
/**
* 租户holder
*
......@@ -7,7 +9,10 @@ package com.central.common.utils;
* @date 2019/8/5
*/
public class TenantContextHolder {
private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();
/**
* 支持父子线程之间的数据传递
*/
private static final ThreadLocal<String> CONTEXT = new TransmittableThreadLocal<>();
public static void setTenant(String tenant) {
CONTEXT.set(tenant);
......
......@@ -26,5 +26,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
</dependency>
</dependencies>
</project>
package com.central.log.config;
import org.slf4j.TtlMDCAdapter;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
/**
* 初始化TtlMDCAdapter实例,并替换MDC中的adapter对象
*
* @author zlt
* @date 2019/8/17
*/
public class TtlMDCAdapterInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
//加载TtlMDCAdapter实例
TtlMDCAdapter.getInstance();
}
}
\ No newline at end of file
package org.slf4j;
import ch.qos.logback.classic.util.LogbackMDCAdapter;
import com.alibaba.ttl.TransmittableThreadLocal;
import org.slf4j.spi.MDCAdapter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* 重构{@link LogbackMDCAdapter}类,搭配TransmittableThreadLocal实现父子线程之间的数据传递
*
* @author zlt
* @date 2019/8/17
*/
public class TtlMDCAdapter implements MDCAdapter {
private final ThreadLocal<Map<String, String>> copyOnInheritThreadLocal = new TransmittableThreadLocal<>();
private static final int WRITE_OPERATION = 1;
private static final int MAP_COPY_OPERATION = 2;
private static TtlMDCAdapter mtcMDCAdapter;
/**
* keeps track of the last operation performed
*/
private final ThreadLocal<Integer> lastOperation = new ThreadLocal<>();
static {
mtcMDCAdapter = new TtlMDCAdapter();
MDC.mdcAdapter = mtcMDCAdapter;
}
public static MDCAdapter getInstance() {
return mtcMDCAdapter;
}
private Integer getAndSetLastOperation(int op) {
Integer lastOp = lastOperation.get();
lastOperation.set(op);
return lastOp;
}
private static boolean wasLastOpReadOrNull(Integer lastOp) {
return lastOp == null || lastOp == MAP_COPY_OPERATION;
}
private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
Map<String, String> newMap = Collections.synchronizedMap(new HashMap<>());
if (oldMap != null) {
// we don't want the parent thread modifying oldMap while we are
// iterating over it
synchronized (oldMap) {
newMap.putAll(oldMap);
}
}
copyOnInheritThreadLocal.set(newMap);
return newMap;
}
/**
* Put a context value (the <code>val</code> parameter) as identified with the
* <code>key</code> parameter into the current thread's context map. Note that
* contrary to log4j, the <code>val</code> parameter can be null.
* <p/>
* <p/>
* If the current thread does not have a context map it is created as a side
* effect of this call.
*
* @throws IllegalArgumentException in case the "key" parameter is null
*/
@Override
public void put(String key, String val) {
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
}
Map<String, String> oldMap = copyOnInheritThreadLocal.get();
Integer lastOp = getAndSetLastOperation(WRITE_OPERATION);
if (wasLastOpReadOrNull(lastOp) || oldMap == null) {
Map<String, String> newMap = duplicateAndInsertNewMap(oldMap);
newMap.put(key, val);
} else {
oldMap.put(key, val);
}
}
/**
* Remove the the context identified by the <code>key</code> parameter.
* <p/>
*/
@Override
public void remove(String key) {
if (key == null) {
return;
}
Map<String, String> oldMap = copyOnInheritThreadLocal.get();
if (oldMap == null) {
return;
}
Integer lastOp = getAndSetLastOperation(WRITE_OPERATION);
if (wasLastOpReadOrNull(lastOp)) {
Map<String, String> newMap = duplicateAndInsertNewMap(oldMap);
newMap.remove(key);
} else {
oldMap.remove(key);
}
}
/**
* Clear all entries in the MDC.
*/
@Override
public void clear() {
lastOperation.set(WRITE_OPERATION);
copyOnInheritThreadLocal.remove();
}
/**
* Get the context identified by the <code>key</code> parameter.
* <p/>
*/
@Override
public String get(String key) {
final Map<String, String> map = copyOnInheritThreadLocal.get();
if ((map != null) && (key != null)) {
return map.get(key);
} else {
return null;
}
}
/**
* Get the current thread's MDC as a map. This method is intended to be used
* internally.
*/
public Map<String, String> getPropertyMap() {
lastOperation.set(MAP_COPY_OPERATION);
return copyOnInheritThreadLocal.get();
}
/**
* Returns the keys in the MDC as a {@link Set}. The returned value can be
* null.
*/
public Set<String> getKeys() {
Map<String, String> map = getPropertyMap();
if (map != null) {
return map.keySet();
} else {
return null;
}
}
/**
* Return a copy of the current thread's context map. Returned value may be
* null.
*/
@Override
public Map<String, String> getCopyOfContextMap() {
Map<String, String> hashMap = copyOnInheritThreadLocal.get();
if (hashMap == null) {
return null;
} else {
return new HashMap<>(hashMap);
}
}
@Override
public void setContextMap(Map<String, String> contextMap) {
lastOperation.set(WRITE_OPERATION);
Map<String, String> newMap = Collections.synchronizedMap(new HashMap<>());
newMap.putAll(contextMap);
// the newMap replaces the old one for serialisation's sake
copyOnInheritThreadLocal.set(newMap);
}
}
org.springframework.context.ApplicationContextInitializer=\
com.central.log.config.TtlMDCAdapterInitializer
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.central.log.config.LogAutoConfigure
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册