Spring 异步处理机制详解
异步处理是提升系统吞吐量的关键手段,Spring 提供了完善的异步支持。
启用异步
配置类
Java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
@Async 基础用法
无返回值异步
Java
@Service
public class NotificationService {
@Async("taskExecutor")
public void sendEmail(String to, String content) {
// 异步发送邮件
mailSender.send(to, "Notification", content);
}
}
有返回值异步
Java
@Service
public class DataService {
@Async("taskExecutor")
public CompletableFuture<User> getUserAsync(Long id) {
User user = userRepository.findById(id).orElse(null);
return CompletableFuture.completedFuture(user);
}
@Async("taskExecutor")
public ListenableFuture<Order> getOrderAsync(Long id) {
Order order = orderRepository.findById(id).orElse(null);
return new AsyncResult<>(order);
}
}
线程池配置详解
核心参数
| 参数 | 说明 | 建议值 |
|---|---|---|
| corePoolSize | 核心线程数 | CPU核心数 |
| maxPoolSize | 最大线程数 | CPU核心数 × 2 |
| queueCapacity | 队列容量 | 100-500 |
| keepAliveSeconds | 空闲线程存活时间 | 60s |
| threadNamePrefix | 线程名前缀 | 业务名-async- |
拒绝策略
| 策略 | 说明 |
|---|---|
| CallerRunsPolicy | 调用者线程执行 |
| AbortPolicy | 抛出异常(默认) |
| DiscardPolicy | 直接丢弃 |
| DiscardOldestPolicy | 丢弃最旧任务 |
多线程池配置
Java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("emailExecutor")
public Executor emailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("email-");
executor.initialize();
return executor;
}
@Bean("reportExecutor")
public Executor reportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("report-");
executor.initialize();
return executor;
}
}
异步调用实战
并行调用多个异步方法
Java
@Service
public class OrderService {
@Autowired
private UserService userService;
@Autowired
private ProductService productService;
@Autowired
private PaymentService paymentService;
public OrderDetail getOrderDetail(Long orderId) throws Exception {
// 并行获取用户、商品、支付信息
CompletableFuture<User> userFuture = userService.getUserAsync(userId);
CompletableFuture<List<Product>> productsFuture = productService.getProductsAsync(orderId);
CompletableFuture<Payment> paymentFuture = paymentService.getPaymentAsync(orderId);
// 等待所有结果
CompletableFuture.allOf(userFuture, productsFuture, paymentFuture).join();
return OrderDetail.builder()
.user(userFuture.get())
.products(productsFuture.get())
.payment(paymentFuture.get())
.build();
}
}
异步回调处理
Java
@Service
public class CallbackService {
@Async("taskExecutor")
public CompletableFuture<String> processAsync(String data) {
return CompletableFuture.supplyAsync(() -> {
// 处理数据
return processData(data);
}).thenApply(result -> {
// 后续转换
return transform(result);
}).thenAccept(finalResult -> {
// 最终处理
log.info("Final result: {}", finalResult);
}).exceptionally(ex -> {
// 异常处理
log.error("Error: ", ex);
return null;
});
}
}
异常处理
自定义异步异常处理器
Java
@Component
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("Async method {} threw exception", method.getName(), ex);
// 发送告警通知
alertService.sendAlert("Async error: " + ex.getMessage());
}
}
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
}
try-catch 捕获异常
Java
@Service
public class SafeAsyncService {
@Async("taskExecutor")
public void safeProcess(String data) {
try {
processData(data);
} catch (Exception e) {
log.error("Process failed: {}", data, e);
// 补偿逻辑
compensationService.compensate(data);
}
}
}
事务与异步
事务传播问题
Java
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
orderRepository.save(order);
// 异步方法不会参与当前事务
sendNotification(order); // 不推荐
}
@Async
public void sendNotification(Order order) {
// 此处没有事务上下文
notificationService.send(order);
}
}
正确的事务处理
Java
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
orderRepository.save(order);
}
public void createOrderWithNotification(Order order) {
createOrder(order); // 同步事务
notificationService.sendAsync(order); // 异步通知
}
}
线程池监控
监控指标
Java
@Component
@Slf4j
public class ThreadPoolMonitor {
@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Scheduled(fixedRate = 60000)
public void monitor() {
ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
log.info("ThreadPool: active={}, poolSize={}, queueSize={}, completed={}",
executor.getActiveCount(),
executor.getPoolSize(),
executor.getQueue().size(),
executor.getCompletedTaskCount());
}
}
优雅关闭
Java
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成
executor.setAwaitTerminationSeconds(60); // 最多等待60秒
executor.initialize();
return executor;
}
要点总结
| 要点 | 说明 |
|---|---|
| @Async | 声明异步方法 |
| 线程池配置 | 核心数、最大数、队列容量、拒绝策略 |
| 返回值 | CompletableFuture / ListenableFuture |
| 异常处理 | AsyncUncaughtExceptionHandler |
| 事务问题 | 异步方法不在调用方事务中 |
| 优雅关闭 | waitForTasksToCompleteOnShutdown=true |
| 监控 | activeCount、queueSize、completedTaskCount |
📝 发现内容有误?点击此处直接编辑