Spring事件机制原理与实战
事件驱动是解耦系统的核心模式,Spring 提供了完善的事件发布订阅机制。
事件机制核心组件
三大角色
| 角色 | 接口/类 | 作用 |
|---|---|---|
| 事件 | ApplicationEvent | 携带事件数据 |
| 发布者 | ApplicationEventPublisher | 发布事件 |
| 监听者 | ApplicationListener | 处理事件 |
核心接口
Java
// 事件接口
public abstract class ApplicationEvent extends EventObject {
private final long timestamp;
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
}
// 发布者接口
public interface ApplicationEventPublisher {
void publishEvent(ApplicationEvent event);
}
// 监听者接口
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);
}
自定义事件
定义事件类
Java
public class UserCreatedEvent extends ApplicationEvent {
private final Long userId;
private final String username;
public UserCreatedEvent(Object source, Long userId, String username) {
super(source);
this.userId = userId;
this.username = username;
}
public Long getUserId() { return userId; }
public String getUsername() { return username; }
}
发布事件
Java
@Service
public class UserService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void createUser(User user) {
userRepository.save(user);
// 发布事件
eventPublisher.publishEvent(new UserCreatedEvent(this, user.getId(), user.getName()));
}
}
监听事件
Java
@Component
public class UserEventListener implements ApplicationListener<UserCreatedEvent> {
@Override
public void onApplicationEvent(UserCreatedEvent event) {
System.out.println("User created: " + event.getUsername());
// 发送欢迎邮件等
sendWelcomeEmail(event.getUserId());
}
}
@EventListener 注解
注解方式监听
Java
@Component
public class UserEventListener {
@EventListener
public void handleUserCreated(UserCreatedEvent event) {
log.info("User created: {}", event.getUsername());
}
// 条件过滤
@EventListener(condition = "#event.username.startsWith('admin')")
public void handleAdminUser(UserCreatedEvent event) {
log.info("Admin user created: {}", event.getUsername());
}
// 监听多种事件
@EventListener({UserCreatedEvent.class, UserDeletedEvent.class})
public void handleUserEvent(ApplicationEvent event) {
if (event instanceof UserCreatedEvent) {
log.info("Created");
} else if (event instanceof UserDeletedEvent) {
log.info("Deleted");
}
}
}
SpEL 条件表达式
| 表达式 | 说明 |
|---|---|
| #event | 事件对象 |
| #event.userId | 事件属性 |
| #root.event | 根事件 |
| #root.args[0] | 第一个参数 |
异步事件监听
@Async + @EventListener
Java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setThreadNamePrefix("event-");
executor.initialize();
return executor;
}
}
@Component
public class AsyncEventListener {
@Async
@EventListener
public void handleAsync(UserCreatedEvent event) {
// 异步处理,不影响主流程
Thread.sleep(5000); // 模拟耗时操作
log.info("Async handled: {}", event.getUsername());
}
}
异步事件异常处理
Java
@Component
public class AsyncEventListener implements AsyncUncaughtExceptionHandler {
@Async
@EventListener
public void handleAsync(UserCreatedEvent event) {
// 可能抛出异常
throw new RuntimeException("Async error");
}
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("Async event error in {}: {}", method.getName(), ex.getMessage());
}
}
事件监听器排序
@Order 控制顺序
Java
@Component
public class FirstListener {
@EventListener
@Order(1) // 最先执行
public void handleFirst(UserCreatedEvent event) {
log.info("First listener");
}
}
@Component
public class SecondListener {
@EventListener
@Order(2) // 第二执行
public void handleSecond(UserCreatedEvent event) {
log.info("Second listener");
}
}
ApplicationEventMulticaster
源码实现
Java
public class SimpleApplicationEventMulticaster implements ApplicationEventMulticaster {
private final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
private Executor taskExecutor; // 异步执行器
@Override
public void multicastEvent(ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 获取匹配的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
// 异步执行
executor.execute(() -> invokeListener(listener, event));
} else {
// 同步执行
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
} catch (ClassCastException ex) {
// 类型不匹配,忽略
}
}
}
配置异步 Multicaster
Java
@Configuration
public class EventConfig {
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
multicaster.setTaskExecutor(taskExecutor()); // 设置异步执行器
return multicaster;
}
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.initialize();
return executor;
}
}
内置事件
Spring 内置事件
| 事件 | 说明 | 触发时机 |
|---|---|---|
| ContextRefreshedEvent | 容器刷新完成 | refresh() 完成后 |
| ContextStartedEvent | 容器启动 | start() 调用 |
| ContextStoppedEvent | 容器停止 | stop() 调用 |
| ContextClosedEvent | 容器关闭 | close() 调用 |
| RequestHandledEvent | 请求处理完成 | Spring MVC |
监听内置事件
Java
@Component
public class ContextEventListener {
@EventListener(ContextRefreshedEvent.class)
public void onRefresh(ContextRefreshedEvent event) {
log.info("ApplicationContext refreshed");
// 初始化缓存、预热数据等
initializeCache();
}
@EventListener(ContextClosedEvent.class)
public void onClosed(ContextClosedEvent event) {
log.info("ApplicationContext closed");
// 清理资源
cleanupResources();
}
}
事务绑定事件
@TransactionalEventListener
Java
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
orderRepository.save(order);
// 发布事件,在事务提交后处理
eventPublisher.publishEvent(new OrderCreatedEvent(order));
}
}
@Component
public class OrderEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleAfterCommit(OrderCreatedEvent event) {
// 事务提交后才执行
sendOrderConfirmationEmail(event.getOrder());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleAfterRollback(OrderCreatedEvent event) {
// 事务回滚后执行
log.warn("Order creation failed: {}", event.getOrder().getId());
}
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handleBeforeCommit(OrderCreatedEvent event) {
// 事务提交前执行
validateOrder(event.getOrder());
}
}
事务阶段
| 阶段 | 说明 |
|---|---|
| BEFORE_COMMIT | 事务提交前 |
| AFTER_COMMIT | 事务提交后(默认) |
| AFTER_ROLLBACK | 事务回滚后 |
| AFTER_COMPLETION | 事务完成后(提交或回滚) |
无事务时的行为
Java
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void handleWithoutTransaction(OrderCreatedEvent event) {
// 如果没有事务,立即执行
log.info("Fallback execution");
}
事件传播机制
监听器匹配源码
Java
protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
// 根据事件类型和来源过滤监听器
return applicationListeners.stream()
.filter(listener -> supportsEvent(listener, eventType, sourceType))
.collect(Collectors.toList());
}
protected boolean supportsEvent(ApplicationListener<?> listener, ResolvableType eventType, Class<?> sourceType) {
// 获取监听器泛型类型
ResolvableType declaredEventType = resolveDeclaredEventType(listener);
// 检查事件类型是否匹配
return declaredEventType.isAssignableFrom(eventType);
}
实战:事件驱动架构
订单处理事件驱动示例
Java
// 事件定义
public class OrderCreatedEvent extends ApplicationEvent {
private final Order order;
public OrderCreatedEvent(Object source, Order order) {
super(source);
this.order = order;
}
}
// 发布者
@Service
public class OrderService {
@Autowired private ApplicationEventPublisher publisher;
@Transactional
public Order createOrder(OrderRequest request) {
Order order = orderRepository.save(new Order(request));
publisher.publishEvent(new OrderCreatedEvent(this, order));
return order;
}
}
// 多监听器解耦
@Component
@Order(1)
public class InventoryListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void updateInventory(OrderCreatedEvent event) {
inventoryService.reduce(event.getOrder().getItems());
}
}
@Component
@Order(2)
public class PaymentListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void processPayment(OrderCreatedEvent event) {
paymentService.process(event.getOrder().getPayment());
}
}
@Component
@Order(3)
class NotificationListener {
@Async
@TransactionalEventListener(phase = AFTER_COMMIT)
public void sendNotification(OrderCreatedEvent event) {
notificationService.sendOrderConfirmation(event.getOrder());
}
}
要点总结
| 要点 | 说明 |
|---|---|
| 核心组件 | ApplicationEvent + Publisher + Listener |
| 注解监听 | @EventListener 比 ApplicationListener 更简洁 |
| 异步事件 | @Async + @EventListener 或配置 Multicaster |
| 监听顺序 | @Order 控制执行顺序 |
| 事务绑定 | @TransactionalEventListener 指定事务阶段 |
| 内置事件 | ContextRefreshedEvent/StartedEvent/ClosedEvent |
| 条件过滤 | @EventListener(condition = "#event.xxx") |
📝 发现内容有误?点击此处直接编辑