ParallelStream导致的ThreadLocal丢失问题


并行流parallelStream

parallelStream提供了流的并行处理,它是Stream的另一重要特性,其底层使用Fork/Join框架实现。简单理解就是多线程异步任务的一种实现。

 

parallelStream配合ThreadLocal问题复现

/**
 * @author: peng
 * @date: 2021/12/7
 */
public class ParallelStreamTest {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        threadLocal.set("主线程的标识-张三");
        List<Integer> list1 = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            list1.add(i);
        }

        System.out.println(list1);
        System.out.println("主线程执行前ThreadLocal:" + threadLocal.get());
        System.out.println("---------------------------------------------");
        List<String> result = list1.stream().parallel().map(ParallelStreamTest.proxy((l1 ->
                //任务运行依赖threadLocal
                threadLocal.get() + l1
        ))).collect(Collectors.toList());
        System.out.println("---------------------------------------------");
        System.out.println(result);
        System.out.println("主线程执行后ThreadLocal:" + threadLocal.get());


    }

    public static <T, R> Function<T, R> proxy(Function<T, R> function) {
        //主线程
        Thread mainThread = Thread.currentThread();
        String mainLocal = threadLocal.get();
        System.out.println("主线程:" + mainThread.getName());
        System.out.println("主线程的ThreadLocal" + mainLocal);
        System.out.println("---------------------------------------------");
        return t -> {
            //主线程的 threadLocal -> 执行线程的 threadLocal
            threadLocal.set(mainLocal);
            //执行线程
            //System.out.println("执行线程:" + Thread.currentThread().getName() + "--执行线程的ThreadLocal:" + threadLocal.get());
            try {
                //执行任务xxx(需要用到threadLocal)
                return function.apply(t);
            } finally {
                //清除 执行线程的 threadLocal
                threadLocal.remove();
            }
        };
    }

}

在执行任务时 执行线程先获取主线程的threadLocal复制到自己的threadLocal中 任务执行完成后 清除执行线程的thradLocal

 

执行结果如下:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
主线程执行前ThreadLocal:主线程的标识-张三
---------------------------------------------
主线程:main
主线程的ThreadLocal主线程的标识-张三
---------------------------------------------
---------------------------------------------
[主线程的标识-张三0, 主线程的标识-张三1, 主线程的标识-张三2, 主线程的标识-张三3, 主线程的标识-张三4, 主线程的标识-张三5, 主线程的标识-张三6, 主线程的标识-张三7, 主线程的标识-张三8, 主线程的标识-张三9, 主线程的标识-张三10, 主线程的标识-张三11, 主线程的标识-张三12, 主线程的标识-张三13, 主线程的标识-张三14]
主线程执行后ThreadLocal:null

Process finished with exit code 0
在并行流操作执行完成后 主线程的threadLocal 被清除了!
加上线程信息输出再看下结果:
            //执行线程
            System.out.println("执行线程:" + Thread.currentThread().getName() + "--执行线程的ThreadLocal:" + threadLocal.get());
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
主线程执行前ThreadLocal:主线程的标识-张三
---------------------------------------------
主线程:main
主线程的ThreadLocal主线程的标识-张三
---------------------------------------------
执行线程:main--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-4--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-2--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-3--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-5--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-1--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-5--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-7--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-3--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-2--执行线程的ThreadLocal:主线程的标识-张三
执行线程:main--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-6--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-4--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-5--执行线程的ThreadLocal:主线程的标识-张三
执行线程:ForkJoinPool.commonPool-worker-1--执行线程的ThreadLocal:主线程的标识-张三
---------------------------------------------
[主线程的标识-张三0, 主线程的标识-张三1, 主线程的标识-张三2, 主线程的标识-张三3, 主线程的标识-张三4, 主线程的标识-张三5, 主线程的标识-张三6, 主线程的标识-张三7, 主线程的标识-张三8, 主线程的标识-张三9, 主线程的标识-张三10, 主线程的标识-张三11, 主线程的标识-张三12, 主线程的标识-张三13, 主线程的标识-张三14]
主线程执行后ThreadLocal:null

main主线程也参与到了任务执行 所以 threadLocal.remove(); 会把主线程的threadLocal清除

解决方案:threadLocal.remove() 前对线程做判断

                if (!mainThread.equals(Thread.currentThread())) {
                    //清除 执行线程的 threadLocal
                    threadLocal.remove();
                }

执行结果:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
主线程执行前ThreadLocal:主线程的标识-张三
---------------------------------------------
主线程:main
主线程的ThreadLocal主线程的标识-张三
---------------------------------------------
---------------------------------------------
[主线程的标识-张三0, 主线程的标识-张三1, 主线程的标识-张三2, 主线程的标识-张三3, 主线程的标识-张三4, 主线程的标识-张三5, 主线程的标识-张三6, 主线程的标识-张三7, 主线程的标识-张三8, 主线程的标识-张三9, 主线程的标识-张三10, 主线程的标识-张三11, 主线程的标识-张三12, 主线程的标识-张三13, 主线程的标识-张三14]
主线程执行后ThreadLocal:主线程的标识-张三

Process finished with exit code 0

 
  • 作者:低调做个路人 (扫码联系作者)
  • 发表时间:2021-12-16 18:42:24
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 评论