seata源码解析<1>

阅读 70

2022-05-03

在配置seata 服务的时候 需要配置一些db(表)

在这里插入图片描述

并且在 对应的业务库中加入 undo_log  这个表  
然后开启seata 服务
在发起调用方配置个@GlobalTransactional  这个注解  seata-server 就可以帮我们做到分布式事务的管理
呢么他是怎么做到的呢

两阶段提交的第一个阶段是 只执行 不提交事务,但是seata 的核心是既提交又回滚

还是这个图,比如说我user 服务 事务提交成功了  order 服务事务提交失败了 我该怎么做到全局事务的
回滚操作呢


在这里插入图片描述

比方说我user 执行
update user set name ='zhangsan' where userId=1  这是我此时的业务代码

#他在执行之前会查询一次 ---->前置快照
select  * from user  where userId=1  ---> 查询结果 name = lisi  
#业务sql ------------>业务sql
update user set name ='zhangsan' where userId=1  
#他在执行之后会查询一次 ---->后置快照
select  * from user  where userId=1   查询结果 name = zhangsan

他会在执行业务sql 之前查询一次当前的sql 生成前置快照 ,并且在执行业务sql 之后再查询一次生成后置快照  ,然后再吧前置快照和后置快照保存在seata的undo.log 表中
回头要是出现异常了 我再根据前置快照和后置快照 生成反向sql 再吧数据改回来
 
  try {

            // 1. begin transaction  开启事务
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {
			// 执行业务
                 rs = business.execute();
			} catch (Throwable ex) {
			 // 3. 业务异常  回滚 事务
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }
					 // 3. 业务z正常提交事务
            commitTransaction(tx);
			  return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
我们学习事务的时候就是
1,开启事务
2.执行业务
3.业务执行成功提交事务
4.业务执行异常回滚事务
    /**
     * 保存
     */
    @RequestMapping("/save")
    
    @GlobalTransactional
         public R save(){
        String id = String.format("%04d", new Random().nextInt(9999));
        UserEntity userEntity = new UserEntity();
        Integer integer = Integer.valueOf(id);
        userEntity.setAge(integer);
        userEntity.setUsername(id);
        userService.save(userEntity);
        OrderEntity orderEntity = new OrderEntity();
         orderEntity.setUserId(id);
        orderEntity.setCommodityCode(id);
        // 模拟调用其他服务进行保存 因为此时 我没有其他微服务
        orderFeignService.saveFirst(orderEntity);
        orderFeignService.save(orderEntity);

        return R.ok();
    }
    还是上篇文章的这段代码  user服务调用Order 服务
    

在这里插入图片描述

seata是一个单独的项目或者说是单独的包 
1.我们要开启全局事务 的告诉seata 服务端开启全局事务
2.执行业务这里的业务不就是我们对应的sql 么
3,执行业务sql 如果没有异常 就全局提交
4,执行业务sql 如果有异常的话 就全局回滚

我们想一下seata的功能,或者基于功能大致想一下seata 的接口文档 
1.接受请求 开启全局事务的功能
2.接受分支事务注册
2.接受分支事务上报事务执行情况(成功/失败 )
3.通知或者下发 (我的通知你们是全局提交或者全局回滚)
顺序


在这里插入图片描述

所以基于seata 无非就是这些功能

在这里插入图片描述
在这里插入图片描述

我们seata加入个pom文件 加入个GlobalTransactional  注解放在发起方 就可以用了

我们看下他这个pom的spring.factory中的GlobalTransactionAutoConfiguration

这里通过@Bean 往容器中放了一个 GlobalTransactionScanner

他这个GlobalTransactionScanner  实现了AbstractAutoProxyCreator,InitializingBean 这两个接口(基于spring生命周期的接口我们以后的记住 )
AbstractAutoProxyCreator这个 实现了后置处理器 他是spring的aop和事务的顶层抽象父类

AOP 是通过AbstractAutoProxyCreator.postProcessAfterInitialization 这个方法生成的代理类

io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary
所以当我们创建这个bean的时候就会走到这个方法中 他大概就是说如果我们的bean的方法上面有GlobalTransactional
这个注解的时候  他创建这个bean 的时候会植入一个拦截器
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

在这里插入图片描述

当我们 localhost:8080/save 的时候 他这个拦截器 就像aop 一样对我们的方法进行增强,
具体增强的逻辑就在这个invoke方法上	
/ 如果有这个注解的话
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);

在这里插入图片描述

我们点进去
io.seata.tm.api.TransactionalTemplate#execute



    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction  生成一个对象
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction  开启全局事务
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

这个不就是执行事务的执行流程吗
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            // 我们主要看这里 开启全局事务
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }
开启全局事务的相关方法
  @Override
    public void begin(int timeout, String name) throws TransactionException {
       表明当前是全局事务的发起者(Launcher)还是参与者(Participant)
        if (role != GlobalTransactionRole.Launcher) {
            check();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        // 开启全局事务  然后 生成一个xid  这里是通过netty 发送的数据
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [" + xid + "]");
        }

    }
我们看一下服务端接受到请求会怎么处理

在这里插入图片描述

他的main 方法中我们就知道 这个类可以处理rpc信息
 DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); 

在这里插入图片描述

当一个请求发过来的时候 我们可以看看他服务端是怎么处理的

在这里插入图片描述我们看看这里
在这里插入图片描述

  @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 基于条件 --->创建一个对象
        GlobalSession session = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);
        // 添加一个监听器 为什么SessionManager
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 开启全局事务
        session.begin();

        //transaction start event
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));

        LOGGER.info("Successfully begin global transaction xid = {}", session.getXid());
        return session.getXid();
    }

1. 这里基于条件创建了一个对象
2. 这里添加了一个监听器 我们就得明白可能是事件监听模式(我们此时就是db模式)


    @Override
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;

        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
         	//这里得lifecycleListener  就是一步得DataBaseSessionManager
            lifecycleListener.onBegin(this);
        }
    }
io.seata.core.store.db.LogStoreDataBaseDAO#insertGlobalTransactionDO
我们一直追到这里 这里不就是给表中进行insert 么   这里得insert 就是插入到了 global_table
这个表中


也就是说 当我们客户端 向服务端发起[开启全局事务] 得请求得时候
服务端会给[global_table]  后面会有定时任务来删除这条数据的  默认是60s如果不删除会影响性能的
 表中记录一条数据 并且返回xid


在这里插入图片描述

我们再回到客户端的这个方法 因为这个方法是实现了InitializingBean 这个接口
这里主要初始化了
TM和RM 
这个TM和RM 是发请求用的  也就是和seata 交互用的

也就是说这个GlobalTransactionScanner 实现了 2个生命周期有关的接口,
一个是InitializingBean,---------------->初始化了2个客户端和seata-server交互
一个是AbstractAutoProxyCreator------------>给我们的类中植入了一个拦截器
当我们运行 localhost 的时候 他的invoke 就会生效
当我们开启了全局事务的话 我们开始注册分支事务
当我们执行业务sql的时候,不管任何ORM框架,底层对数据库的操作都封装了jdbc


在这里插入图片描述

我们来看看这里的执行方法
    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
            @Override
            public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.execute();
            }
        });
    }


精彩评论(0)

0 0 举报