2025-09-26
微服务与分布式
0

目录

什么是最大努力通知型事务
与可靠消息一致性的区别
最大努力通知的核心机制
工作流程
适用场景
实现方案
方案一:基于数据库的本地消息表
数据库表设计
核心代码实现
方案二:基于消息队列的实现
优缺点分析
总结

最大努力通知型分布式事务是一种柔性事务解决方案,它适用于那些不需要强一致性,而是最终一致性的场景。这种模式的核心思想是:系统尽最大努力将通知发送给接收方,如果通知失败,会按照一定的策略进行重试,直到成功或达到重试上限。

什么是最大努力通知型事务

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低的业务,其核心思想是:系统会 尽最大努力将业务处理结果通知给相关方,但不保证通知一定成功,也不保证实时性。

与传统的ACID事务或2PC等强一致性方案不同,最大努力通知更注重最终一致性,通过异步重试机制来保证数据的最终一致性。

与可靠消息一致性的区别

特性可靠消息一致性最大努力通知
事务关注点交易过程的事务一致性交易之后的通知事务
消息可靠性由发起方保证由接收方主动查询确认
业务场景交易必须完成交易结果通知可容忍延迟
一致性要求

最大努力通知的核心机制

最大努力通知包含两个关键机制:

  • 消息重复通知机制:当接收方未确认收到消息时,发起方会通过一定间隔重复发送消息
  • 消息校对机制:当尽最大努力仍无法通知到接收方时,接收方可以主动查询发起方确认业务结果

工作流程

  1. 业务活动主动方(发起方)完成业务处理
  2. 发起方将业务结果通知发送给接收方
  3. 如果接收方未确认收到(未回复ACK),发起方按一定间隔重试
  4. 如果重试多次仍无法通知,接收方通过消息校对机制查询发起方确认结果

提示

  • 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失
  • 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息

适用场景

  • 支付结果通知:支付成功后通知业务系统

  • 订单状态同步:主订单创建后同步到其他系统

  • 数据增量同步:数据库变更同步到搜索索引

  • 事件驱动架构:微服务间的事件通知

实现方案

最大努力通知主要有两种实现方案,适用于不同场景:

方案一:基于数据库的本地消息表

数据库表设计

sql
-- 通知消息表 CREATE TABLE notify_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一ID', business_type VARCHAR(50) NOT NULL COMMENT '业务类型', business_id VARCHAR(64) NOT NULL COMMENT '业务ID', notify_url VARCHAR(500) NOT NULL COMMENT '通知地址', notify_data TEXT NOT NULL COMMENT '通知数据', notify_status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待通知,1:通知中,2:通知成功,3:通知失败', retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数', max_retry_count INT NOT NULL DEFAULT 10 COMMENT '最大重试次数', next_retry_time DATETIME COMMENT '下次重试时间', last_notify_time DATETIME COMMENT '最后通知时间', success_time DATETIME COMMENT '成功时间', create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_retrytime (notify_status, next_retry_time), INDEX idx_business (business_type, business_id) ); -- 通知记录表(用于幂等性校验) CREATE TABLE notify_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, receive_status TINYINT NOT NULL COMMENT '接收状态', receive_time DATETIME NOT NULL, response_data TEXT COMMENT '响应数据', create_time DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_message_id (message_id) );

核心代码实现

  1. 业务服务(消息生产者)
