以下代码对我来说似乎行得通：
import java.lang.Thread.UncaughtExceptionHandler;import java.util.Map;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.atomic.AtomicReference;import org.slf4j.MDC;public final class MdcForkJoinPool extends ForkJoinPool{ public MdcForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { super(parallelism, factory, handler, asyncMode); } @Override public void execute(ForkJoinTask<?> task) { // See http://stackoverflow.com/a/19329668/14731 super.execute(wrap(task, MDC.getCopyOfContextMap())); } @Override public void execute(Runnable task) { // See http://stackoverflow.com/a/19329668/14731 super.execute(wrap(task, MDC.getCopyOfContextMap())); } private <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task, Map<String, String> newContext) { return new ForkJoinTask<T>() { private static final long serialVersionUID = 1L; private final AtomicReference<T> override = new AtomicReference<>(); @Override public T getRawResult() { T result = override.get(); if (result != null) return result; return task.getRawResult(); } @Override protected void setRawResult(T value) { override.set(value); } @Override protected boolean exec() { // According to ForkJoinTask.fork() "it is a usage error to fork a task more than once unless it has completed // and been reinitialized". We therefore assume that this method does not have to be thread-safe. 
Map<String, String> oldContext = beforeExecution(newContext); try { task.invoke(); return true; } finally { afterExecution(oldContext); } } }; } private Runnable wrap(Runnable task, Map<String, String> newContext) { return () -> { Map<String, String> oldContext = beforeExecution(newContext); try { task.run(); } finally { afterExecution(oldContext); } }; } private Map<String, String> beforeExecution(Map<String, String> newValue) { Map<String, String> previous = MDC.getCopyOfContextMap(); if (newValue == null) MDC.clear(); else MDC.setContextMap(newValue); return previous; } private void afterExecution(Map<String, String> oldValue) { if (oldValue == null) MDC.clear(); else MDC.setContextMap(oldValue); }}和
import java.util.Map;
import java.util.concurrent.CountedCompleter;
import org.slf4j.MDC;

/**
 * A {@link CountedCompleter} that runs its computation under the SLF4J {@link MDC} that was
 * current on the thread that constructed it.
 *
 * <p>Subclasses implement {@link #computeWithContext()} instead of {@link #compute()}.
 *
 * @param <T> the result type of the completer
 */
public abstract class MdcCountedCompleter<T> extends CountedCompleter<T> {
  private static final long serialVersionUID = 1L;

  /** Copy of the MDC taken in the constructor; null when {@code MDC} returned no map. */
  private final Map<String, String> newContext;

  /** Creates a completer with no parent completer. */
  protected MdcCountedCompleter() {
    this((CountedCompleter<?>) null);
  }

  /**
   * Creates a completer with the given parent and snapshots the constructing thread's MDC.
   *
   * @param completer the parent completer, or null if there is none
   */
  protected MdcCountedCompleter(CountedCompleter<?> completer) {
    super(completer);
    this.newContext = MDC.getCopyOfContextMap();
  }

  /** The actual computation; invoked by {@link #compute()} with the captured MDC installed. */
  protected abstract void computeWithContext();

  /**
   * Installs the captured MDC, delegates to {@link #computeWithContext()}, and restores the MDC
   * the running thread had beforehand, even if the computation throws.
   */
  @Override
  public final void compute() {
    // Remember what the running thread had before overwriting it.
    final Map<String, String> saved = MDC.getCopyOfContextMap();
    install(newContext);
    try {
      computeWithContext();
    } finally {
      install(saved);
    }
  }

  /** Makes {@code context} the current thread's MDC; a null map clears the MDC instead. */
  private static void install(Map<String, String> context) {
    if (context != null) {
      MDC.setContextMap(context);
    } else {
      MDC.clear();
    }
  }
}
- 针对您的任务
请使用 MdcForkJoinPool（而不是普通的 ForkJoinPool）来运行；
同时让任务类扩展 MdcCountedCompleter，而不是直接扩展 CountedCompleter。



