在分布式系统中,保证数据一致性是一个核心挑战。特别是在跨服务调用时,如何确保多个服务的操作要么全部成功,要么全部失败,这就是分布式事务要解决的问题。本文将深入探讨可靠消息投递的分布式事务原理,重点分析RocketMQ事务消息 和 本地消息表 两种实现方式。
基本思路:将分布式事务拆解为多个本地事务,通过消息队列保证各个本地事务的最终一致性。
核心思想:避免分布式事务。
要实现可靠消息投递,必须解决两个核心问题:
其基本原理是:
在可靠消息投递中,关键在于保证"业务操作"与"消息发送"的原子性。如果业务操作成功但消息发送失败,或业务操作失败但消息已发送,系统就会出现数据不一致。可靠消息投递机制通过特定的实现方式,解决了这一问题。
可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息 和 本地消息表 是两种主流的实现方式。
RocketMQ 事务消息基于 两阶段提交(2PC) 思想,通过"半消息"机制实现业务与消息的原子性。
提示
RocketMQ 天然的支持事务消息,可查看其官方文档 《事务消息》

生产者实现
java@Slf4j
@Component
public class MQTXProducerService {
public static final String TOPIC = "RLT_TEST_TOPIC";
public static final String TAG = "charge";
@Autowired
RocketMQTemplate rocketMQTemplate;
/**
* 先向MQ Server发送半消息
* @param userCharge 用户充值信息
*/
public TransactionSendResult sendHalfMsg(UserCharge userCharge) {
/*
执行顺序:
1:发送半消息
2:执行本地事务(实现了 RocketMQLocalTransactionListener 接口的类)
3:发送半消息
4:MQ消费
*/
// 生成事务id,唯一,可用业务标识
String transactionId = UUID.randomUUID().toString().replace("-", "");
log.info("1、【发送半消息】transactionId={}", transactionId);
// 发送事务消息(参1:生产者所在事务组,参2:topic+tag,参3:消息体(可以传参),参4:发送参数)
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TOPIC + ":" + TAG,
MessageBuilder.withPayload(userCharge).setHeader(RocketMQHeaders.MESSAGE_ID, transactionId).build(),
userCharge);
log.info("【发送半消息】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
}
执行本地事务 + 回查方法
java@Slf4j
@RocketMQTransactionListener
public class MQTXLocalService implements RocketMQLocalTransactionListener {
@Autowired
private ITUserService userService;
@Autowired
private TMqTransactionLogMapper mqTransactionLogMapper;
/*
这里代码是主要关键的地方,本地事务是给用户增加余额后再插入mq事务日志,这两个操作只有成功了,才返回COMMIT,异常失败就返回ROLLBACK
回查方法不一定会执行,但是得有,回查就是根据我们之前生成穿过来的那个事务id(transactionId)来查询事务日志表,
这样的好处是业务牵涉的表再多无所谓,我这个日志表也与你本地事务绑定,我只需查询这一张事务表就够了,能找到就代表本地事务执行成功了
*/
/**
* 用于执行本地事务的方法
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) {
// 获取消息体里参数
MessageHeaders messageHeaders = message.getHeaders();
String transactionId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID);
log.info("2、【执行本地事务】消息体参数:transactionId={}", transactionId);
// 执行带有事务注解的本地方法:增加用户余额+保存mq日志
try {
UserCharge userCharge = (UserCharge) obj;
userService.addBalance(userCharge, transactionId);
log.info("3、【执行本地事务】提交commit:transactionId={}", transactionId);
// 正常:向MQ Server发送commit消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("【执行本地事务】发生异常,消息将被回滚", e);
// 异常:向MQ Server发送rollback消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 用于回查本地事务执行结果的方法
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class);
log.info("【回查本地事务】transactionId={}", transactionId);
// 根据事务id查询事务日志表
TMqTransactionLog mqTransactionLog = mqTransactionLogMapper.selectById(transactionId);
if (null == mqTransactionLog) {
// 没查到表明本地事务执行失败,通知回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
// 查到表明本地事务执行成功,提交
return RocketMQLocalTransactionState.COMMIT;
}
}
消费者实现
java@Slf4j
@Component
@RocketMQMessageListener(topic = MQTXProducerService.TOPIC, selectorExpression = MQTXProducerService.TAG, consumerGroup = "Con_Group_Four")
public class MQTXConsumerService implements RocketMQListener<UserCharge> {
@Autowired
private ITCreditService creditService;
@Override
public void onMessage(UserCharge userCharge) {
// 一般真实环境这里消费前,得做幂等性判断,防止重复消费
// 方法一:如果你的业务中有某个字段是唯一的,有标识性,如订单号,那就可以用此字段来判断
// 方法二:新建一张消费记录表t_mq_consumer_log,字段consumer_key是唯一性,能插入则表明该消息还未消费,往下走,否则停止消费
// 我个人建议用方法二,根据你的项目业务来定义key,这里我就不做幂等判断了,因为此案例只是模拟,重在分布式事务
// 给用户增加积分
TCredit tCredit = creditService.getOne(creditService.baseWrapper(new TCreditDTO().setUserId(userCharge.getUserId())));
boolean i = creditService.updateById(tCredit.setIntegration(tCredit.getIntegration() + userCharge.getChargeAmount()));
if (i) {
log.info("【MQ消费】用户增加积分成功,userCharge={}", JSONObject.toJSONString(userCharge));
} else {
log.error("【MQ消费】用户充值增加积分消费失败,userCharge={}", JSONObject.toJSONString(userCharge));
}
}
}
优点:
缺点:
本地消息表是一种基于BASE理论的最终一致性方案,核心思路是将分布式事务拆分成本地事务进行处理,将消息数据与业务数据保存在同一个数据库中,保证本地数据库事务保证两者的原子性,利用中间件查询其他服务的事务消息状态。

数据库表设计
sql-- 业务订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_no VARCHAR(64) NOT NULL UNIQUE,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
quantity INT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status TINYINT NOT NULL DEFAULT 0,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 本地消息表
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE,
topic VARCHAR(128) NOT NULL,
tags VARCHAR(128),
body TEXT NOT NULL,
status TINYINT NOT NULL DEFAULT 0 COMMENT '0:待发送,1:已发送,2:发送失败',
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_next_retry (status, next_retry_time)
);
本地事务+消息处理
java@Service
@Transactional
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private MessageSender messageSender;
public void createOrderWithMessage(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 构建消息
LocalMessage message = new LocalMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setTopic("OrderTopic");
message.setTags("CREATE_ORDER");
Map<String, Object> messageBody = new HashMap<>();
messageBody.put("orderId", order.getId());
messageBody.put("productId", order.getProductId());
messageBody.put("quantity", order.getQuantity());
message.setBody(JSON.toJSONString(messageBody));
message.setStatus(0); // 待发送
// 3. 保存消息(与订单在同一个事务中)
localMessageMapper.insert(message);
// 事务提交后,异步发送消息
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
messageSender.asyncSendMessage(message);
}
}
);
}
}
发送事务消息
java@Component
public class MessageSender {
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Async
public void asyncSendMessage(LocalMessage message) {
try {
Message<String> mqMessage = MessageBuilder.withPayload(message.getBody())
.setHeader(RocketMQHeaders.KEYS, message.getMessageId())
.build();
rocketMQTemplate.send(message.getTopic() + ":" + message.getTags(), mqMessage);
// 更新消息状态为已发送
message.setStatus(1);
localMessageMapper.updateStatus(message);
} catch (Exception e) {
// 发送失败,更新重试信息
message.setStatus(2);
message.setRetryCount(message.getRetryCount() + 1);
message.setNextRetryTime(calculateNextRetryTime(message.getRetryCount()));
localMessageMapper.updateStatus(message);
}
}
private Date calculateNextRetryTime(int retryCount) {
// 指数退避策略
long delay = Math.min(1000 * (long) Math.pow(2, retryCount), 3600000); // 最大1小时
return new Date(System.currentTimeMillis() + delay);
}
}
消息补偿机制
java@Component
@Slf4j
public class MessageCompensateTask {
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private MessageSender messageSender;
@Scheduled(fixedDelay = 30000) // 每30秒执行一次
public void compensateFailedMessages() {
List<LocalMessage> failedMessages = localMessageMapper
.selectByStatusAndTime(2, new Date()); // 状态为发送失败且到达重试时间
for (LocalMessage message : failedMessages) {
if (message.getRetryCount() >= 10) { // 最大重试次数
log.warn("消息达到最大重试次数,需要人工干预: {}", message.getMessageId());
continue;
}
try {
messageSender.asyncSendMessage(message);
} catch (Exception e) {
log.error("消息补偿发送失败: {}", message.getMessageId(), e);
}
}
}
}
消费者实现
java@Component
@RocketMQMessageListener(
topic = "OrderTopic",
selectorExpression = "CREATE_ORDER",
consumerGroup = "Order_Consumer_Group"
)
public class OrderConsumer implements RocketMQListener<MessageExt> {
@Autowired
private InventoryService inventoryService;
@Autowired
private ConsumedMessageMapper consumedMessageMapper;
@Override
public void onMessage(MessageExt message) {
String messageId = message.getMsgId();
// 检查是否已经消费过(幂等性保障)
if (consumedMessageMapper.exists(messageId)) {
log.info("消息已消费,跳过处理: {}", messageId);
return;
}
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> data = JSON.parseObject(body, Map.class);
Long productId = Long.valueOf(data.get("productId").toString());
Integer quantity = Integer.valueOf(data.get("quantity").toString());
// 执行库存扣减
inventoryService.deductInventory(productId, quantity);
// 记录消费成功的消息
consumedMessageMapper.insert(messageId, new Date());
} catch (Exception e) {
log.error("消息消费失败: {}", messageId, e);
throw new RuntimeException("消费失败,触发重试", e);
}
}
}
优点:
缺点:
接下来我们考虑下在实际项目中遇到了 多服务调用的协调问题,这是非常典型的分布式事务复杂场景问题。
比如电商系统中的订单、库存、支付服务,需要考虑链式服务之间的调用关系,同时需要考虑如何确保多个步骤要么全部成功,要么全部回滚,在实际编码中实现事务的逆向操作。
提示
以下代码示例中将以本地消息表方案作为示例,在 RocketMQ 事务消息方案中的思路是一样的。
服务A调用服务B,服务B调用服务C,同时每个服务都有自己的本地事务
java// 服务A:订单服务
@Service
public class OrderService {
@Transactional
public void createOrder(OrderDTO order) {
// 1. 创建订单
orderRepository.save(order);
// 2. 创建消息:通知库存服务扣减库存
MessageRecord message = new MessageRecord();
message.setMessageId(UUID.randomUUID().toString());
message.setBusinessId(order.getId());
message.setMessageContent(JSON.toJSONString(order));
message.setStatus("PENDING");
message.setTargetService("inventory-service"); // 目标服务
messageRecordRepository.save(message);
}
}
// 服务B:库存服务
@Component
public class InventoryMessageConsumer {
@Autowired
private InventoryService inventoryService;
public void handleInventoryMessage(MessageRecord message) {
try {
// 1. 扣减库存(本地事务)
inventoryService.decreaseInventory(message.getBusinessId());
// 2. 创建下游消息:通知物流服务创建配送单
MessageRecord downstreamMessage = new MessageRecord();
downstreamMessage.setMessageId(UUID.randomUUID().toString());
downstreamMessage.setBusinessId(message.getBusinessId());
downstreamMessage.setMessageContent(JSON.toJSONString(message));
downstreamMessage.setStatus("PENDING");
downstreamMessage.setTargetService("logistics-service");
// 注意:这里是在库存服务的数据库中创建消息
messageRecordRepository.save(downstreamMessage);
// 3. 更新当前消息状态
message.setStatus("SUCCESS");
messageRecordRepository.save(message);
} catch (Exception e) {
// 处理失败,更新状态为FAILED,后续重试
message.setStatus("FAILED");
messageRecordRepository.save(message);
throw e;
}
}
}
// 服务C:物流服务
@Component
public class LogisticsMessageConsumer {
public void handleLogisticsMessage(MessageRecord message) {
try {
// 创建配送单
logisticsService.createDeliveryOrder(message.getBusinessId());
message.setStatus("SUCCESS");
messageRecordRepository.save(message);
// 可选:发送最终确认消息给订单服务
sendFinalConfirmation(message.getBusinessId());
} catch (Exception e) {
message.setStatus("FAILED");
messageRecordRepository.save(message);
throw e;
}
}
}
服务A需要同时调用服务B和服务C,两个调用是并行的,互不影响
java// 服务A:订单服务
@Service
public class OrderService {
@Transactional
public void createOrder(OrderDTO order) {
// 1. 创建订单
orderRepository.save(order);
// 2. 创建两条独立的消息
MessageRecord inventoryMessage = createMessage(order, "inventory-service");
MessageRecord paymentMessage = createMessage(order, "payment-service");
messageRecordRepository.save(inventoryMessage);
messageRecordRepository.save(paymentMessage);
// 3. 可选:创建聚合状态记录
OrderStatus status = new OrderStatus();
status.setOrderId(order.getId());
status.setInventoryStatus("PENDING");
status.setPaymentStatus("PENDING");
status.setOverallStatus("PROCESSING");
orderStatusRepository.save(status);
}
private MessageRecord createMessage(OrderDTO order, String targetService) {
MessageRecord message = new MessageRecord();
message.setMessageId(UUID.randomUUID().toString());
message.setBusinessId(order.getId());
message.setMessageContent(JSON.toJSONString(order));
message.setStatus("PENDING");
message.setTargetService(targetService);
return message;
}
}
// 服务B和C的处理逻辑类似,处理完成后发送确认消息
@Component
public class InventoryMessageConsumer {
public void handleInventoryMessage(MessageRecord message) {
try {
inventoryService.decreaseInventory(message.getBusinessId());
message.setStatus("SUCCESS");
messageRecordRepository.save(message);
// 发送确认消息给订单服务
sendConfirmation("inventory-service", message.getBusinessId(), "SUCCESS");
} catch (Exception e) {
message.setStatus("FAILED");
messageRecordRepository.save(message);
sendConfirmation("inventory-service", message.getBusinessId(), "FAILED");
throw e;
}
}
}
// 服务A:处理确认消息
@Component
public class OrderConfirmationConsumer {
public void handleConfirmation(ConfirmationMessage confirmation) {
// 更新聚合状态
OrderStatus status = orderStatusRepository.findByOrderId(confirmation.getBusinessId());
if ("inventory-service".equals(confirmation.getServiceName())) {
status.setInventoryStatus(confirmation.getStatus());
} else if ("payment-service".equals(confirmation.getServiceName())) {
status.setPaymentStatus(confirmation.getStatus());
}
// 检查整体状态
if ("SUCCESS".equals(status.getInventoryStatus()) &&
"SUCCESS".equals(status.getPaymentStatus())) {
status.setOverallStatus("COMPLETED");
} else if ("FAILED".equals(status.getInventoryStatus()) ||
"FAILED".equals(status.getPaymentStatus())) {
status.setOverallStatus("FAILED");
// 触发补偿逻辑
triggerCompensation(status.getOrderId());
}
orderStatusRepository.save(status);
}
}
服务A和B的本地事务已经提交,服务C处理失败,需要进行补偿(回滚)操作
以上可靠消息投递的两种基本流程已经理解,其本身只保证消息生产和本地事务的一致性,并不直接管理多个服务的回滚,主要解决的是 生产者本地事务 与 消息发送的原子性 问题,但它本身并不直接管理多个服务之间的分布式事务协调。当涉及多个服务调用时,我们需要额外的机制来处理。
这时候需要引入 Saga模式 的概念,通过 补偿事务 来实现回滚。
sqlCREATE TABLE distributed_transaction (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
transaction_id VARCHAR(64) NOT NULL UNIQUE COMMENT '全局事务ID',
current_service VARCHAR(100) NOT NULL COMMENT '当前服务名',
next_service VARCHAR(100) COMMENT '下一个服务名',
transaction_status TINYINT NOT NULL COMMENT '0:进行中, 1:已完成, 2:已回滚, 3:失败',
business_data TEXT COMMENT '业务数据',
compensation_data TEXT COMMENT '补偿需要的数据',
retry_count INT DEFAULT 0,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_transaction_id (transaction_id),
INDEX idx_status (transaction_status)
);
java@Component
@Slf4j
public class TransactionCoordinator {
@Autowired
private DistributedTransactionMapper transactionMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 开始分布式事务
*/
public String beginTransaction(String currentService, String businessData) {
String transactionId = generateTransactionId();
DistributedTransaction transaction = new DistributedTransaction();
transaction.setTransactionId(transactionId);
transaction.setCurrentService(currentService);
transaction.setTransactionStatus(0); // 进行中
transaction.setBusinessData(businessData);
transactionMapper.insert(transaction);
return transactionId;
}
/**
* 执行下一个服务调用
*/
public void invokeNextService(String transactionId, String nextService,
Object messageBody) {
try {
// 更新事务状态
DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId);
transaction.setNextService(nextService);
transactionMapper.update(transaction);
// 发送事务消息到下一个服务
Map<String, Object> message = new HashMap<>();
message.put("transactionId", transactionId);
message.put("businessData", transaction.getBusinessData());
message.put("messageBody", messageBody);
Message<String> mqMessage = MessageBuilder.withPayload(JSON.toJSONString(message))
.setHeader(RocketMQHeaders.KEYS, transactionId)
.build();
rocketMQTemplate.sendMessageInTransaction(
"TransactionTopic",
mqMessage,
null
);
} catch (Exception e) {
log.error("调用下一个服务失败: transactionId={}, nextService={}",
transactionId, nextService, e);
// 触发补偿流程
triggerCompensation(transactionId);
}
}
/**
* 标记事务步骤完成
*/
public void completeStep(String transactionId) {
DistributedTransaction transaction = transactionMapper.selectByTransactionId(transactionId);
transaction.setTransactionStatus(1); // 已完成
transactionMapper.update(transaction);
}
/**
* 触发补偿流程
*/
public void triggerCompensation(String transactionId) {
log.info("开始补偿流程: transactionId={}", transactionId);
// 发送补偿消息(按照服务调用逆序)
Message<String> compensateMessage = MessageBuilder.withPayload(transactionId)
.setHeader("COMPENSATION", "true")
.build();
rocketMQTemplate.syncSend("CompensationTopic", compensateMessage);
}
}
java@Service
@Slf4j
public class OrderService {
@Autowired
private TransactionCoordinator transactionCoordinator;
@Autowired
private InventoryService inventoryService;
@Transactional
public String createOrder(OrderRequest request) {
// 1. 开始分布式事务
String transactionId = transactionCoordinator.beginTransaction(
"order-service",
JSON.toJSONString(request)
);
try {
// 2. 执行本地事务(创建订单)
Order order = createOrderLocal(request);
// 3. 保存补偿需要的数据
String compensationData = buildCompensationData(order);
// 4. 调用下一个服务(库存服务)
InventoryDeductRequest inventoryRequest = buildInventoryRequest(order);
transactionCoordinator.invokeNextService(
transactionId,
"inventory-service",
inventoryRequest
);
return transactionId;
} catch (Exception e) {
log.error("创建订单失败: transactionId={}", transactionId, e);
transactionCoordinator.triggerCompensation(transactionId);
throw new RuntimeException("创建订单失败", e);
}
}
/**
* 订单服务补偿操作
*/
@Transactional
public void compensateOrder(String transactionId, String compensationData) {
log.info("执行订单服务补偿: transactionId={}", transactionId);
try {
// 解析补偿数据
OrderCompensationData data = JSON.parseObject(compensationData,
OrderCompensationData.class);
// 取消订单(软删除或状态更新)
orderMapper.updateStatus(data.getOrderId(), OrderStatus.CANCELLED);
// 记录补偿日志
log.info("订单补偿完成: orderId={}", data.getOrderId());
} catch (Exception e) {
log.error("订单补偿失败: transactionId={}", transactionId, e);
throw new RuntimeException("订单补偿失败", e);
}
}
}
java@Service
@Slf4j
public class InventoryService {
@Autowired
private TransactionCoordinator transactionCoordinator;
@Autowired
private PointService pointService;
/**
* 库存扣减(事务消息消费者)
*/
@RocketMQMessageListener(
topic = "TransactionTopic",
selectorExpression = "inventory-service",
consumerGroup = "inventory-consumer-group"
)
public class InventoryConsumer implements RocketMQListener<MessageExt> {
@Override
@Transactional
public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> data = JSON.parseObject(body, Map.class);
String transactionId = (String) data.get("transactionId");
InventoryDeductRequest request = JSON.parseObject(
JSON.toJSONString(data.get("messageBody")),
InventoryDeductRequest.class
);
// 检查是否补偿消息
if (isCompensationMessage(message)) {
compensateInventory(transactionId, request);
return;
}
// 执行库存扣减
boolean success = deductInventory(request);
if (success) {
// 标记当前步骤完成
transactionCoordinator.completeStep(transactionId);
// 调用下一个服务(积分服务)
PointAddRequest pointRequest = buildPointRequest(request);
transactionCoordinator.invokeNextService(
transactionId,
"point-service",
pointRequest
);
} else {
// 库存不足,触发补偿
transactionCoordinator.triggerCompensation(transactionId);
}
} catch (Exception e) {
log.error("库存服务处理失败", e);
// 触发补偿
String transactionId = extractTransactionId(message);
transactionCoordinator.triggerCompensation(transactionId);
}
}
/**
* 库存服务补偿操作
*/
@Transactional
public void compensateInventory(String transactionId, InventoryDeductRequest request) {
log.info("执行库存服务补偿: transactionId={}", transactionId);
try {
// 恢复库存
restoreInventory(request.getProductId(), request.getQuantity());
log.info("库存补偿完成: productId={}, quantity={}",
request.getProductId(), request.getQuantity());
} catch (Exception e) {
log.error("库存补偿失败: transactionId={}", transactionId, e);
throw new RuntimeException("库存补偿失败", e);
}
}
}
}
java@Service
@Slf4j
public class PointService {
@RocketMQMessageListener(
topic = "TransactionTopic",
selectorExpression = "point-service",
consumerGroup = "point-consumer-group"
)
public class PointConsumer implements RocketMQListener<MessageExt> {
@Override
@Transactional
public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> data = JSON.parseObject(body, Map.class);
String transactionId = (String) data.get("transactionId");
PointAddRequest request = JSON.parseObject(
JSON.toJSONString(data.get("messageBody")),
PointAddRequest.class
);
// 检查是否补偿消息
if (isCompensationMessage(message)) {
compensatePoints(transactionId, request);
return;
}
// 执行积分增加
boolean success = addPoints(request);
if (success) {
// 标记整个事务完成
completeTransaction(transactionId);
log.info("分布式事务完成: transactionId={}", transactionId);
} else {
// 积分增加失败,触发补偿
transactionCoordinator.triggerCompensation(transactionId);
}
} catch (Exception e) {
log.error("积分服务处理失败", e);
String transactionId = extractTransactionId(message);
transactionCoordinator.triggerCompensation(transactionId);
}
}
/**
* 积分服务补偿操作
*/
@Transactional
public void compensatePoints(String transactionId, PointAddRequest request) {
log.info("执行积分服务补偿: transactionId={}", transactionId);
try {
// 扣减积分(恢复原状)
deductPoints(request.getUserId(), request.getPoints());
log.info("积分补偿完成: userId={}, points={}",
request.getUserId(), request.getPoints());
} catch (Exception e) {
log.error("积分补偿失败: transactionId={}", transactionId, e);
throw new RuntimeException("积分补偿失败", e);
}
}
}
}
java@Component
@Slf4j
public class CompensationConsumer {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@Autowired
private PointService pointService;
@RocketMQMessageListener(
topic = "CompensationTopic",
consumerGroup = "compensation-consumer-group"
)
public void handleCompensation(String transactionId) {
log.info("处理补偿事务: transactionId={}", transactionId);
try {
// 按照调用链的逆序执行补偿
// 1. 补偿积分服务
pointService.compensatePoints(transactionId, getPointCompensationData(transactionId));
// 2. 补偿库存服务
inventoryService.compensateInventory(transactionId, getInventoryCompensationData(transactionId));
// 3. 补偿订单服务
orderService.compensateOrder(transactionId, getOrderCompensationData(transactionId));
log.info("补偿流程完成: transactionId={}", transactionId);
} catch (Exception e) {
log.error("补偿流程执行失败: transactionId={}", transactionId, e);
// 记录失败日志,需要人工干预
}
}
}
核心要点,
每个补偿操作必须保证幂等性,防止重复补偿:
要明确事务的状态机
javapublic enum TransactionStatus {
INITIATED(0, "已初始化"),
IN_PROGRESS(1, "进行中"),
ORDER_COMPLETED(2, "订单服务完成"),
INVENTORY_COMPLETED(3, "库存服务完成"),
POINT_COMPLETED(4, "积分服务完成"),
SUCCESS(5, "事务成功"),
COMPENSATING(6, "补偿中"),
COMPENSATED(7, "已补偿"),
FAILED(8, "事务失败");
// ... getters and constructors
}
超时控制,防止事务长时间挂起
java// 1. 定义补偿消息结构
@Data
public class CompensationMessage {
private String originalMessageId; // 原始消息ID
private String businessId;
private String serviceName; // 需要补偿的服务
private String compensationType; // 补偿类型
}
// 2. 服务C失败时的处理
@Component
public class LogisticsMessageConsumer {
public void handleLogisticsMessage(MessageRecord message) {
try {
logisticsService.createDeliveryOrder(message.getBusinessId());
message.setStatus("SUCCESS");
messageRecordRepository.save(message);
} catch (Exception e) {
message.setStatus("FAILED");
messageRecordRepository.save(message);
// 发送补偿消息
CompensationMessage compensation = new CompensationMessage();
compensation.setOriginalMessageId(message.getMessageId());
compensation.setBusinessId(message.getBusinessId());
compensation.setServiceName("inventory-service"); // 需要补偿的上游服务
compensation.setCompensationType("ROLLBACK_INVENTORY");
rocketMQTemplate.convertAndSend("compensation-topic", compensation);
throw e;
}
}
}
// 3. 服务B:处理补偿消息
@Component
public class InventoryCompensationConsumer {
public void handleCompensation(CompensationMessage compensation) {
try {
// 执行补偿操作:增加库存(回滚扣减操作)
inventoryService.increaseInventory(compensation.getBusinessId());
// 发送进一步的补偿消息给服务A
CompensationMessage upstreamCompensation = new CompensationMessage();
upstreamCompensation.setBusinessId(compensation.getBusinessId());
upstreamCompensation.setServiceName("order-service");
upstreamCompensation.setCompensationType("ROLLBACK_ORDER_STATUS");
rocketMQTemplate.convertAndSend("compensation-topic", upstreamCompensation);
} catch (Exception e) {
// 补偿失败需要人工干预或重试
log.error("补偿操作失败", e);
// 可以发送告警或记录到人工处理队列
}
}
}
// 4. 服务A:处理最终补偿
@Component
public class OrderCompensationConsumer {
public void handleCompensation(CompensationMessage compensation) {
// 更新订单状态为失败
orderService.updateOrderStatus(compensation.getBusinessId(), "FAILED");
// 可能还需要通知用户或其他业务处理
notificationService.notifyOrderFailed(compensation.getBusinessId());
}
}
补偿消息的可靠性保证
java// 补偿消息表(每个服务都需要)
@Data
public class CompensationRecord {
private String compensationId;
private String originalBusinessId;
private String serviceName;
private String compensationType;
private String status; // PENDING, SUCCESS, FAILED
private int retryCount;
private Date createTime;
}
// 补偿服务实现
@Service
public class CompensationService {
@Transactional
public void createCompensation(String businessId, String serviceName, String type) {
CompensationRecord record = new CompensationRecord();
record.setCompensationId(UUID.randomUUID().toString());
record.setOriginalBusinessId(businessId);
record.setServiceName(serviceName);
record.setCompensationType(type);
record.setStatus("PENDING");
compensationRecordRepository.save(record);
// 发送补偿消息
rocketMQTemplate.convertAndSend("compensation-topic", record);
}
// 定时任务:重试失败的补偿
@Scheduled(fixedRate = 30000)
public void retryFailedCompensations() {
List<CompensationRecord> failedRecords =
compensationRecordRepository.findByStatus("FAILED");
for (CompensationRecord record : failedRecords) {
if (record.getRetryCount() < 5) {
rocketMQTemplate.convertAndSend("compensation-topic", record);
record.setRetryCount(record.getRetryCount() + 1);
compensationRecordRepository.save(record);
} else {
// 超过重试次数,需要人工处理
alertService.sendAlert("补偿失败需要人工处理: " + record.getCompensationId());
}
}
}
}
幂等性保证
java// 补偿操作的幂等性处理
@Service
public class InventoryService {
public void increaseInventory(String orderId) {
// 检查是否已经执行过补偿
if (compensationRecordRepository.existsByBusinessIdAndType(orderId, "INCREASE_INVENTORY")) {
return; // 已经执行过,直接返回
}
// 执行补偿逻辑
inventoryRepository.increaseStock(orderId);
// 记录补偿执行
CompensationExecution execution = new CompensationExecution();
execution.setBusinessId(orderId);
execution.setCompensationType("INCREASE_INVENTORY");
execution.setExecuteTime(new Date());
compensationExecutionRepository.save(execution);
}
}
通过Saga模式 + RocketMQ事务消息 + 补偿事务的组合方案,我们可以有效处理多服务之间的分布式事务:
正向流程:通过事务消息依次调用各个服务
异常处理:任何服务失败时触发补偿流程
补偿机制:按照调用链的逆序执行补偿操作
可靠性保障:通过幂等性、状态跟踪、超时控制等机制确保数据一致性
监控和运维:建立完整的链路追踪,监控消息处理延迟和失败率,提供人工干预的入口
这种方案虽然实现相对复杂,但能够很好地解决多服务调用的事务一致性问题,是分布式系统中常用的成熟模式。
| 特性 | RocketMQ事务消息 | 本地消息表 |
|---|---|---|
| 实现复杂度 | 中等 | 中等 |
| 性能 | 高 | 中等 |
| 数据一致性 | 强 | 强 |
| 中间件依赖 | 强(RocketMQ) | 弱 |
| 通用性 | 低 | 高 |
| 运维成本 | 中等 | 中等 |
选择建议:
如果已经使用RocketMQ:优先考虑事务消息方案,实现更简洁
如果需要跨多种消息中间件:选择本地消息表,更具通用性
对性能要求极高:RocketMQ事务消息性能更优
技术团队熟悉度:选择团队更熟悉的技术方案
幂等性设计:无论使用哪种方案,消费者都必须实现幂等性处理,防止重复消费
监控与告警
监控消息积压情况
设置事务失败告警
定期检查补偿任务运行状态
重试策略
实现指数退避重试机制
设置最大重试次数
超过重试次数后人工干预
数据对账:定期执行数据对账任务,发现并修复不一致的数据
java@Component
@Slf4j
public class DataReconciliationTask {
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void reconcileData() {
// 检查订单与库存的一致性
List<Order> orders = orderService.findInconsistentOrders();
for (Order order : orders) {
log.warn("发现数据不一致订单: {}", order.getId());
// 触发补偿流程或通知人工处理
}
}
}
可靠消息投递是实现分布式事务最终一致性的有效方案,而 RocketMQ事务消息 和 本地消息表 是两种主流的实现方式。RocketMQ事务消息通过MQ的内置事务机制提供简洁的实现,而本地消息表则通过应用层实现提供更高的灵活性。
在实际项目中,应根据业务特点、系统架构和现有技术栈选择合适的方案。对于事务操作时间短、已使用RocketMQ的系统,RocketMQ事务消息是理想选择;对于需要与MQ解耦、事务操作时间长的系统,本地消息表更为合适。
无论选择哪种方式,都需注意以下关键点:
通过合理运用这些可靠消息投递的实现方式,我们可以在分布式系统中构建出高可靠、高可用的事务处理机制,为业务系统的稳定运行提供有力保障。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!