相关推荐recommended
Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法
作者:mmseoamin日期:2023-12-25

1. 背景

当前因为工作需求,要发送大量Http请求,经过实践遍历发送需要6小时才能发送完毕,如果单线程发送请求会导致主线程阻塞。就会存在以下问题:

  1. 前端用户等待响应时间过长,无法进行下一步操作,不利于用户操作系统
  2. 响应时间过长超过Tomcat服务器会话时间,导致前后端的请求重新连接,这会出现抛出java.io.IOException: 你的主机中的软件中止了一个已建立的连接;重而终止了还未完成的Http发送任务
  3. 如果主线程其他任务如:定时Excel数据批量导入,文件上传等等;很容易因为文件格式问题,导致抛出异常,从而把Http的任务中断
  4. 夜长梦多,长时间发送请求,无法判断是否执行完毕;如果抛出异常,或是需要重启服务,无法判断执行到什么阶段,定位重发位置需要耗费很多时间精力,得不偿失,只能全部重发(一样面临以上问题)。

2. 解决思路

经过考虑可以使用多线程来解决问题,主线程负责开子线程处理请求。其中根据业务需要可以将多线程分为以下2类

  • 任务类型1. 开完子线程就返回响应用户的请求。
  • 任务类型2. 开完子线程后等待子线程响应结果后,再响应用户请求

    Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法,在这里插入图片描述,第1张

    Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法,在这里插入图片描述,第2张

    3. 具体实现

    Springboot中已经有集成的ThreadPoolTaskExecutor线程池可以供调用(注意区分Java自带的ThreadPoolExecutor),虽然2种线程池的提供的具体方法不一定一样,但是调度线程过程原理是通用,下面贴出ThreadPoolExecutor的调度线程过程作为创建ThreadPoolTaskExecutor的参考。

    Springboot自定义ThreadPoolTaskExecutor线程池多线程并发执行异步方法,在这里插入图片描述,第3张

    4. 开始编写代码

    @EnableAsync 注解启用Spring 异步方法执行功能

    当前的线程池 :

    • 核心线程池(CorePool) : 10 ;
    • 最大线程池(MaxPool) : 20 ;
    • 队列长度(QueueCapacity) : 200;
    • 拒绝策略(RejectedExecutionHandler) : CallerRunsPolicy

      表示:

      • 如果 请求创建的线程数 <= 10 + 200, 只会使用核心线程池的10个线程执行任务;
      • 如果 10 + 200 < 请求创建的线程数 , 创建额外的10个线线程处理任务, 线程池中同时拥有20个线程处理任务

        使用CallerRunsPolicy策略,主线程还没将 (请求创建的线程数 - 10 + 200 ) 的子线程创建完毕, 主线程处于阻塞状态,需要将所有子线程创建完毕才可以返回响应,即使任务是任务类型1,它的响应时间也会无限接近于 任务类型2;根据业务需求可以通过设置更长的队列长度来解决

        线程处理完任务后,(最大线程池 - 核心线程池) > 0 && 经过60s空闲时间,就会回收除核心线程池外的空闲线程

        @EnableAsync //注解启用Spring 异步方法执行功能
        @Configuration
        public class AsyncTaskConfig {
            @Bean("AsyncTaskThreadPool")
            public Executor threadPoolExecutor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                // 核心线程数:线程池创建时候初始化的线程数
                executor.setCorePoolSize(10);
                // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
                executor.setMaxPoolSize(20);
                // 缓冲队列:用来缓冲执行任务的队列
                executor.setQueueCapacity(200);
                // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
                executor.setKeepAliveSeconds(60);
                // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
                executor.setThreadNamePrefix("AsyncTaskThreadPool-");
                // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
                //AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常
                //DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态
                //DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
                //CallerRunsPolicy:不丢弃任务 由调用线程处理该任务
                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                // 线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
                executor.setAllowCoreThreadTimeOut(true);
                executor.initialize();
                return executor;
            }
        }
        

        @Async 根据添加到自定义线程池中执行

        @Component
        public class AsyncRequestComponent {
            @Async("AsyncTaskThreadPool")
            public CompletableFuture mockHttpRequest(String url) throws InterruptedException, IOException {
                String threadName = Thread.currentThread().getName();
                System.out.println("线程" + threadName + "开始调用,输出:" + url);
                String result = "";
                // HttpClient RestTemplate Feign or 其他Http请求框架
                // 或者其他需要 异步并发 的 逻辑代码 
                // Thread.sleep(50);
                result = HttpClientUtil.get(url);
                System.out.println("线程" + threadName + "调用结束,输出:" + url);
                return CompletableFuture.completedFuture(result);
            }
        }
        

        @Slf4j
        @SpringBootTest
        class DemoTestApplicationTests {
            @Autowired
            private AsyncRequestComponent asyncRequestComponent;
             @Test
            void testAsyncThreadPerformance() throws InterruptedException, ExecutionException {
                long start = System.currentTimeMillis();
                List> list = new ArrayList<>();
                // 通过修改循环次数来查看 任务数<缓存队列+核心线程 和 任务数>缓存队列+核心线程 的主线程花费时间
                // 如果 任务数<缓存队列+核心线程 主线程不需要等待
                // 如果 任务数>缓存队列+核心线程 主线程需要等待 因为后面任务还没创建成功
                for(int i=0 ;i < 2000; i++){
                    CompletableFuture future= asyncRequestComponent.mockHttpRequest(i);
                    list.add(future);
                }
                // 任务类型1 主线程不需要等待所有子线程执行完毕,将下面的for循环注释
                // 任务类型2 主线程需要等待所有子线程执行完毕:需要遍历future执行get方法等待子线程执行完毕
                for(Future future : list){
                    while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                        if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
                            String result = (String) future.get();//获取结果
                            System.out.println("任务i=" + result + "获取完成!" + new Date());
                            break;//当前future获取结果完毕,跳出while
                        } else {
                            Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                        }
                    }
                }
                long end = System.currentTimeMillis();
                System.out.println("主线程花费时间:"+(end-start));
            }
        }
        

        5. 结论

        • 如果希望主线程永远不会阻塞,不设置队列长度,因为默认长度设置为Integer.MAX_VALUE; 因为队列长度很长,不会额外再创建线程处理了,所以核心线程(CorePool) = 最大线程(MaxPool)
        • 如果本身业务用户可以容忍一定范围的响应延迟,这就需要根据实际出发,统计子线程任务的平均执行实际,判断核心线程数、最大线程数、缓冲队列的容量



          题外话:如何创建线程池

          CPU密集型:对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 8 核的 CPU,每个核一个线程,理论上创建 8 个线程就就可以;理论公式:最大线程数 = CPU核数 + 1

          IO 密集型任务: IO 密集型任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。理论公式 : 最大线程数 = 最大线程数 = CPU核心数 / (1 - IO_TIME/REQUEST_TIME) ,比如在某个请求中,请求时长为10秒,调用IO时间为8秒,这时我们阻塞占百分比就是80%,有效利用CPU占比就是20%,假设是八核CPU,我们线程数就是8 / (1 - 80%) = 8 / 0.2 = 40个

          最大线程数:

          • CPU密集型:CPU 核心数 + 1
          • IO密集型:CPU 核心数 / (1 - IO_TIME/REQUEST_TIME)

            核心线程数: 核心线程数 = 最大线程数 * 20%

            获取CPU核心数命令:

            window - Java : Runtime.getRuntime().availableProcessors()//获取逻辑核心数,如6核心12线程,那么返回的是12

            linux - 物理CPU: cat /proc/cpuinfo| grep “physical id”| sort| uniq| wc -l

            linux - 物理CPU中core: cat /proc/cpuinfo| grep “cpu cores”| uniq

            linux - 逻辑CPU: cat /proc/cpuinfo| grep “processor”| wc -l

            偷懒大法:

            • 核心线程数:CPU核心数
            • 最大线程数:CPU逻辑CPU( CPU核心数 * 2)

              参考资料:

              https://blog.csdn.net/weixin_49258262/article/details/125463819?spm=1001.2014.3001.5506

              https://blog.csdn.net/Wei_Naijia/article/details/127028398?spm=1001.2014.3001.5506

              https://blog.csdn.net/wozyb1995/article/details/125044992?spm=1001.2014.3001.5506

              https://blog.csdn.net/iampatrick_star/article/details/124490586?spm=1001.2014.3001.5506

              https://www.cnblogs.com/651434092qq/p/14240406.html

              https://juejin.cn/post/6948034657321484318

              https://juejin.cn/post/7072281409053786120