0
点赞
收藏
分享

微信扫一扫

深入理解RocketMQ事务源码--TransactionMQProducer、TransactionListener

Gascognya 2022-02-17 阅读 14

前言
上篇文章介绍了RocketMQ支持的事务消息类型,它常用于分布式系统中,以保证数据的一致性。接下来一起走进 TransactionMQProducer、TransactionListener 这两个核心类。

一.基于apache-rocketmq事务消息的实现
(1) pom.xml配置
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <!--lombok插件 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
 
        <!--RocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
 

(2)事务消息生产者配置
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.stereotype.Component;
 
@Component
public class PayProducer {
    /**
     * 生产组,消费者通过订阅发布,在同一个组内
     */
    private String producerGroup = "pay_group";
    /**
     * 端口
     */
    private String nameServer = "127.0.0.1:9876";
    //事务监听,其中rocketMQLisenter实现TransactionListener接口
    private RocketMQLisenter rocketMQLisenter;
    //事务消息生产者配置
    private TransactionMQProducer producer;
 
    public PayProducer() {
        producer = new TransactionMQProducer(producerGroup);
        rocketMQLisenter = new RocketMQLisenter();
        // 指定nameServer地址,多个地址之间以 ; 隔开
        producer.setNamesrvAddr(nameServer);
        producer.setTransactionListener(rocketMQLisenter);
        start();
    }
 
    public TransactionMQProducer getProducer() {
        return producer;
    }
 
    /**
     * 对象在使用之前必须调用一次,并且只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        producer.shutdown();
    }
 
}
(3)事务监听器接口实现
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
 
@Slf4j
public class RocketMQLisenter implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事务开始执行=============");
        String message = new String(msg.getBody());
        int type = 0;
        // 事务消息id,唯一 
        String tid = msg.getTransactionId(); 
        //todo 为了解决重复消息《==》幂等性,可以将tid放入redis,并设置状态,消费端,每次从redis中获取事务id,根据状态并判断是否被消费过
        //模拟执行本地事务begin=======
        /**
         * 本地事务执行会有三种可能
         * 1、commit 成功
         * 2、Rollback 失败
         * 3、网络等原因服务宕机收不到返回结果
         */
        log.info("本地事务执行参数----------------------");
        //模拟执行本地事务end========
        //TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回
        //1、二次确认消息,然后消费者可以消费
        if (type == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        //2、回滚消息,Broker端会删除半消息
        if (type == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //3、Broker端会进行回查消息
        if (type == 2) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
 
    /**
     * 当executeLocalTransaction ,返回LocalTransactionState.UNKNOW 调用此方法
     * @param messageExt
     * @return
     */
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("==========回查接口=========");
        String key = messageExt.getKeys();
        //TODO 1、必须根据key先去检查本地事务消息是否完成。
        /**
         * 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候
         * 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。
         * 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。
         */
        // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
(4)消费者配置
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
 
@Component
public class PayConsumer {
 
    private DefaultMQPushConsumer consumer;
 
    private String consumerGroup = "pay_group";
 
    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(Topic.NAME_SERVER);
        // 设置消费地点,从最后一个进行消费(其实就是消费策略)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅主题的哪些Topic,及标签
        consumer.subscribe("order_topic", "*");
        // 注册监听器
        consumer.registerMessageListener((MessageListenerConcurrently)
                (msgs, context) -> {
                    try {
                        // 获取Message
                        Message msg = msgs.get(0);
                        //获取事务id,并从redis获取事务的状态,判断是否消费过。
                        String tid = msg.getTransactionId();
                        System.out.printf("%s Receive New Messages: %s %n",
                                Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                        String topic = msg.getTopic();
                        String body = new String(msg.getBody(), "utf-8");
                        // 标签
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                });
        consumer.start();
        System.out.println("Consumer Listener");
    }
 
}
(5)通过生产者发送事务消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class PayController {
    @Autowired
    private PayProducer payProducer;
    private static final String topic = "order_topic";
 
    @GetMapping("/pay")
    @ResponseBody
    public void callback(int type) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // 创建消息  主题   二级分类   消息内容好的字节数组
        Message message = new Message(topic, "taga", ("rocketMQ").getBytes());
        //调用TransactionMQProducer的sendMessageInTransaction事务方法,并获取发送结果
        SendResult send = payProducer.getProducer().sendMessageInTransaction(message,type);
    }
 
}
二.TransactionMQProducer、TransactionListener源码阅读
1.TransactionMQProducer,可以看出它继承自DefaultMQProducer类,核心方法是sendMessageInTransaction(Message msg, Object arg)

public class TransactionMQProducer extends DefaultMQProducer {
    private TransactionListener transactionListener;
    private ExecutorService executorService;
 
    public TransactionMQProducer() {
    }
 
    public TransactionMQProducer(String producerGroup) {
        super(producerGroup);
    }
 
    public TransactionMQProducer(String producerGroup, RPCHook rpcHook) {
        super(producerGroup, rpcHook);
    }
 
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.initTransactionEnv();
        super.start();
    }
 
    public void shutdown() {
        super.shutdown();
        this.defaultMQProducerImpl.destroyTransactionEnv();
    }
 
    public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", (Throwable)null);
        } else {
            return this.defaultMQProducerImpl.sendMessageInTransaction(msg, this.transactionListener, arg);
        }
    }
 
