异步调用 对应的是 同步调用,同步调用 指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用 指程序在顺序执行时,不等待 异步调用的语句 返回结果 就执行后面的程序。
利用 Spring Initializer
创建一个 gradle
项目 spring-boot-async-task
,创建时添加相关依赖。得到的初始 build.gradle
如下:
buildscript {
ext {
springBootVersion = '2.0.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'io.ostenant.springboot.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-web')
compileOnly('org.projectlombok:lombok')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
复制代码
在 Spring Boot
入口类上配置 @EnableAsync
注解开启异步处理。
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
复制代码
创建任务抽象类 AbstractTask
,并分别配置三个任务方法 doTaskOne()
,doTaskTwo()
,doTaskThree()
。
public abstract class AbstractTask {
private static Random random = new Random();
public void doTaskOne() throws Exception {
out.println("开始做任务一");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
out.println("完成任务一,耗时:" + (end - start) + "毫秒");
}
public void doTaskTwo() throws Exception {
out.println("开始做任务二");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
out.println("完成任务二,耗时:" + (end - start) + "毫秒");
}
public void doTaskThree() throws Exception {
out.println("开始做任务三");
long start = currentTimeMillis();
sleep(random.nextInt(10000));
long end = currentTimeMillis();
out.println("完成任务三,耗时:" + (end - start) + "毫秒");
}
}
复制代码
下面通过一个简单示例来直观的理解什么是同步调用:
Task
类,继承 AbstractTask
,三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10
秒内)。@Component
public class Task extends AbstractTask {
}
复制代码
Task
对象,并在测试用例中执行 doTaskOne()
,doTaskTwo()
,doTaskThree()
三个方法。@RunWith(SpringRunner.class)
@SpringBootTest
public class TaskTest {
@Autowired
private Task task;
@Test
public void testSyncTasks() throws Exception {
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
}
}
复制代码
开始做任务一
完成任务一,耗时:4059毫秒
开始做任务二
完成任务二,耗时:6316毫秒
开始做任务三
完成任务三,耗时:1973毫秒
复制代码
任务一、任务二、任务三顺序的执行完了,换言之 doTaskOne()
,doTaskTwo()
,doTaskThree()
三个方法顺序的执行完成。
上述的 同步调用 虽然顺利的执行完了三个任务,但是可以看到 执行时间比较长,若这三个任务本身之间 不存在依赖关系,可以 并发执行 的话,同步调用在 执行效率 方面就比较差,可以考虑通过 异步调用 的方式来 并发执行。
AsyncTask
类,分别在方法上配置 @Async
注解,将原来的 同步方法 变为 异步方法。@Component
public class AsyncTask extends AbstractTask {
@Async
public void doTaskOne() throws Exception {
super.doTaskOne();
}
@Async
public void doTaskTwo() throws Exception {
super.doTaskTwo();
}
@Async
public void doTaskThree() throws Exception {
super.doTaskThree();
}
}
复制代码
AsyncTask
对象,并在测试用例中执行 doTaskOne()
,doTaskTwo()
,doTaskThree()
三个方法。@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncTaskTest {
@Autowired
private AsyncTask task;
@Test
public void testAsyncTasks() throws Exception {
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
}
}
复制代码
开始做任务三
开始做任务一
开始做任务二
复制代码
如果反复执行单元测试,可能会遇到各种不同的结果,比如:
原因是目前 doTaskOne()
,doTaskTwo()
,doTaskThree()
这三个方法已经 异步执行 了。主程序在 异步调用 之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就 自动结束 了,导致了 不完整 或是 没有输出任务 相关内容的情况。
注意:@Async所修饰的函数不要定义为static类型,这样异步调用不会生效。
为了让 doTaskOne()
,doTaskTwo()
,doTaskThree()
能正常结束,假设我们需要统计一下三个任务 并发执行 共耗时多少,这就需要等到上述三个函数都完成动用之后记录时间,并计算结果。
那么我们如何判断上述三个 异步调用 是否已经执行完成呢?我们需要使用 Future<T>
来返回 异步调用 的 结果。
AsyncCallBackTask
类,声明 doTaskOneCallback()
,doTaskTwoCallback()
,doTaskThreeCallback()
三个方法,对原有的三个方法进行包装。@Component
public class AsyncCallBackTask extends AbstractTask {
@Async
public Future<String> doTaskOneCallback() throws Exception {
super.doTaskOne();
return new AsyncResult<>("任务一完成");
}
@Async
public Future<String> doTaskTwoCallback() throws Exception {
super.doTaskTwo();
return new AsyncResult<>("任务二完成");
}
@Async
public Future<String> doTaskThreeCallback() throws Exception {
super.doTaskThree();
return new AsyncResult<>("任务三完成");
}
}
复制代码
AsyncCallBackTask
对象,并在测试用例中执行 doTaskOneCallback()
,doTaskTwoCallback()
,doTaskThreeCallback()
三个方法。循环调用 Future
的 isDone()
方法等待三个 并发任务 执行完成,记录最终执行时间。@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncCallBackTaskTest {
@Autowired
private AsyncCallBackTask task;
@Test
public void testAsyncCallbackTask() throws Exception {
long start = currentTimeMillis();
Future<String> task1 = task.doTaskOneCallback();
Future<String> task2 = task.doTaskTwoCallback();
Future<String> task3 = task.doTaskThreeCallback();
// 三个任务都调用完成,退出循环等待
while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
sleep(1000);
}
long end = currentTimeMillis();
out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
}
复制代码
看看都做了哪些改变:
执行一下上述的单元测试,可以看到如下结果:
开始做任务一
开始做任务三
开始做任务二
完成任务二,耗时:4882毫秒
完成任务三,耗时:6484毫秒
完成任务一,耗时:8748毫秒
任务全部完成,总耗时:9043毫秒
复制代码
可以看到,通过 异步调用,让任务一、任务二、任务三 并发执行,有效的 减少 了程序的 运行总时间。
在上述操作中,创建一个 线程池配置类 TaskConfiguration
,并配置一个 任务线程池对象 taskExecutor
。
@Configuration
public class TaskConfiguration {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
return executor;
}
}
复制代码
上面我们通过使用 ThreadPoolTaskExecutor
创建了一个 线程池,同时设置了以下这些参数:
线程池属性
属性的作用
设置初始值
核心线程数
线程池创建时候初始化的线程数
10
最大线程数
线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程
20
缓冲队列
用来缓冲执行任务的队列
200
允许线程的空闲时间
当超过了核心线程之外的线程,在空闲时间到达之后会被销毁
60秒
线程池名的前缀
可以用于定位处理任务所在的线程池
taskExecutor-
线程池对拒绝任务的处理策略
这里采用CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
CallerRunsPolicy
AsyncExecutorTask
类,三个任务的配置和 AsyncTask
一样,不同的是 @Async
注解需要指定前面配置的 线程池的名称 taskExecutor
。@Component
public class AsyncExecutorTask extends AbstractTask {
@Async("taskExecutor")
public void doTaskOne() throws Exception {
super.doTaskOne();
out.println("任务一,当前线程:" + currentThread().getName());
}
@Async("taskExecutor")
public void doTaskTwo() throws Exception {
super.doTaskTwo();
out.println("任务二,当前线程:" + currentThread().getName());
}
@Async("taskExecutor")
public void doTaskThree() throws Exception {
super.doTaskThree();
out.println("任务三,当前线程:" + currentThread().getName());
}
}
复制代码
AsyncExecutorTask
对象,并在测试用例中执行 doTaskOne()
,doTaskTwo()
,doTaskThree()
三个方法。@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncExecutorTaskTest {
@Autowired
private AsyncExecutorTask task;
@Test
public void testAsyncExecutorTask() throws Exception {
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
sleep(30 * 1000L);
}
}
复制代码
执行一下上述的 单元测试,可以看到如下结果:
开始做任务一
开始做任务三
开始做任务二
完成任务二,耗时:3905毫秒
任务二,当前线程:taskExecutor-2
完成任务一,耗时:6184毫秒
任务一,当前线程:taskExecutor-1
完成任务三,耗时:9737毫秒
任务三,当前线程:taskExecutor-3
复制代码
执行上面的单元测试,观察到 任务线程池 的 线程池名的前缀 被打印,说明 线程池 成功执行 异步任务!
由于在应用关闭的时候异步任务还在执行,导致类似 数据库连接池 这样的对象一并被 销毁了,当 异步任务 中对 数据库 进行操作就会出错。
解决方案如下,重新设置线程池配置对象,新增线程池 setWaitForTasksToCompleteOnShutdown()
和 setAwaitTerminationSeconds()
配置:
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
executor.setPoolSize(20);
executor.setThreadNamePrefix("taskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
复制代码
Bean
,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。本文介绍了在 Spring Boot
中如何使用 @Async
注解配置 异步任务、异步回调任务,包括结合 任务线程池 的使用,以及如何 正确 并 优雅 地关闭 任务线程池。
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。
Original url: Access
Created at: 2019-05-29 15:28:36
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论