最大努力通知型分布式事务是一种柔性事务解决方案,它适用于那些不需要强一致性,而是最终一致性的场景。这种模式的核心思想是:系统尽最大努力将通知发送给接收方,如果通知失败,会按照一定的策略进行重试,直到成功或达到重试上限。
最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低的业务,其核心思想是:系统会 尽最大努力将业务处理结果通知给相关方,但不保证通知一定成功,也不保证实时性。
与传统的ACID事务或2PC等强一致性方案不同,最大努力通知更注重最终一致性,通过异步重试机制来保证数据的最终一致性。
| 特性 | 可靠消息一致性 | 最大努力通知 |
|---|---|---|
| 事务关注点 | 交易过程的事务一致性 | 交易之后的通知事务 |
| 消息可靠性 | 由发起方保证 | 由接收方主动查询确认 |
| 业务场景 | 交易必须完成 | 交易结果通知可容忍延迟 |
| 一致性要求 | 高 | 低 |
最大努力通知包含两个关键机制:
提示
支付结果通知:支付成功后通知业务系统
订单状态同步:主订单创建后同步到其他系统
数据增量同步:数据库变更同步到搜索索引
事件驱动架构:微服务间的事件通知
最大努力通知主要有两种实现方案,适用于不同场景:
sql-- 通知消息表
CREATE TABLE notify_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一ID',
business_type VARCHAR(50) NOT NULL COMMENT '业务类型',
business_id VARCHAR(64) NOT NULL COMMENT '业务ID',
notify_url VARCHAR(500) NOT NULL COMMENT '通知地址',
notify_data TEXT NOT NULL COMMENT '通知数据',
notify_status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待通知,1:通知中,2:通知成功,3:通知失败',
retry_count INT NOT NULL DEFAULT 0 COMMENT '重试次数',
max_retry_count INT NOT NULL DEFAULT 10 COMMENT '最大重试次数',
next_retry_time DATETIME COMMENT '下次重试时间',
last_notify_time DATETIME COMMENT '最后通知时间',
success_time DATETIME COMMENT '成功时间',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_retrytime (notify_status, next_retry_time),
INDEX idx_business (business_type, business_id)
);
-- 通知记录表(用于幂等性校验)
CREATE TABLE notify_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE,
receive_status TINYINT NOT NULL COMMENT '接收状态',
receive_time DATETIME NOT NULL,
response_data TEXT COMMENT '响应数据',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_message_id (message_id)
);
java@Service
@Transactional
@Slf4j
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private NotifyMessageMapper notifyMessageMapper;
/**
* 创建订单并记录通知消息
*/
public String createOrder(OrderCreateRequest request) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
orderMapper.insert(order);
// 2. 记录通知消息
NotifyMessage message = new NotifyMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setBusinessType("ORDER_CREATE");
message.setBusinessId(order.getOrderNo());
message.setNotifyUrl("http://inventory-service/api/order/notify");
Map<String, Object> notifyData = new HashMap<>();
notifyData.put("orderNo", order.getOrderNo());
notifyData.put("userId", order.getUserId());
notifyData.put("amount", order.getAmount());
notifyData.put("products", request.getProducts());
message.setNotifyData(JSON.toJSONString(notifyData));
message.setNotifyStatus(NotifyStatus.PENDING);
message.setMaxRetryCount(8); // 最大重试8次
message.setNextRetryTime(new Date());
notifyMessageMapper.insert(message);
// 3. 事务提交后异步触发第一次通知
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 异步发送通知
notifyMessageService.triggerNotify(message.getMessageId());
}
}
);
return order.getOrderNo();
}
}
java@Component
@Slf4j
public class NotifyMessageScanner {
@Autowired
private NotifyMessageMapper notifyMessageMapper;
@Autowired
private NotifyMessageService notifyMessageService;
/**
* 扫描待通知的消息
*/
@Scheduled(fixedDelay = 30000) // 每30秒扫描一次
public void scanPendingMessages() {
try {
List<NotifyMessage> pendingMessages = notifyMessageMapper
.selectByStatusAndTime(NotifyStatus.PENDING, new Date());
log.info("扫描到待通知消息数量: {}", pendingMessages.size());
for (NotifyMessage message : pendingMessages) {
// 更新为通知中状态,防止重复处理
if (notifyMessageMapper.updateStatus(message.getId(),
NotifyStatus.PENDING, NotifyStatus.NOTIFYING) > 0) {
// 异步处理通知
notifyMessageService.processNotify(message);
}
}
} catch (Exception e) {
log.error("扫描通知消息异常", e);
}
}
/**
* 扫描通知中的消息(处理中断的情况)
*/
@Scheduled(fixedDelay = 60000) // 每60秒扫描一次
public void scanNotifyingMessages() {
try {
// 查找通知中但超过5分钟未更新的消息(可能处理中断)
Date timeoutTime = new Date(System.currentTimeMillis() - 5 * 60 * 1000);
List<NotifyMessage> timeoutMessages = notifyMessageMapper
.selectTimeoutMessages(NotifyStatus.NOTIFYING, timeoutTime);
for (NotifyMessage message : timeoutMessages) {
log.warn("发现超时通知消息,重置状态: messageId={}", message.getMessageId());
notifyMessageMapper.updateStatus(message.getId(),
NotifyStatus.NOTIFYING, NotifyStatus.PENDING);
}
} catch (Exception e) {
log.error("扫描超时消息异常", e);
}
}
}
java@Service
@Slf4j
public class NotifyMessageService {
@Autowired
private NotifyMessageMapper notifyMessageMapper;
@Autowired
private RestTemplate restTemplate;
@Autowired
private NotifyRecordService notifyRecordService;
/**
* 处理消息通知
*/
@Async("notifyExecutor")
public void processNotify(NotifyMessage message) {
log.info("开始处理通知: messageId={}, businessId={}",
message.getMessageId(), message.getBusinessId());
try {
// 检查是否已经成功通知过(幂等性校验)
if (notifyRecordService.isNotified(message.getMessageId())) {
log.info("消息已通知成功,跳过: messageId={}", message.getMessageId());
notifyMessageMapper.updateStatus(message.getId(),
NotifyStatus.NOTIFYING, NotifyStatus.SUCCESS);
return;
}
// 执行通知
boolean success = doNotify(message);
if (success) {
// 通知成功
notifyMessageMapper.updateSuccess(message.getId(), new Date());
log.info("通知成功: messageId={}", message.getMessageId());
} else {
// 通知失败,计算下次重试时间
handleNotifyFailure(message);
}
} catch (Exception e) {
log.error("处理通知异常: messageId={}", message.getMessageId(), e);
handleNotifyFailure(message);
}
}
/**
* 执行具体的通知操作
*/
private boolean doNotify(NotifyMessage message) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Message-Id", message.getMessageId());
headers.set("X-Business-Type", message.getBusinessType());
HttpEntity<String> entity = new HttpEntity<>(message.getNotifyData(), headers);
ResponseEntity<String> response = restTemplate.exchange(
message.getNotifyUrl(),
HttpMethod.POST,
entity,
String.class
);
// 判断响应是否成功(2xx状态码)
if (response.getStatusCode().is2xxSuccessful()) {
// 记录通知成功
notifyRecordService.recordSuccess(
message.getMessageId(),
response.getBody()
);
return true;
} else {
log.warn("通知响应失败: status={}, messageId={}",
response.getStatusCode(), message.getMessageId());
return false;
}
} catch (ResourceAccessException e) {
log.warn("网络连接失败: messageId={}, url={}",
message.getMessageId(), message.getNotifyUrl());
return false;
} catch (Exception e) {
log.error("通知请求异常: messageId={}", message.getMessageId(), e);
return false;
}
}
/**
* 处理通知失败
*/
private void handleNotifyFailure(NotifyMessage message) {
int newRetryCount = message.getRetryCount() + 1;
if (newRetryCount >= message.getMaxRetryCount()) {
// 达到最大重试次数,标记为失败
notifyMessageMapper.updateStatus(message.getId(),
NotifyStatus.NOTIFYING, NotifyStatus.FAILED);
log.error("通知达到最大重试次数: messageId={}, retryCount={}",
message.getMessageId(), newRetryCount);
// 触发告警,需要人工干预
triggerAlert(message);
} else {
// 计算下次重试时间(指数退避策略)
Date nextRetryTime = calculateNextRetryTime(newRetryCount);
notifyMessageMapper.updateForRetry(
message.getId(),
newRetryCount,
nextRetryTime,
NotifyStatus.PENDING
);
log.info("通知失败,等待重试: messageId={}, retryCount={}, nextRetry={}",
message.getMessageId(), newRetryCount, nextRetryTime);
}
}
/**
* 计算下次重试时间(指数退避)
*/
private Date calculateNextRetryTime(int retryCount) {
long delay = Math.min(1000 * (long) Math.pow(2, retryCount), 3600000); // 最大1小时
return new Date(System.currentTimeMillis() + delay);
}
/**
* 触发告警
*/
private void triggerAlert(NotifyMessage message) {
// 发送邮件、短信、钉钉通知等
log.error("需要人工干预的通知消息: messageId={}, businessType={}, businessId={}",
message.getMessageId(), message.getBusinessType(), message.getBusinessId());
}
}
java@Service
@Slf4j
public class NotifyRecordService {
@Autowired
private NotifyRecordMapper notifyRecordMapper;
/**
* 检查消息是否已经通知成功
*/
public boolean isNotified(String messageId) {
return notifyRecordMapper.existsByMessageIdAndStatus(messageId, ReceiveStatus.SUCCESS);
}
/**
* 记录通知成功
*/
@Transactional
public void recordSuccess(String messageId, String responseData) {
// 使用insert ignore防止重复插入
notifyRecordMapper.insertIgnore(
messageId,
ReceiveStatus.SUCCESS,
new Date(),
responseData
);
}
/**
* 记录通知失败
*/
public void recordFailure(String messageId, String errorMsg) {
notifyRecordMapper.insertIgnore(
messageId,
ReceiveStatus.FAILED,
new Date(),
errorMsg
);
}
}
java@Service
@Slf4j
public class OrderMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public String createOrderWithMessage(OrderCreateRequest request) {
// 1. 创建订单
Order order = createOrder(request);
// 2. 发送事务消息
String transactionId = UUID.randomUUID().toString();
OrderMessage message = new OrderMessage();
message.setMessageId(transactionId);
message.setOrderNo(order.getOrderNo());
message.setEventType("ORDER_CREATED");
message.setTimestamp(new Date());
Message<OrderMessage> mqMessage = MessageBuilder
.withPayload(message)
.setHeader(RocketMQHeaders.KEYS, transactionId)
.setHeader("businessType", "ORDER")
.build();
// 发送延迟消息,第一次延迟5秒(给业务处理留出时间)
rocketMQTemplate.syncSend("ORDER_NOTIFY_TOPIC", mqMessage, 3000, 5);
return order.getOrderNo();
}
}
java@Component
@Slf4j
@RocketMQMessageListener(
topic = "ORDER_NOTIFY_TOPIC",
consumerGroup = "order-notify-consumer-group",
delayLevelWhenNextConsume = 0 // 使用RocketMQ的重试机制
)
public class OrderNotifyConsumer implements RocketMQListener<MessageExt> {
@Autowired
private NotifyService notifyService;
@Autowired
private NotifyRecordService notifyRecordService;
@Override
public void onMessage(MessageExt message) {
String messageId = message.getMsgId();
String keys = message.getKeys();
try {
// 幂等性检查
if (notifyRecordService.isNotified(keys)) {
log.info("消息已处理,跳过: messageId={}, keys={}", messageId, keys);
return;
}
String body = new String(message.getBody(), StandardCharsets.UTF_8);
OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class);
// 执行通知
boolean success = notifyService.notifyOrderCreated(orderMessage);
if (success) {
// 记录成功
notifyRecordService.recordSuccess(keys, "RocketMQ通知成功");
log.info("订单通知成功: orderNo={}", orderMessage.getOrderNo());
} else {
// 通知失败,抛出异常让RocketMQ重试
throw new RuntimeException("通知处理失败");
}
} catch (Exception e) {
log.error("订单通知消费失败: messageId={}, keys={}", messageId, keys, e);
// 记录失败
notifyRecordService.recordFailure(keys, e.getMessage());
// 根据重试次数决定是否继续重试
int reconsumeTimes = message.getReconsumeTimes();
if (reconsumeTimes >= 8) { // 最大重试8次
log.error("达到最大重试次数,进入死信队列: keys={}, retryCount={}", keys, reconsumeTimes);
// RocketMQ会自动将消息投递到死信队列
} else {
throw new RuntimeException("需要重试", e);
}
}
}
}
java@Component
@Slf4j
@RocketMQMessageListener(
topic = "%DLQ%order-notify-consumer-group",
consumerGroup = "order-notify-dlq-consumer"
)
public class OrderNotifyDLQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String keys = message.getKeys();
log.error("收到死信队列消息,需要人工干预: keys={}", keys);
// 发送告警通知
sendAlert(keys, new String(message.getBody()));
// 可以在这里实现一些自动修复逻辑,或者等待人工处理
}
private void sendAlert(String keys, String messageBody) {
// 发送邮件、钉钉、短信告警
log.error("死信消息告警 - keys: {}, body: {}", keys, messageBody);
}
}
优点
缺点
最大努力通知型分布式事务是一种在实际业务中非常实用的柔性事务解决方案。它通过 异步处理 + 重试机制 + 最终一致性 的设计理念,在保证系统性能的同时,提供了足够的数据可靠性保障。
核心要点总结:
业务优先:先完成本地业务处理,再异步通知
消息持久化:确保通知消息不丢失
智能重试:实现合理的重试策略和退避机制
幂等设计:接收方必须能够处理重复通知
监控告警:及时发现和处理问题
人工干预:为异常情况提供处理手段
在实际项目中,可以根据具体业务需求和技术栈选择合适的实现方案。无论选择哪种方案,都要牢记最大努力通知的核心思想:尽最大努力交付,但不保证100%成功,这种务实的设计理念正是其在分布式系统中广受欢迎的原因。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!