在整理老的业务逻辑代码时候发现好多接口实现上面都标记了 @Async注解。我本身对这个注解使用的比较少,异步逻辑我都习惯自定义ThreadPoolExecutor工具类。正好借着这次梳理代码结构,来看看 @Async这个注解到底在玩什么?
本文将会给大家从 @Async注解使用层面入手逐步解读源码,分析各种踩坑实践,并且扩展sleuth链路追踪与线程变量如何花式应用。
Spring中,被 @Async注解标注的方法,称之为异步方法。这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作,是spring默认提供的异步调用方式。
使用 @Async进行异步变成的方式特别简单。
仔细品味一下上面三个限制条件,任意一个不满足,均会导致 @Async无法生效。
启动类定义
@SpringBootApplication @EnableAsync public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } } 复制代码
controller层方法定义
@RestController @RequestMapping public class TestController { @Autowired TestService testService; @GetMapping() public void test(){ for (int i = 0; i <5 ; i++) { testService.testAsync(); } } } 复制代码
service方法定义
@Service @Slf4j public class TestServiceImpl implements TestService { @Override @Async public void testAsync(){ log.info("嘻嘻"); } } 复制代码
日志输出
- 2021-09-15 19:39:54.300,[http-nio-8088-exec-5], com.examp.controller.TestController - 嘿嘿 - 2021-09-15 19:36:54.302,[task-5], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 19:36:54.302,[task-4], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 19:36:54.302,[task-1], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 19:36:54.302,[task-2], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 19:36:54.302,[task-3], com.examp.service.impl.TestServiceImpl - 嘻嘻 复制代码
从日志打印可以发现,controller方法打印与service层方法打印日志使用的是不同的线程。
使用是真滴简单!
前面两点可以看到我们使用@Async进行异步变成是真的简单,但是里面也埋伏了各种各样的坑点。
先抛出问题,大家可以先思考:
废话不多说,看看源码。
/** * 该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。 * 入参随意,但返回值只能是void或者Future.(ListenableFuture接口/CompletableFuture类) * Future是代理返回的切实的异步返回,用以追踪异步方法的返回值。当然也可以使用AsyncResult类(实现 * ListenableFuture接口)(Spring或者EJB都有)或者CompletableFuture类 */ @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Async { //用以限定执行方法的执行器名称(自定义):Executor或者TaskExecutor //加在类上表示整个类都使用,加在方法上会覆盖类上的设置 String value() default ""; } 复制代码
/** * 开启spring异步执行器,类似xml中的task标签配置,需要联合@Configuration注解一起使用,对应文章开头,注解需* 要标注在启动类或者能被启动类扫描到的配置类上。 * * 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为taskExecutor的Executor类型的bean,都不存在使* 用SimpleAsyncTaskExecutor执行器 * * 可实现AsyncConfigurer接口复写getAsyncExecutor获取异步执行器,getAsyncUncaughtExceptionHandler获* 取异步未捕获异常处理器 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { // 该属性用来支持用户自定义异步注解,默认扫描spring的@Async和EJB3.1的@code @javax.ejb.Asynchronous Class extends Annotation> annotation() default Annotation.class; //标明是否需要创建CGLIB子类代理,AdviceMode=PROXY时才适用。注意设置为true时,其它spring管理的bean也会升级到CGLIB子类代理 boolean proxyTargetClass() default false; //标明异步通知将会如何实现,默认PROXY,如需支持同一个类中非异步方法调用另一个异步方法,需要设置为ASPECTJ AdviceMode mode() default AdviceMode.PROXY; //标明异步注解bean处理器应该遵循的执行顺序,默认最低的优先级(Integer.MAX_VALUE,值越小优先级越高) int order() default Ordered.LOWEST_PRECEDENCE; } 复制代码
@EnableAsync注解是非常明显的一个启动注解,几乎所有spring的开启配置类的注解都是以@Enable开头的。方法里面一看也是熟悉的老朋友 @Import
//查询器:基于@EanableAsync中定义的模式AdviceMode加在@Configuration标记的类上,确定抽象异步配置类的实现 public class AsyncConfigurationSelector extends AdviceModeImportSelector{ private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; //根据当前注解标注的方法坐在类的代理方式决定代理模式 @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY://jdk接口代理 return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ://cglib代理 return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } } 复制代码
类还是比较简单的,就是决定使用什么代理模式
以jdk接口代理为例,瞅一瞅看看
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { //判断注解元数据信息是否拿到 Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); //新建一个异步注解bean后处理器 AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); //配置执行器与异常处理器 bpp.configure(this.executor, this.exceptionHandler); Class extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } //设置是否升级到CGLIB子类代理,默认不开启 bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); //设置执行优先级,默认最后执行 bpp.setOrder(this.enableAsync.getNumber("order")); return bpp; } } 复制代码
配置类操作也比较清晰,就是集成AbstractAsyncConfiguration抽象类,新建AsyncAnnotationBeanPostProcessor后置处理器并进行参数初始化。
@Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { //enableAsync注解属性 @Nullable protected AnnotationAttributes enableAsync; //线程执行器 @Nullable protected Supplierexecutor; //异常执行器 @Nullable protected Supplier exceptionHandler; //设置注解元数据信息 @Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } //根据配置设置异步任务执行器和异常处理器 @Autowired(required = false) void setConfigurers(Collection configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } } 复制代码
由依赖图可知,这是一个bean的后置处理器
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor { //省略部分代码 //初始化异步处理切面 @Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); //初始化切面 AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; } } 复制代码
芜湖,发现了,这是一个异步注解的切面,定义了@Async解析的解析的切面,具体处理在AbstractAdvisingBeanPostProcessor.postProcessAfterInitialization方法
Interceptor接口的invoke方法,
断点打在接口方法处,逐步下去,找到AsyncExecutionInterceptor类
@Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } //定义任务 Callable
再看最后的核心方法doSubmit
@Nullable protected Object doSubmit(Callable
再次执行demo的方法,打断点在上面的代码行
发现任务被提交给了一个叫做applicationTaskExecutor的任务执行器
线程池参数:
核心线程数:8
队列大小与最大线程池都是Interger.Max。
初始化解析@Async注解的切面类
解析@Async,切点织入执行
大家在2.8结尾能发现,默认情况下,@Async对应的线程池配置最大线程数与最大队列数为2147483647。
也就是说,如果被标记的@Async的方法,某个事件并发量突然变高,系统的负载会瞬间变高,夸张一点直接down机。
那@Async这个注解就不能使用了吗?
答案肯定是否定的。
在@EnableAsync注解的类注释上已经说了,允许自定义线程池的bean来替换掉系统默认线程池。
搜索一下applicationTaskExecutor,找到TaskExecutionAutoConfiguration
@ConditionalOnClass(ThreadPoolTaskExecutor.class) @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(TaskExecutionProperties.class) public class TaskExecutionAutoConfiguration { /** * Bean name of the application {@link TaskExecutor}. */ public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvidertaskExecutorCustomizers, ObjectProvider taskDecorator) { TaskExecutionProperties.Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator); builder = builder.taskDecorator(taskDecorator.getIfUnique()); return builder; } //系统中不存在Executor的bean时,默认加载名为:applicationTaskExecutor的bean。这里的bean即为实际切面执行时,从beanfactory里面获取的beanName叫做taskExecutor的线程池进行执行异步任务 @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } } 复制代码
因此,如果你想要使用@Async,请务必增加自定义线程池返回的bean
例如
@Configuration public class ThreadPoolTaskConfig { /** 核心线程数(默认线程数) */ private static final int CORE_POOL_SIZE = 5; /** 最大线程数 */ private static final int MAX_POOL_SIZE = 10; /** 允许线程空闲时间(单位:默认为秒) */ private static final int KEEP_ALIVE_TIME = 10; /** 缓冲队列大小 */ private static final int QUEUE_CAPACITY = 200; /** 线程池名前缀 */ private static final String THREAD_NAME_PREFIX = "Async-Service-"; @Bean public ThreadPoolTaskExecutor taskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); executor.setQueueCapacity(KEEP_ALIVE_TIME); executor.setKeepAliveSeconds(QUEUE_CAPACITY); executor.setThreadNamePrefix(THREAD_NAME_PREFIX); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } } 复制代码
这里还要注意的是,当你的spring版本是2.1之前的版本时,这里是没有applicationTaskExecutor这个默认配置的线程池,而是SimpleAsyncTaskExecutor,他的submit方法就是在不断的新建线程去执行任务,也是及其消耗资源的,因此这里也是不推荐直接使用的。
从分析可知,@Async标注的方法所在的类是被代理的,并且是使用线程池线程执行的。
例如现在调用链式:
UserService.test1()-> UserService.test2()-> UserService.test3()与UserRoleService.test1()
UserService.test()2方法上标注了 @Async
@Service @Slf4j public class UserServiceImpl implements UserService { @Autowired UserRoleService userRoleService; @Override @Transactional(rollbackFor = Exception.class) public void test3(){ //省略save逻辑 save(); } /** * 事务一致 */ @Override @Async @Transactional(rollbackFor = Exception.class) public void test2(){ test3(); userRoleService.test1(); } /** * 事务不一致 */ @Override @Transactional(rollbackFor = Exception.class) public void test1(){ //省略save逻辑 save(); test2(); } } @Service public class UserRoleServiceImpl implements UserRoleService { @Override @Transactional(rollbackFor = Exception.class) public void test1(){ //省略save逻辑 save(); throw new RuntimeException("1"); } } 复制代码
哈哈哈,这里如果你保证代码不报错,你爱怎么控制事务就怎么控制事务,反正一定会落库成功,也就不会回滚,有没有事务也无所谓。当然这个是废话,代码不报错,事务不控制对于生产来说那是不可能的。
其实就是事务管理跨了线程,那么不同线程的事务就不能保证一致了。
先来看一下测试代码
@Service @Slf4j public class UserServiceImpl implements UserService { @Autowired UserService userService; @Override public void testAsync(){ userService.testAsync1(); } @Override @Async() public void testAsync1(){ log.info("嘻嘻1"); } } 复制代码
我们知道spring默认帮我们解决了循环依赖
循环依赖:A类中注入属性B类,B类属性中注入A类;A类注入自身 都被称为循环依赖。默认情况下spring使用的三级缓存给我们解决了这个情况。关于三级缓存感兴趣的可以百度看看网上的解析,后续我也会抽时间出一篇解析的文章。
我们来启动一下,发现启动失败,报错如下
Caused by: org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'userServiceImpl': Bean with name 'userServiceImpl' has been injected into other beans [userServiceImpl] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example. 复制代码
意思就是发生了循环依赖,陷入了死循环,启动失败了!
诶,上面不是说spring给我们解决了循环依赖吗?为什么加了@Async注解后启动失败了?
考虑到部分读者不太了解三级缓存,这里直接给出文字版结论,感兴趣的可以去结合上面的源码进行debug分析。
结论:
将UserService放入二级缓存时会校验代理,但是生成代理是判断对应的后置处理器类型是否为:SmartInstantiationAwareBeanPostProcessor,上文2.6可以看到,@Async解析的后置处理器AsyncAnnotationBeanPostProcessor仅仅是BeanPostProcessor类型,因此早期构造方法暴露出来的是原始对象。因此UserService关联到自己本身是原始对象。但是后面到AsyncAnnotationBeanPostProcessor后置处理器处理时将UserService包装成了一个代理对象。因此这时在最后的校验逻辑里面发现类UserService内部属性依赖的是userService实际bean,与代理对象不相等。进入到下层自检程序,最终报错。
这里连启动都无法启动,答案自然是不能调用成功。
因此,异步调用需要成功,则必须限制调用方法需要是两个不同bean的方法,即为@Async标注的方法必须能够被切面类代理到。
本质意义上@Async还是使用的是线程池,因此直接使用ThreadLocal,是无法将线程变量从主线程同步至线程池内的线程,需要使用阿里开源线程变量:TransmittableThreadLocal。
Threadlocal知识可以参考:一文吃透ThreadLocal的前世与今生
从2.8的invoke方法得知,方法直接返回值无法获取,将得到null,可以通过Future类进行返回值获取。
可以
pom引入
复制代码 org.springframework.cloud spring-cloud-starter-sleuth2.2.5.RELEASE
再次执行
- 2021-09-15 23:13:13.502, INFO, [7f6570e5ad87cebe,7f6570e5ad87cebe,], [http-nio-8088-exec-2], com.examp.controller.TestController - 嘿嘿 - 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,6c4ef07be0f53434,7f6570e5ad87cebe], [Async-Service-2], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,8ff5d5ce00c93add,7f6570e5ad87cebe], [Async-Service-1], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,99b553b90241b79e,7f6570e5ad87cebe], [Async-Service-3], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,42c54ecd36df454c,7f6570e5ad87cebe], [Async-Service-4], com.examp.service.impl.TestServiceImpl - 嘻嘻 - 2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,176b3679aa72038e,7f6570e5ad87cebe], [Async-Service-5], com.examp.service.impl.TestServiceImpl - 嘻嘻 复制代码
依旧回到2.8方法断点处查看
类型变成了LazyTraceThreadPoolTaskExecutor,是ThreadPoolTaskExecutor的子类
查看LazyTraceThreadPoolTaskExecutor的调用链得知,在ExecutorBeanPostProcessor后置处理器中代理了系统内ThreadPoolTaskExecutor类型的bean。也就是说,本质意义上执行任务的还是你自定义的ThreadPoolTaskExecutor,只是ExecutorBeanPostProcessor做了链路追踪的增强。
本文从@Async的简单使用入手,进行了@Async源码的分析,最后对@Async使用过程中可能出现的问题做了总结与解析。当然@Async虽香,还是不要一股脑把整个系统的异步任务都放在一个线程池中。不同业务的大量的异步任务尽量分离线程池处理。