0
点赞
收藏
分享

微信扫一扫

分布式事务(二):基于可靠消息的分布式事务

项目使用技术

springboot、dubbo、zookeeper、定时任务、消息中间件MQ

一、项目结构

maven父子工程:

父工程:consis

子工程:api-service、order、product、message

api-service:该项目主要是提供接口调用的,还包含实体类、枚举等一些通用内容

order:该项目是专门处理订单相关操作的系统

product:该项目是专门处理产品相关操作的系统

message:该项目是提供消息服务的系统,好包括定时任务

它们的依赖关系如下图:

enter image description here

根据上一篇的原理分别介绍下各个系统的实现

二、order订单系统

核心代码:

@Override
@Transactional
public void add(Orders order) {
String messageBody = JSONObject.toJSONString( order );
//添加消息到数据库
String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME );
log.info(>>> 预发送消息,消息编号:{}, messageId);
boolean flag = false;
boolean success = false;
try{

Orders orders = orderDao.saveAndFlush( order );
//int i = 1/0 ;
log.info(>>> 插入订单,订单编号:{}, orders.getId());
flag = true;
}catch (Exception e){
transactionMessageService.delete( messageId );
log.info(>>> 业务执行异常删除消息,消息编号:{}, messageId, e);
throw new RuntimeException( >>> 创建订单失败 );
}finally {
if(flag){
try {
transactionMessageService.confirmAndSend( messageId );
success = true;
log.info(>>> 确认并且发送消息到实时消息中间件,消息编号:{}, messageId);

}catch (Exception e){
log.error(>>> 消息确认异常,消息编号:{}, messageId, e);
if(!success){
transactionMessageService.delete( messageId );
throw new RuntimeException( >>> 确认消息异常,创建订单失败 );
}
}
}
}
}

  • 插入订单表之前,首先创建预发送消息,保存到事务消息表中,此时消息状态为:未发送
  • 插入订单,如果插入订单失败则将事务消息表中预发送消息删除
  • 插入订单成功后,修改消息表预发送消息状态为发送中,并发送消息至mq
  • 如果发送消息失败,则订单回滚并删除事务消息表消息

三、message消息系统

核心代码一:

@Override
public void sendMessageToMessageQueue(String queueName,final String messageBody) {

jmsTemplate.convertAndSend( queueName,messageBody );

log.info(>>> 发送消息到mq 队列:{},消息内容:{}, queueName, messageBody);
}

  • 主要是activemq生产者讲消息发送至MQ消息中间件

核心代码二:

/**
* 定时重发消息(每分钟)
*/

@Scheduled(cron = 0 */1 * * * ?)
public void handler(){
//查询transaction_message表中已发送但未被删除的消息
List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING );
if(list!=null && list.size() > 0){
for (TransactionMessage message:list){
try {
transactionMessageService.retry( message.getMessageId() );
} catch (Exception e) {
log.warn(>>> 消息不存在,可能已经被消费,消息编号:{}, message.getMessageId());
}
}
}
}

/**
* 定时通知工作人员(每隔5分钟)
*/

@Scheduled(cron = 0 */5 * * * ?)
public void advance(){
List<Long> messages = transactionMessageService.queryDeadList();
log.warn(>>> 共有:{}条消息需要人工处理, messages.size());
String ids = JSONObject.toJSONString( messages );
//发邮件或者是发送短信通知工作人员处理

}

  • 定时重发消息
  • 定时将死亡的消息通知给工作人员,进行人工补偿操作

四、product产品系统

核心代码:

@Transactional
@JmsListener( destination = Constant.ORDER_QUEUE_NAME)
public void receiveQueue(String msg){
boolean flag = false;
Orders orders = JSONObject.parseObject( msg, Orders.class );
log.info(>>> 接收到mq消息队列,消息编号:{} ,消息内容:{}, orders.getMessageId(), msg);

TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() );
try {
//保证幂等性
if(transactionMessage!=null){
List<OrderDetail> list = orders.getList();
for(OrderDetail detail : list){
Product product = productService.findById( detail.getId() );
Long skuNum = product.getProductSku() - detail.getNum();
if(skuNum >= 0){
product.setProductSku( skuNum );
productService.update( product );
}else {
throw new Exception( >>> 库存不足,修改库存失败! );
}

}
//int i = 1 /0 ;
flag = true;
}

}catch (Exception e){
e.printStackTrace();
throw new RuntimeException( e );
}finally {
if(flag){
transactionMessageService.delete( orders.getMessageId() );
DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() );
if(dbLog!=null){
dbLog.setState( 1 );//已处理成功
dbLogService.update( dbLog );
}
log.info(>>> 业务执行成功删除消息! messageId:{}, orders.getMessageId());
}
}

}

  • 从mq消息中间件中监听并消费消息,将json消息转为订单对象
  • 根据消息编号查询该消息是否已被消费,保证幂等性
  • 如果消息未被消费(即存在此消息),则产品表扣减库存;如果已经消费(不存在此消息),则不做处理
  • 产品表扣减库存成功,则删除此消息,如果待处理消息日志表中有此消息,则更改状态为1,表示已处理;扣减失败,则不做处理

该项目源码已上传至github和码云,链接如下,希望喜欢的朋友都能给个star支持一下!谢谢~

github链接: https://github.com/wanglinyong/consis

码云链接: https://gitee.com/wanglinyong/consis

原文出处:

分布式事务(二):基于可靠消息的分布式事务

举报

相关推荐

0 条评论