基于 Spring Framework v5.2.6.RELEASE
Spring 终有一种非常简便的方法使 Bean 中的一个方法变成异步执行的方法,那就是在方法上标记 @Async 注解,想要开启这一特性,需要在一个配置类上标记 @EnableAsync 注解。
本文将通过源码分析 @EnableAsync 注解是如何开启这一特性的。
@EnableAsync 注解的源码如下。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation.class; boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; }
注解的每一个属性都指定了默认值,后续的分析也会基于默认的属性值进行分析。除此之外,注解上的 @Import 元注解引入了 AsyncConfigurationSelector 类。
从它的类关系中可以看出,AsyncConfigurationSelector 实现了 ImportSelector 接口,因此,当 Spring 扫描到配置类后,会执行它的 selectImports 方法,获取一个包含配置类名称的数组,用于加载对应的配置。
AsyncConfigurationSelector 虽然也包含了selectImports
方法,但是从参数类型中可以看出它不是接口中的selectImports
方法的实现方法,要找到接口中的实现方法,我们需要去 AsyncConfigurationSelector 的父类 AdviceModeImportSelector 中。
@Override public final String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector"); AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes == null) { throw new IllegalArgumentException(String.format( "@%s is not present on importing class '%s' as expected", annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName()); String[] imports = selectImports(adviceMode); if (imports == null) { throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode); } return imports; }
这个方法中,主要是从 @EnableAsync 注解获取各项属性的值,然后使用adviceMode
属性,调用另一个selectImports
方法获取最终的结果。
此处被调用的selectImports
方法,就是 AsyncConfigurationSelector 中的 selectImports
方法。
@Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } }
在 @EnableAsync 注解中,mode的默认值是AdviceMode.PROXY
,因此,这里引入的配置类是 ProxyAsyncConfiguration。
接下来分析 ProxyAsyncConfiguration 类。
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
在 ProxyAsyncConfiguration 中,只有一个 Bean 配置,类型是 AsyncAnnotationBeanPostProcessor,由此可以知道,@EnableAsync 所开启的功能,是通过 Bean 的后处理器来实现的。
上述的方法体中,通过构造方法创建了 AsyncAnnotationBeanPostProcessor 对象。
public AsyncAnnotationBeanPostProcessor() { setBeforeExistingAdvisors(true); }
构造方法中设置了一个属性值,这个属性是是beforeExistingAdvisors
,定义在父类 AbstractAdvisingBeanPostProcessor 中,这个属性的默认值是false,当它的值为true时,会将新的增强逻辑添加到增强逻辑列表的开头而不是最后。
也就是说,@EnableAsync 提供的异步执行特性,是基于 AOP 特性来实现的。
接着往下看,在创建了 AsyncAnnotationBeanPostProcessor 对象之后,为其配置了一些属性,有一些属性的值是从 @EnableAsync 属性值获取的,还有两个属性值需要留意,就是this.executor
和this.exceptionHandler
,这两个成员变量的值是从哪儿来的呢?
我们可以找到 ProxyAsyncConfiguration 的父类 AbstractAsyncConfiguration,其中有一个标记了 @Autowired 注解的方法。
// org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; }
如果我们自己配置了线程池和异常处理器,则会在这里执行配置,这样,我们配置的线程池和异常处理器就会被添加到 AsyncAnnotationBeanPostProcessor 中。
接下来,我们再分析 AsyncAnnotationBeanPostProcessor 后处理器是如何工作的。
从它的类继承关系中可以看出,它是一个基于 AOP 特性来为 Bean 中的方法提供异步执行功能的 Bean 后处理器。
AsyncAnnotationBeanPostProcessor 同时实现了 BeanFactoryAware 接口,在它的setBeanFactory
方法中,完成了 Advisor 的创建。
@Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
这里创建的 Advisor 类型是 AsyncAnnotationAdvisor,创建完之后,它被复制给了advisor
成员变量,这个成员变量定义在 AsyncAnnotationBeanPostProcessor 的父类 AbstractBeanFactoryAwareAdvisingPostProcessor 中。
这个advisor
成员变量就是处理增强逻辑的对象。
关于 Spring 是如何在后处理器中为 Bean 创建代理对象以及如何向代理对象中加入增强逻辑的,我之前的文章有很详细的分析,可以阅读之前关于 AOP 原理的分析文章来了解。下面我们直接分析 AsyncAnnotationAdvisor,它是完成方法异步执行的核心。
一个 Advisor 通常有两个非常重要的部分,一个是 Pointcut,用于匹配需要增强的方法,另一个是 Advice 也就是具体的增强逻辑。对于 AsyncAnnotationAdvisor 来说,这两个部分都是在它的构造方法中构建的。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); }
其中可以看到两行关键的代码,他们分别完成了advice
和pointcut
成员变量的构建。
this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes);
下面分别来看这两部分。
先看buildAdvice
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildAdvice protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
Advice 的构建比较简单,这里可以看到,最终构建的 Advice 是一个 AnnotationAsyncExecutionInterceptor 类型的拦截器,除了调用构造方法创建之外,还配置了executor
和exceptionHandler
,这个拦截器应该就是完成 AOP 增强逻辑的拦截器,我们放到后文中分析。
下面再看buildPointcut
方法。
// org.springframework.scheduling.annotation.AsyncAnnotationAdvisor#buildPointcut protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
这个方法的逻辑比较简单,首先创建了两个 Pointcut 对象,cpc
用于匹配类型,mpc
用于匹配方法,他们的逻辑都很简单,就是看类或者方法的定义是否包含 @Async 注解。
最后再将两者合并为一个 ComposablePointcut 对象返回,ComposablePointcut 的作用就是将多个 Pointcut 对象合并成一个。
了解完上面的内容,接下来就开始分析 AnnotationAsyncExecutionInterceptor 拦截器。它是一个包含 AOP 增强逻辑的拦截器,也是完成方法异步调用的核心逻辑。
AnnotationAsyncExecutionInterceptor 要完成它的任务,有两个比较核心的功能,一个是目标方法的匹配,另一个就是拦截器的逻辑。目标方法的匹配逻辑,我们在上文中已经介绍过了,以下主要分析其拦截器逻辑,也就是它的invoke
方法。
以上是 AnnotationAsyncExecutionInterceptor 的类关系图,它实现了 MethodInterceptor 接口,invoke
方法的实现在父类 AsyncExecutionInterceptor 中。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke @Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
从上面的源码中可以看到三个关键的步骤:
task
当中。task
。下面我们详细分析这三个步骤。
AsyncTaskExecutor 在determineAsyncExecutor
方法中完成。
@Nullable protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
首先会从executors
中根据方法获取对应的 AsyncTaskExecutor,executors
是一个用来缓存 Executor 的成员变量。
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
当第一次进入这个方法的时候,executors
肯定是空的,因此会进入if
语句的逻辑获取 Executor 然后再将其添加到executors
中。在if语句中,首先会通过getExecutorQualifier
方法获取一个qualifier
,我们进入方法查看获取的过程。
// org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier @Override @Nullable protected String getExecutorQualifier(Method method) { // Maintainer's note: changes made here should also be made in // AnnotationAsyncExecutionAspect#getExecutorQualifier Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class); if (async == null) { async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class); } return (async != null ? async.value() : null); }
这个方法会从目标方法或者其所在的类型上的 @Async 注解的value
属性,作为方法的返回值复制给qualifier
。这个qualifier
的值是一个 Executor 的 Bean 名称,也就是说,我们可以通过 @Async 的value
属性指定执行异步任务的 Executor 的 Bean 名称。
如果qualifier
不是空的,那么,就会通过findQualifiedExecutor方法从 Spring 容器中获取对应的 Executor 实例。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#findQualifiedExecutor @Nullable protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) { if (beanFactory == null) { throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() + " to access qualified executor '" + qualifier + "'"); } return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier); }
如果qualifier
是空的,那么就会通过this.defaultExecutor.get()
获取默认的 Executor,那么,默认的 Executor 是什么呢?我们需要在去 AsyncAnnotationAdvisor 的buildAdvice
方法中,回顾一下 AnnotationAsyncExecutionInterceptor 创建的过程。
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
以上是 AnnotationAsyncExecutionInterceptor 创建的语句,从这里找到对应的构造方法。
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) { super(defaultExecutor); }
构造方法需要提供一个默认的 Executor,也就是defaultExecutor
参数,这里提供了null
,不过我们可以继续查看父类的构造方法。
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) { this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new); }
在被调用的 AsyncExecutionAspectSupport 的构造方法中,通过getDefaultExecutor
方法,提供了默认的 Executor。
// org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor @Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
这里看到,默认的 Executor 是一个 SimpleAsyncTaskExecutor,也就是说,如果我们没有在项目中配置线程池,则默认使用 SimpleAsyncTaskExecutor 来执行异步任务。
得到 Executor 之后,就是任务的封装,这一步很简单,就是将目标方法的调用放到一个 Callable 类型的任务的call
方法中。
最后一步就是任务的提交,通过doSubmit
方法完成。
// org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit @Nullable protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
其实就是调用了 Executor 的submit
异步执行了任务。
不过这里有一点要说明,虽然在我们没有配置 Excutor 的情况下 ,Spring 会使用默认的 SimpleAsyncTaskExecutor 来执行异步任务,但是 SimpleAsyncTaskExecutor 会为每一个任务创建一个新的线程,而不是使用线程池来完成,很容易导致内存溢出,因此,在实践中最好为异步任务配置合适的线程池。
本文以 @EnableAsync 作为切入点,分析了 Spring 开启基于注解的异步任务特性的原理,更多关于Spring EnableAsync注解的资料请关注其它相关文章!