    public TransactionListener getTransactionListener() {
        return this.transactionListener;
    }
 
    public void setTransactionListener(TransactionListener transactionListener) {
        this.transactionListener = transactionListener;
    }
 
    public ExecutorService getExecutorService() {
        return this.executorService;
    }
 
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }
}
2.DefaultMQProducerImpl.sendMessageInTransaction方法的具体实现:

  (1)可以看出,先把消息发送到broker并获取发送状态(此时的消息可以理解成半消息,等待本地事务执行结果后再执行commit或者rollback);发送状态为FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT或SLAVE_NOT_AVAILABLE时,将本地事务状态设置为LocalTransactionState.ROLLBACK_MESSAGE;发送成功(SEND_OK),则获取消息的事务ID(每个消息都会有一个transactionId)

  (2)接着调用TransactionListener.executeLocalTransaction方法(上面已经粘贴了事务监听器的实现代码),执行本地事务。

/**
 * 本地事务执行会有三种可能
 * 1、commit 成功 ,对应 return LocalTransactionState.COMMIT_MESSAGE;
 * 2、Rollback 失败,对应 return LocalTransactionState.ROLLBACK_MESSAGE;
 * 3、网络等原因服务宕机收不到返回结果,对应return LocalTransactionState.UNKNOW;
 */
(3)调用this.endTransaction(sendResult, localTransactionState, localException)方法,将本地事务执行的状态、消息发送的状态、异常信息,一起发送给broker,broker会根据提交的信息,决定是投递消息、删除消息,或者回查本地事务。

更多详细信息,请查看事务消息

public TransactionSendResult sendMessageInTransaction(Message msg, TransactionListener tranExecuter, Object arg) throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
 
            try {
                sendResult = this.send(msg);
            } catch (Exception var10) {
                throw new MQClientException("send message Exception", var10);
            }
 
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch(sendResult.getSendStatus()) {
            case SEND_OK:
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
 
                    String transactionId = msg.getProperty("UNIQ_KEY");
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
 
                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
 
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (Throwable var9) {
                    this.log.info("executeLocalTransactionBranch exception", var9);
                    this.log.info(msg.toString());
                    localException = var9;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }
 
            try {
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception var8) {
                this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
            }
 
            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    }
 
  //将本地事务执行情况、消息发送状态及异常信息一起发送给broker,broker根据提交的信息,决定是投递消息、删除消息,或者回查本地事务
 
    public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
 
        String transactionId = sendResult.getTransactionId();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch(localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(8);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(12);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(0);
        }
 
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
    }
 
————————————————
转自:https://blog.csdn.net/y532798113/article/details/111285548

举报

相关推荐

0 条评论