Java 包装线程池或者task,统一异步线程流水
日志串流水,统一一次请求的流水号。同一个线程里比较简单,线程上下文ThreadLocal直接用。 当遇到起异步线程或者线程池的,我们怎么把业务线程的流水传递过去呢?
通常两种实现方式:
- 封装线程池,即把原始线程池包装一下,覆写execute方法。
- 线程池原生,封装提交的runnable和callbable任务。
思路都是,把主线程里的ThreadLocal拿出来,再恢复到异步线程里。(lambda表达式内的就表示异步线程里了,外部还是主线程)
TraceId 上下文工具类
使用上下文 ThreadLocal 存储流水号。
public class TraceIdContext {
// TraceID 上下文 Key
public static final String TRACE_ID_KEY = "traceId";
// 线程本地存储 TraceID
private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();
/**
* 设置 TraceID 外部传入
*/
public static void set(String traceId) {
TRACE_ID_HOLDER.set(traceId);
}
/**
* 获取当前线程 TraceID
*/
public static String get() {
return TRACE_ID_HOLDER.get();
}
/**
* 清除 TraceID(防止内存泄漏)
*/
public static void clear() {
TRACE_ID_HOLDER.remove();
}
/**
* 捕获当前上下文(用于跨线程传递)
* 扩展:如需传递更多上下文(如用户ID),可在此添加
*/
public static Map<String, String> captureContext() {
Map<String, String> map = new HashMap<>();
map.put(TRACE_ID_KEY, get());
return map;
}
/**
* 恢复上下文(异步线程中使用)
*/
public static void restoreContext(Map<String, String> context) {
if (context != null && context.containsKey(TRACE_ID_KEY)) {
set(context.get(TRACE_ID_KEY));
}
}
}
自定义线程池包装器
包装线程池的 execute 方法,在提交任务时携带当前线程的 TraceID,异步执行时恢复到新线程中。
- 实现 Executor 接口
- 构造方法包装进来原始的线程池
- 重写executor 方法,上下文恢复
/**
* 携带 TraceID 的线程池包装器
*/
public class TraceIdExecutorWrapper implements Executor {
// 原始线程池
private final Executor delegate;
/**
* 通过构造方法注入进来原始线程池
* @param delegate
*/
public TraceIdExecutorWrapper(Executor delegate) {
this.delegate = delegate;
}
/**
* 覆盖execute方法,来包装原始的 runnable任务
* @param runnable the runnable task
*/
@Override
public void execute(Runnable runnable) {
// 捕获当前线程的 TraceID
String currentTraceId = TraceIdContext.get();
// 包装任务,执行时恢复 TraceID
delegate.execute(() -> {
try {
// 将主线程的 TraceID 绑定到异步线程
TraceIdContext.set(currentTraceId);
runnable.run(); // 执行原始任务
} finally {
// 执行完清除,避免 ThreadLocal 内存泄漏
TraceIdContext.clear();
}
});
}
}
Spring线程池ThreadPoolTaskExecutor的优雅装饰
自定义 Spring 任务装饰器 实现 TaskDecorator 接口,封装 TraceID 捕获和恢复逻辑,这是 Spring 官方推荐的线程上下文传递方式。
/**
* Spring 线程池任务装饰器
*/
public class TraceIdTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 1. 捕获当前线程的 TraceID 上下文(提交任务时的主线程)
Map<String, String> context = TraceIdContext.captureContext();
// 2. 包装任务:异步执行时恢复上下文,执行后清除
return () -> {
try {
TraceIdContext.restoreContext(context); // 恢复 TraceID 到异步线程
runnable.run(); // 执行原始任务
} finally {
TraceIdContext.clear(); // 强制清除,避免 ThreadLocal 泄漏
}
};
}
}
@Configuration
public class ExecutorConfig {
/**
* ThreadPoolTaskExecutor 异步线程池
*/
@Bean("asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(2000);
executor.setThreadNamePrefix("async-task-");
// 装饰器
executor.setTaskDecorator(new TraceIdTaskDecorator());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
}
封装task
上边的实现,主要用来包装线程池。
我们也可以使用原生线程池,但包装提交的callable或者runnable任务,把业务线程上下文写到异步线程里。
public class ContextThreadUtils {
public static Runnable wrap(Runnable runnable) {
// 当前业务线程上下文信息
Map<String, String> context = TraceIdContext.captureContext();
// 准备封装原始的Runnable 任务
return () -> {
// 异步线程原有的上下文(可能为空)
Map<String, String> asyncContext = TraceIdContext.captureContext();
try {
// 将主线程的上下文设置到当前异步线程
if (MapUtils.isNotEmpty(context)) {
TraceIdContext.restoreContext(context);
}
// 执行原始任务
runnable.run();
} finally {
// 恢复原始的异步线程里的上下文
if (MapUtils.isNotEmpty(asyncContext)) {
TraceIdContext.restoreContext(asyncContext);
}
}
};
}
public static <T> Callable<T> wrap(Callable<T> callable) {
// 当前业务线程上下文信息
Map<String, String> context = TraceIdContext.captureContext();
return () -> {
Map<String, String> asyncContext = TraceIdContext.captureContext();
try {
// 将主线程的上下文设置到当前异步线程
if (MapUtils.isNotEmpty(context)) {
TraceIdContext.restoreContext(context);
}
return callable.call();
} finally {
// 恢复原始的异步线程里的上下文
if (MapUtils.isNotEmpty(asyncContext)) {
TraceIdContext.restoreContext(asyncContext);
}
}
};
}
}