Spring Boot 异步任务

在 Spring 3.0 开始提供了对异步任务的支持,我们可以使用 @Async 注解来创建一个异步任务,这些被 @Async 注解修饰的方法会在独立的线程中执行。调用者无需等待异步任务执行完毕,可以继续执行其他任务。

@Async 的原理是基于 AOP,当一个被 @Async 注解修饰的方法被调用时,Spring 会为该方法创建一个代理对象,该代理对象会将方法的调用转发给一个 TaskExecutorTaskExecutor 会在后台线程中执行该方法。

SpringBoot 开启异步任务

首先,我们需要编写自己的线程池,避免 Spring 自身不断创建线程池导致线程泄漏。Spring调用异步方法的默认的线程池 SimpleAsyncTaskExecutor 却并不是真正意义上的线程池,它会为每一个任务都创建一个线程,这样当我们一次性有很多的任务来时,就会创建大量的线程,可能造成OOM.

自定义线程池的方法有两种:

  • 通过 application.properties 配置文件配置线程池

# 线程池配置
spring.task.execution.pool.core-size=5  # 核心线程数,初始化准备就绪的线程数
spring.task.execution.pool.max-size=10  # 最大线程数,只有在队列满了的情况下才会创建超过核心线程数的线程
spring.task.execution.pool.queue-capacity=1000  # 线程所使用的缓冲队列
spring.task.execution.pool.keep-alive=60  # 空闲线程存活时间
  • 通过配置类配置线程池

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    @Bean("MyPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        executor.setCorePoolSize(8);
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);
        //阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(124);
        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任
        //任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);
        //spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池");
        // rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}

其中 ThreadPoolTaskExecutor 是 Spring 提供的线程池实现类,我们可以通过配置它的属性来配置线程池。

上述代码中,我们设置了核心线程数为 5,最大线程数为 10,队列容量为 1000,线程存活时间为 60 秒,线程名称前缀为 async-service-,拒绝策略为 CallerRunsPolicy。当 200 个任务同时到达时,首先会占用 5 个核心线程,然后会将剩余的任务放入队列中,当队列满了之后,根据最大线程数创建新的线程,当线程数达到最大线程数时,会执行拒绝策略。

使用 @Async 注解开启异步任务,代码如下:

@Service
public class AsyncService {
    @Async
    public void asyncMethod() {
        System.out.println("任务开始");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务结束");
    }
}

@Async 注解修饰的方法的返回值必须是 void 或者 Future,如果返回值是 Future,则可以通过 Future.get() 方法获取返回值。

@Service
public class AsyncService {
    @Async("asyncServiceExecutor")  // 指定线程池,不指定则使用默认线程池
    public Future<String> asyncMethod() {
        System.out.println("任务开始");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务结束");
        return new AsyncResult<>("任务结束");
    }
}
  • 异步方法使用 static 修饰。

  • 异步方法不能与同步方法在同一个类中,@Async 需要在不同类使用才会产生异步效果,方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,如果需要从类的内部调用,需要先获取其代理类。

  • 在异步方法上标注 @Transactional 是没用的。 在 Async 方法调用的方法上标注@Transactional有效。(例如: 方法 A,使用了@Async/@Transactional 来标注,但是无法产生事务控制的目的。方法 B,使用了@Async 来标注, B 中调用了 C、D,C/D 分别使用@Transactional 做了标注,则可实现事务控制的目的。)

  • 异步类没有被 Spring 管理。因为@Transactional@Async注解的实现都是基于 Spring 的 AOP,而 AOP 的实现是基于动态代理模式实现的。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过 Spring 容器管理。

最后更新于

这有帮助吗?