关于分布式事务的解决方案被提到最多的就是“TCC”、“基于消息”。但其实还有一种非常经典的解决方案:JTA。而 Atomikos 就是 XA、JTA 那一套的实现,属于 2PC。
之前在项目中使用 Atomikos 主要是用来解决单服务多数据源的场景,当然它也可以解决跨服务之间的分布式事务问题。本文主要是从源码的角度简要分析一下 Atomikos 的实现。
#Atomikos+Spring Boot+MyBatis+Druid 的使用
在网上找了一个 Atomikos+Spring Boot+MyBatis 的使用例子,然后修改了一下,本文使用的源码地址:https://gitee.com/dongguabai/blog/tree/master/atomikos。
这里简单介绍下使用 Atomikos 时的主要配置。
多数据源配置:
spring:
datasource:
type: com.alibaba.druid.pool.xa.DruidXADataSource
druid:
...
可以看到这里使用的是 DruidXADataSource,而不是我们常使用的 DruidDataSource。我这个项目中有两个数据源:systemDB 和 businessDB。
数据源和事务管理器的配置:
/**
* @author Dongguabai
* @description atomikos配置
* @date 2021-09-23 23:07
*/
@Configuration
public class AtomikosDruidConfig {
@Bean(name = "systemDataSource")
@Primary
@Autowired
public DataSource systemDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.systemDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("systemDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Autowired
@Bean(name = "businessDataSource")
public AtomikosDataSourceBean businessDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.businessDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("businessDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
/**
* 注入事物管理器
* @return
*/
@Bean(name = "xatx")
public JtaTransactionManager regTransactionManager () {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
}
可以看到数据源使用的是 AtomikosDataSourceBean,根据 JDBC 规范,自然 AtomikosDataSourceBean 是 DataSource 的实现。事务管理器使用的是 JtaTransactionManager ,根据Spring 的实现标准,自然 JtaTransactionManager 是 PlatformTransactionManager 的实现。
使用示例:
/**
* @author Dongguabai
* @description jtaService
* @date 2021-09-22 17:25
*/
@Service
public class JtaService {
@Autowired
private TService tService;
@Autowired
private T2Service t2Service;
//@Transactional(propagation = Propagation.REQUIRED, rollbackFor = { RuntimeException.class })
@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { RuntimeException.class })
public Object test01() {
tService.insertId(1);
//t2Service.insertId2("abc");
t2Service.insertId2(1);
int i = 0/0;
return "OK";
}
}
使用也很简单,首先指定了事务管理器是配置的 JtaTransactionManager,然后在一个被 @Transactional 标注的方法中分别往两个数据源中都插入了数据,然后整个方法最后会抛出一个异常。
运行项目,可以发现当打开第 21 行的异常代码,两个数据源的事务操作都会回滚,而注释掉异常代码,那么两个数据源的事务操作都会正常提交,事务生效。
源码分析
接下来就从源码的角度对 Atomikos 进行简要的分析。而 JTA 是依赖数据库对于 XA 规范的实现来处理分布式事务的,比如我这里使用的 MySQL,那么 MySQL 就肯定会提供实现 XA 规范的类库,所以先看下 MySQL 的 XA 语法。
MySQL 的 XA 语法
MySQL XA 事务基本语法
-
XA {START|BEGIN} xid [JOIN|RESUME] 启动一个 XA 事务 (xid 必须是一个唯一值; [JOIN|RESUME] 字句不被支持) -
XA END xid [SUSPEND [FOR MIGRATE]] 结束一个XA事务 ( [SUSPEND [FOR MIGRATE]] 字句不被支持) -
XA PREPARE xid 准备 -
XA COMMIT xid [ONE PHASE] 提交XA事务 -
XA ROLLBACK xid 回滚XA事务 -
XA RECOVER 查看处于 PREPARE 阶段的所有XA事务
XA 事务状态流转过程
- 使用
XA START 启动一个 XA 事务,并把它置为 ACTIVE 状态。 - 对一个
ACTIVE XA 事务,发布构成事务的 SQL 语句,然后发布一个 XA END 语句,XA END 把事务置为 IDLE 状态。 - 对一个
IDLE XA 事务, 发布一个 XA PREPARE 语句或者一个 XA COMMIT ... ONE PHASE 语句: 前者把事务置为 PREPARE 状态,此时 XA RECOVER 语句的输出包含事务的 xid 值(XA RECOVER 语句会列出所有处于 PREPARE 状态的 XA 事务); 后者用于预备和提交事务,不会被 XA RECOVER 列出,因为事务已经终止。 - 对一个
PREPARE XA 事务,根据执行情况,可以发布一个 XA COMMIT 语句来提交和终止事务,或者发布一个 XA ROLLBACK 来回滚并终止事务。
可以看到这是一个典型的 2PC 的过程。
源码分析
首先会进入 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction 方法:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
...
if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
...
} else {
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
result = null;
try {
result = invocation.proceedWithInvocation();
} catch (Throwable var16) {
this.completeTransactionAfterThrowing(txInfo, var16);
throw var16;
} finally {
this.cleanupTransactionInfo(txInfo);
}
this.commitTransactionAfterReturning(txInfo);
return result;
}
}又会调用 org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary 方法去创建事务:
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//这里的tm就是我们配置的JtaTransactionManager
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}然后又会调用 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction 方法:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
...
try {
boolean newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启事务
this.doBegin(transaction, (TransactionDefinition)definition);
this.prepareSynchronization(status, (TransactionDefinition)definition);
return status;
} catch (RuntimeException var7) {
this.resume((Object)null, suspendedResources);
throw var7;
} catch (Error var8) {
this.resume((Object)null, suspendedResources);
throw var8;
}
}
}而在这个方法中会调用 org.springframework.transaction.jta.JtaTransactionManager#doBegin 方法开启事务:
protected void doBegin(Object transaction, TransactionDefinition definition) {
JtaTransactionObject txObject = (JtaTransactionObject)transaction;
try {
this.doJtaBegin(txObject, definition);
}
...
}会调用到 com.atomikos.icatch.jta.TransactionManagerImp#begin(int) 方法,然后在 com.atomikos.icatch.imp.BaseTransactionManager#createCompositeTransaction 中获取 CompositeTransaction:
public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
{
Stack errors = new Stack();
CompositeTransaction ct = null , ret = null;
//获取CompositeTransaction
ct = getCurrentTx ();
if ( ct == null ) {
ret = service_.createCompositeTransaction ( timeout );
if(LOGGER.isInfoEnabled()){
LOGGER.logInfo("createCompositeTransaction ( " + timeout + " ): "
+ "created new ROOT transaction with id " + ret.getTid ());
}
} else {
if(LOGGER.isInfoEnabled()) LOGGER.logInfo("createCompositeTransaction ( " + timeout + " )");
ret = ct.getTransactionControl ().createSubTransaction ();
}
Thread thread = Thread.currentThread ();
setThreadMappings ( ret, thread );
return ret;
}
而 CompositeTransaction 实际上是被存到了一个全局 Map 中,key 是当前线程:
private Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ = null;
这里的 CompositeTransaction 就是一个分布式事务对象。
同时也有日志打印出来:
01:12:22.268 logback [http-nio-8080-exec-1] DEBUG c.a.icatch.imp.TransactionServiceImp - Creating composite transaction: 127.0.0.1.tm0000100007
01:12:23.458 logback [http-nio-8080-exec-1] INFO c.a.i.imp.BaseTransactionManager - createCompositeTransaction ( 10000 ): created new ROOT transaction with id 127.0.0.1.tm0000100007
但是到这里还是没有找到关键的 Connection,代码接着往下走,直到执行对数据库的事务操作代码:
tService.insertId(1);
先抛开分布式事务不谈,如果我们的项目使用了 MyBatis+Druid 的话,肯定有这么一个大致流程:Mapper 通过 SqlSessionFactory 从 Druid 的 DataSource 中通过获取 Connection。
代码接着会走到 com.atomikos.jdbc.AbstractDataSourceBean#getConnection() 方法,又会调用到 com.atomikos.jdbc.AtomikosXAPooledConnection#doCreateConnectionProxy 方法:
protected Reapable doCreateConnectionProxy ( HeuristicMessage hmsg ) throws CreateConnectionException
{
if ( LOGGER.isDebugEnabled() ) LOGGER.logDebug ( this + ": creating connection proxy..." );
JdbcConnectionProxyHelper.setIsolationLevel ( connection , getDefaultIsolationLevel() );
return AtomikosConnectionProxy.newInstance ( connection , sessionHandleState , hmsg );
}
发现是用 JDK 动态代理构造了 com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl 对象(是 Reapable 的实现),它里面就封装了数据库的 Connection(即通过动态代理扩展了 Connection 的行为,从而与 MySQL 的 XA 指令结合起来,可以通过 com.atomikos.jdbc.AtomikosConnectionProxy#invoke 方法查看)。
代码执行到这里,已经获得了 Connection,根据 MySQL 对 XA 的支持,可以猜想在执行事务操作之前肯定要执行 START。
代码继续执行,走到 com.atomikos.jdbc.AtomikosConnectionProxy#invoke 方法:
public Object invoke ( Object proxy, Method method, Object[] args ) throws SQLException
{
//这里的methodName就是getAutoCommit
final String methodName = method.getName();
boolean jtaTxFound = false;
if (CLOSE_METHODS.contains(methodName) && args == null ) {
...
}
else {
try {
if ( LOGGER.isInfoEnabled() ) LOGGER.logInfo ( this + ": calling " + formatCallDetails(method,args) + "...");
//methodName就是getAutoCommit,这里返回的true
ret = method.invoke(delegate, args);
} catch (Exception ex) {
sessionHandleState.notifySessionErrorOccurred();
JdbcConnectionProxyHelper.convertProxyError ( ex , "Error delegating '" + methodName + "' call" );
}
}
...
return ret;
}
代码接着往下走,又会走到 com.atomikos.jdbc.AtomikosConnectionProxy#invoke 方法,此时 methodName 是 prepareStatement。然后会进入 com.atomikos.jdbc.AtomikosConnectionProxy#enlist 方法,断点一步步往下执行,再观察控制台打印出来的日志:

可以发现当执行了 com.atomikos.datasource.xa.session.SessionHandleState#notifyBeforeUse 方法的时候,会触发 START 指令(START 指令是在 com.atomikos.datasource.xa.XAResourceTransaction#resume 方法中执行的)。
02:04:51.231 logback [http-nio-8080-exec-3] INFO c.a.d.xa.XAResourceTransaction - XAResource.start ( 3132372E302E302E312E746D30303030333030303037:3132372E302E302E312E746D35 , XAResource.TMNOFLAGS ) on resource systemDB represented by XAResource instance com.mysql.jdbc.jdbc2.optional.MysqlXAConnection@6e001cdc
接下来会走到 com.atomikos.datasource.xa.SiblingMapper#findOrCreateBranchForTransaction 方法:

这里可以看到 Atomikos 在实现分布式事务的时候,CompositeTansaction 代表一个分布式事务,分布式事务会由多个 XAResourceTransaction 子事务组成。
02:26:20.288 logback [http-nio-8080-exec-4] INFO c.a.i.imp.CompositeTransactionImp - addParticipant ( XAResourceTransaction: 3132372E302E302E312E746D30303030343030303037:3132372E302E302E312E746D37 ) for transaction 127.0.0.1.tm0000400007
接下来当要执行下一个库的事务操作:
t2Service.insertId2(1);
又会把上一个库中事务的 相关操作再走一遍:
02:28:23.831 logback [http-nio-8080-exec-4] INFO c.a.i.imp.CompositeTransactionImp - addParticipant ( XAResourceTransaction: 3132372E302E302E312E746D30303030343030303037:3132372E302E302E312E746D38 ) for transaction 127.0.0.1.tm0000400007
02:28:26.274 logback [http-nio-8080-exec-4] DEBUG c.a.d.xa.XAResourceTransaction - XAResourceTransaction: 3132372E302E302E312E746D30303030343030303037:3132372E302E302E312E746D38: about to switch to XAResource com.mysql.jdbc.jdbc2.optional.MysqlXAConnection@79631853
02:28:26.281 logback [http-nio-8080-exec-4] DEBUG c.a.d.xa.XAResourceTransaction - XAResourceTransaction 127.0.0.1.tm0000400007127.0.0.1.tm8: switched to XAResource com.mysql.jdbc.jdbc2.optional.MysqlXAConnection@79631853
02:28:27.176 logback [http-nio-8080-exec-4] INFO c.a.d.xa.XAResourceTransaction - XAResource.start ( 3132372E302E302E312E746D30303030343030303037:3132372E302E302E312E746D38 , XAResource.TMNOFLAGS ) on resource businessDB represented by XAResource instance com.mysql.jdbc.jdbc2.optional.MysqlXAConnection@79631853
也就是对每个数据源连接都会执行 START 指令。
接下来会走到异常代码:
int i = 0/0;
此时会进入org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback 方法,又会执行到 com.atomikos.datasource.xa.session.SessionHandleState#notifySessionClosed 方法:

可以看到这里会挨个将事务进行结束,这里为啥只有一个呢,明明应该有两个。这是因为 allContexts 是一个 SessionHandleState 的:
public class SessionHandleState
{
private TransactionContext currentContext;
private Set<TransactionContext> allContexts;
}
而一个 SessionHandleState 与一个 AtomikosXAPooledConnection 对应:
public class AtomikosXAPooledConnection extends AbstractXPooledConnection
{
private SessionHandleState sessionHandleState;
}
在执行了 com.atomikos.datasource.xa.session.BranchEnlistedStateHandler#sessionClosed 方法后,可以发现打印出了执行 END 指令的日志:
03:03:28.645 logback [http-nio-8080-exec-5] INFO c.a.d.xa.XAResourceTransaction - XAResource.end ( 3132372E302E302E312E746D30303030353030303038:3132372E302E302E312E746D39 , XAResource.TMSUCCESS ) on resource systemDB represented by XAResource instance com.mysql.jdbc.jdbc2.optional.MysqlXAConnection@24a1647e
当然 com.atomikos.datasource.xa.session.SessionHandleState#notifySessionClosed 也会被执行两次,因为每一个事务都要关闭。
正常理解应该是在 com.atomikos.icatch.imp.ActiveStateHandler#prepare 方法中会去校验所有事务操作的的 prepare 执行结果(只要有一个失败就回滚),从而选择是 commit 还是 rollback,但是当异常回滚的时候我发现程序并未断点调用到这个方法,不过这块先不纠结了。
到这里其实后面的源码都没必要继续跟了,首先整个分布式事务的状态流转都是与 MySQL XA 事务的指令执行一致的。核心方法都在 XAResourceTransaction 中,指令的执行都在 XAResource 中,方法命令也与 XA 指令一致,如 START 指令就对应 javax.transaction.xa.XAResource#start 方法;END 指令就对应 javax.transaction.xa.XAResource#end 方法。所有指令的流转都可以在日志中通过 XAResource.* 查看,整体流程还是比较清晰的。
我这里调试的结果:
分布式事务提交成功指令状态流转:
XAResource.start->XAResource.end->XAResource.prepare->XAResource.commit
分布式事务异常回滚指令状态流转:
XAResource.start->XAResource.end->XAResource.rollback
总结
- Atomikos 在 DTP 模型中属于 TM,承接 AP 事务交互,同时通过 MySQL 对 XA 的支持协调各个 RM;
- Atomikos 更适合单服务多数据源的场景,虽然我们可以将其扩展成支持解决跨服务的分布式事务问题,但是扩展过程中分布式这块都得我们自己实现,比较复杂;
- Atomikos 性能不太好,也有单机问题,没有日志追溯,这也是 2PC 的典型问题;
- Atomikos 通过动态代理的方式扩展了
Connection 的行为,从而与 MySQL 的 XA 指令结合起来;
References
- https://dev.mysql.com/doc/refman/5.7/en/xa-statements.html