java
@Service @Transactional @Slf4j public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private NotifyMessageMapper notifyMessageMapper; /** * 创建订单并记录通知消息 */ public String createOrder(OrderCreateRequest request) { // 1. 创建订单 Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(request.getUserId()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); orderMapper.insert(order); // 2. 记录通知消息 NotifyMessage message = new NotifyMessage(); message.setMessageId(UUID.randomUUID().toString()); message.setBusinessType("ORDER_CREATE"); message.setBusinessId(order.getOrderNo()); message.setNotifyUrl("http://inventory-service/api/order/notify"); Map<String, Object> notifyData = new HashMap<>(); notifyData.put("orderNo", order.getOrderNo()); notifyData.put("userId", order.getUserId()); notifyData.put("amount", order.getAmount()); notifyData.put("products", request.getProducts()); message.setNotifyData(JSON.toJSONString(notifyData)); message.setNotifyStatus(NotifyStatus.PENDING); message.setMaxRetryCount(8); // 最大重试8次 message.setNextRetryTime(new Date()); notifyMessageMapper.insert(message); // 3. 事务提交后异步触发第一次通知 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { // 异步发送通知 notifyMessageService.triggerNotify(message.getMessageId()); } } ); return order.getOrderNo(); } }
  1. 消息扫描服务
java
@Component @Slf4j public class NotifyMessageScanner { @Autowired private NotifyMessageMapper notifyMessageMapper; @Autowired private NotifyMessageService notifyMessageService; /** * 扫描待通知的消息 */ @Scheduled(fixedDelay = 30000) // 每30秒扫描一次 public void scanPendingMessages() { try { List<NotifyMessage> pendingMessages = notifyMessageMapper .selectByStatusAndTime(NotifyStatus.PENDING, new Date()); log.info("扫描到待通知消息数量: {}", pendingMessages.size()); for (NotifyMessage message : pendingMessages) { // 更新为通知中状态,防止重复处理 if (notifyMessageMapper.updateStatus(message.getId(), NotifyStatus.PENDING, NotifyStatus.NOTIFYING) > 0) { // 异步处理通知 notifyMessageService.processNotify(message); } } } catch (Exception e) { log.error("扫描通知消息异常", e); } } /** * 扫描通知中的消息(处理中断的情况) */ @Scheduled(fixedDelay = 60000) // 每60秒扫描一次 public void scanNotifyingMessages() { try { // 查找通知中但超过5分钟未更新的消息(可能处理中断) Date timeoutTime = new Date(System.currentTimeMillis() - 5 * 60 * 1000); List<NotifyMessage> timeoutMessages = notifyMessageMapper .selectTimeoutMessages(NotifyStatus.NOTIFYING, timeoutTime); for (NotifyMessage message : timeoutMessages) { log.warn("发现超时通知消息,重置状态: messageId={}", message.getMessageId()); notifyMessageMapper.updateStatus(message.getId(), NotifyStatus.NOTIFYING, NotifyStatus.PENDING); } } catch (Exception e) { log.error("扫描超时消息异常", e); } } }
  1. 通知服务
java
@Service @Slf4j public class NotifyMessageService { @Autowired private NotifyMessageMapper notifyMessageMapper; @Autowired private RestTemplate restTemplate; @Autowired private NotifyRecordService notifyRecordService; /** * 处理消息通知 */ @Async("notifyExecutor") public void processNotify(NotifyMessage message) { log.info("开始处理通知: messageId={}, businessId={}", message.getMessageId(), message.getBusinessId()); try { // 检查是否已经成功通知过(幂等性校验) if (notifyRecordService.isNotified(message.getMessageId())) { log.info("消息已通知成功,跳过: messageId={}", message.getMessageId()); notifyMessageMapper.updateStatus(message.getId(), NotifyStatus.NOTIFYING, NotifyStatus.SUCCESS); return; } // 执行通知 boolean success = doNotify(message); if (success) { // 通知成功 notifyMessageMapper.updateSuccess(message.getId(), new Date()); log.info("通知成功: messageId={}", message.getMessageId()); } else { // 通知失败,计算下次重试时间 handleNotifyFailure(message); } } catch (Exception e) { log.error("处理通知异常: messageId={}", message.getMessageId(), e); handleNotifyFailure(message); } } /** * 执行具体的通知操作 */ private boolean doNotify(NotifyMessage message) { try { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.set("X-Message-Id", message.getMessageId()); headers.set("X-Business-Type", message.getBusinessType()); HttpEntity<String> entity = new HttpEntity<>(message.getNotifyData(), headers); ResponseEntity<String> response = restTemplate.exchange( message.getNotifyUrl(), HttpMethod.POST, entity, String.class ); // 判断响应是否成功(2xx状态码) if (response.getStatusCode().is2xxSuccessful()) { // 记录通知成功 notifyRecordService.recordSuccess( message.getMessageId(), response.getBody() ); return true; } else { log.warn("通知响应失败: status={}, messageId={}", response.getStatusCode(), message.getMessageId()); return false; } } catch (ResourceAccessException e) { log.warn("网络连接失败: messageId={}, url={}", message.getMessageId(), message.getNotifyUrl()); return false; } catch (Exception e) { log.error("通知请求异常: messageId={}", message.getMessageId(), e); return false; } } /** * 处理通知失败 */ private void handleNotifyFailure(NotifyMessage message) { int newRetryCount = message.getRetryCount() + 1; if (newRetryCount >= message.getMaxRetryCount()) { // 达到最大重试次数,标记为失败 notifyMessageMapper.updateStatus(message.getId(), NotifyStatus.NOTIFYING, NotifyStatus.FAILED); log.error("通知达到最大重试次数: messageId={}, retryCount={}", message.getMessageId(), newRetryCount); // 触发告警,需要人工干预 triggerAlert(message); } else { // 计算下次重试时间(指数退避策略) Date nextRetryTime = calculateNextRetryTime(newRetryCount); notifyMessageMapper.updateForRetry( message.getId(), newRetryCount, nextRetryTime, NotifyStatus.PENDING ); log.info("通知失败,等待重试: messageId={}, retryCount={}, nextRetry={}", message.getMessageId(), newRetryCount, nextRetryTime); } } /** * 计算下次重试时间(指数退避) */ private Date calculateNextRetryTime(int retryCount) { long delay = Math.min(1000 * (long) Math.pow(2, retryCount), 3600000); // 最大1小时 return new Date(System.currentTimeMillis() + delay); } /** * 触发告警 */ private void triggerAlert(NotifyMessage message) { // 发送邮件、短信、钉钉通知等 log.error("需要人工干预的通知消息: messageId={}, businessType={}, businessId={}", message.getMessageId(), message.getBusinessType(), message.getBusinessId()); } }
  1. 幂等性服务
java
@Service @Slf4j public class NotifyRecordService { @Autowired private NotifyRecordMapper notifyRecordMapper; /** * 检查消息是否已经通知成功 */ public boolean isNotified(String messageId) { return notifyRecordMapper.existsByMessageIdAndStatus(messageId, ReceiveStatus.SUCCESS); } /** * 记录通知成功 */ @Transactional public void recordSuccess(String messageId, String responseData) { // 使用insert ignore防止重复插入 notifyRecordMapper.insertIgnore( messageId, ReceiveStatus.SUCCESS, new Date(), responseData ); } /** * 记录通知失败 */ public void recordFailure(String messageId, String errorMsg) { notifyRecordMapper.insertIgnore( messageId, ReceiveStatus.FAILED, new Date(), errorMsg ); } }

方案二:基于消息队列的实现

  1. 消息生产者
java
@Service @Slf4j public class OrderMessageProducer { @Autowired private RocketMQTemplate rocketMQTemplate; @Transactional public String createOrderWithMessage(OrderCreateRequest request) { // 1. 创建订单 Order order = createOrder(request); // 2. 发送事务消息 String transactionId = UUID.randomUUID().toString(); OrderMessage message = new OrderMessage(); message.setMessageId(transactionId); message.setOrderNo(order.getOrderNo()); message.setEventType("ORDER_CREATED"); message.setTimestamp(new Date()); Message<OrderMessage> mqMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, transactionId) .setHeader("businessType", "ORDER") .build(); // 发送延迟消息,第一次延迟5秒(给业务处理留出时间) rocketMQTemplate.syncSend("ORDER_NOTIFY_TOPIC", mqMessage, 3000, 5); return order.getOrderNo(); } }
  1. 消息消费者
java
@Component @Slf4j @RocketMQMessageListener( topic = "ORDER_NOTIFY_TOPIC", consumerGroup = "order-notify-consumer-group", delayLevelWhenNextConsume = 0 // 使用RocketMQ的重试机制 ) public class OrderNotifyConsumer implements RocketMQListener<MessageExt> { @Autowired private NotifyService notifyService; @Autowired private NotifyRecordService notifyRecordService; @Override public void onMessage(MessageExt message) { String messageId = message.getMsgId(); String keys = message.getKeys(); try { // 幂等性检查 if (notifyRecordService.isNotified(keys)) { log.info("消息已处理,跳过: messageId={}, keys={}", messageId, keys); return; } String body = new String(message.getBody(), StandardCharsets.UTF_8); OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class); // 执行通知 boolean success = notifyService.notifyOrderCreated(orderMessage); if (success) { // 记录成功 notifyRecordService.recordSuccess(keys, "RocketMQ通知成功"); log.info("订单通知成功: orderNo={}", orderMessage.getOrderNo()); } else { // 通知失败,抛出异常让RocketMQ重试 throw new RuntimeException("通知处理失败"); } } catch (Exception e) { log.error("订单通知消费失败: messageId={}, keys={}", messageId, keys, e); // 记录失败 notifyRecordService.recordFailure(keys, e.getMessage()); // 根据重试次数决定是否继续重试 int reconsumeTimes = message.getReconsumeTimes(); if (reconsumeTimes >= 8) { // 最大重试8次 log.error("达到最大重试次数,进入死信队列: keys={}, retryCount={}", keys, reconsumeTimes); // RocketMQ会自动将消息投递到死信队列 } else { throw new RuntimeException("需要重试", e); } } } }
  1. 死信队列处理
java
@Component @Slf4j @RocketMQMessageListener( topic = "%DLQ%order-notify-consumer-group", consumerGroup = "order-notify-dlq-consumer" ) public class OrderNotifyDLQConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { String keys = message.getKeys(); log.error("收到死信队列消息,需要人工干预: keys={}", keys); // 发送告警通知 sendAlert(keys, new String(message.getBody())); // 可以在这里实现一些自动修复逻辑,或者等待人工处理 } private void sendAlert(String keys, String messageBody) { // 发送邮件、钉钉、短信告警 log.error("死信消息告警 - keys: {}, body: {}", keys, messageBody); } }

优缺点分析

优点

  • 实现简单:无需复杂事务管理,只需基本的MQ和重试机制
  • 低侵入性:对现有业务代码修改较少
  • 灵活性高:适用于各种场景,特别是外部系统交互
  • 资源占用低:不需要长时间锁定资源

缺点

  • 一致性保证低:无法保证消息一定被接收方收到
  • 需要额外校对机制:接收方需要实现查询接口
  • 重试可能导致重复处理:需要确保业务逻辑幂等

总结

最大努力通知型分布式事务是一种在实际业务中非常实用的柔性事务解决方案。它通过 异步处理 + 重试机制 + 最终一致性 的设计理念,在保证系统性能的同时,提供了足够的数据可靠性保障。

核心要点总结:

  • 业务优先:先完成本地业务处理,再异步通知

  • 消息持久化:确保通知消息不丢失

  • 智能重试:实现合理的重试策略和退避机制

  • 幂等设计:接收方必须能够处理重复通知

  • 监控告警:及时发现和处理问题

  • 人工干预:为异常情况提供处理手段

在实际项目中,可以根据具体业务需求和技术栈选择合适的实现方案。无论选择哪种方案,都要牢记最大努力通知的核心思想:尽最大努力交付,但不保证100%成功,这种务实的设计理念正是其在分布式系统中广受欢迎的原因。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!