全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-18 9 分钟 ✍️ juanwangdev

Spring 异步处理机制详解

异步处理是提升系统吞吐量的关键手段,Spring 提供了完善的异步支持。

启用异步

配置类

Java
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

@Async 基础用法

无返回值异步

Java
@Service
public class NotificationService {

    @Async("taskExecutor")
    public void sendEmail(String to, String content) {
        // 异步发送邮件
        mailSender.send(to, "Notification", content);
    }
}

有返回值异步

Java
@Service
public class DataService {

    @Async("taskExecutor")
    public CompletableFuture<User> getUserAsync(Long id) {
        User user = userRepository.findById(id).orElse(null);
        return CompletableFuture.completedFuture(user);
    }

    @Async("taskExecutor")
    public ListenableFuture<Order> getOrderAsync(Long id) {
        Order order = orderRepository.findById(id).orElse(null);
        return new AsyncResult<>(order);
    }
}

线程池配置详解

核心参数

参数说明建议值
corePoolSize核心线程数CPU核心数
maxPoolSize最大线程数CPU核心数 × 2
queueCapacity队列容量100-500
keepAliveSeconds空闲线程存活时间60s
threadNamePrefix线程名前缀业务名-async-

拒绝策略

策略说明
CallerRunsPolicy调用者线程执行
AbortPolicy抛出异常(默认)
DiscardPolicy直接丢弃
DiscardOldestPolicy丢弃最旧任务

多线程池配置

Java
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("emailExecutor")
    public Executor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("email-");
        executor.initialize();
        return executor;
    }

    @Bean("reportExecutor")
    public Executor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("report-");
        executor.initialize();
        return executor;
    }
}

异步调用实战

并行调用多个异步方法

Java
@Service
public class OrderService {

    @Autowired
    private UserService userService;
    @Autowired
    private ProductService productService;
    @Autowired
    private PaymentService paymentService;

    public OrderDetail getOrderDetail(Long orderId) throws Exception {
        // 并行获取用户、商品、支付信息
        CompletableFuture<User> userFuture = userService.getUserAsync(userId);
        CompletableFuture<List<Product>> productsFuture = productService.getProductsAsync(orderId);
        CompletableFuture<Payment> paymentFuture = paymentService.getPaymentAsync(orderId);

        // 等待所有结果
        CompletableFuture.allOf(userFuture, productsFuture, paymentFuture).join();

        return OrderDetail.builder()
            .user(userFuture.get())
            .products(productsFuture.get())
            .payment(paymentFuture.get())
            .build();
    }
}

异步回调处理

Java
@Service
public class CallbackService {

    @Async("taskExecutor")
    public CompletableFuture<String> processAsync(String data) {
        return CompletableFuture.supplyAsync(() -> {
            // 处理数据
            return processData(data);
        }).thenApply(result -> {
            // 后续转换
            return transform(result);
        }).thenAccept(finalResult -> {
            // 最终处理
            log.info("Final result: {}", finalResult);
        }).exceptionally(ex -> {
            // 异常处理
            log.error("Error: ", ex);
            return null;
        });
    }
}

异常处理

自定义异步异常处理器

Java
@Component
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("Async method {} threw exception", method.getName(), ex);
        // 发送告警通知
        alertService.sendAlert("Async error: " + ex.getMessage());
    }
}

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

try-catch 捕获异常

Java
@Service
public class SafeAsyncService {

    @Async("taskExecutor")
    public void safeProcess(String data) {
        try {
            processData(data);
        } catch (Exception e) {
            log.error("Process failed: {}", data, e);
            // 补偿逻辑
            compensationService.compensate(data);
        }
    }
}

事务与异步

事务传播问题

Java
@Service
public class OrderService {

    @Transactional
    public void createOrder(Order order) {
        orderRepository.save(order);
        // 异步方法不会参与当前事务
        sendNotification(order);  // 不推荐
    }

    @Async
    public void sendNotification(Order order) {
        // 此处没有事务上下文
        notificationService.send(order);
    }
}

正确的事务处理

Java
@Service
public class OrderService {

    @Transactional
    public void createOrder(Order order) {
        orderRepository.save(order);
    }

    public void createOrderWithNotification(Order order) {
        createOrder(order);  // 同步事务
        notificationService.sendAsync(order);  // 异步通知
    }
}

线程池监控

监控指标

Java
@Component
@Slf4j
public class ThreadPoolMonitor {

    @Autowired
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    @Scheduled(fixedRate = 60000)
    public void monitor() {
        ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
        log.info("ThreadPool: active={}, poolSize={}, queueSize={}, completed={}",
            executor.getActiveCount(),
            executor.getPoolSize(),
            executor.getQueue().size(),
            executor.getCompletedTaskCount());
    }
}

优雅关闭

Java
@Bean("taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(20);
    executor.setWaitForTasksToCompleteOnShutdown(true);  // 等待任务完成
    executor.setAwaitTerminationSeconds(60);              // 最多等待60秒
    executor.initialize();
    return executor;
}

要点总结

要点说明
@Async声明异步方法
线程池配置核心数、最大数、队列容量、拒绝策略
返回值CompletableFuture / ListenableFuture
异常处理AsyncUncaughtExceptionHandler
事务问题异步方法不在调用方事务中
优雅关闭waitForTasksToCompleteOnShutdown=true
监控activeCount、queueSize、completedTaskCount

📝 发现内容有误?点击此处直接编辑

← 上一篇 Spring事件机制原理与实战
下一篇 → Spring 数据库优化实践
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库