【Spring 源码深度解析】09 事务

阅读 64

2022-03-11

文章目录

1 JDBC 方式下的事务使用示例

1)创建数据表结构

CREATE TABLE user ( 
	id int(11) NOT NULL auto increment,
	name varchar (255) default NULL , 
	age int ( 11 ) default NULL , 
	sex varchar (255 ) default NULL , 
	PRIMARY KEY (id) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2)创建对应的 PO

public class User {

	private int id;
	private String name;
	private int age;
	private String sex;

	// 省略 getter 和 setter 方法
}

3)创建表与实体间的映射

public class UserRowMapper implements RowMapper<User> {

	@Override
	public User mapRow(ResultSet rs, int rowNum) throws SQLException {
		User person = new User(rs.getInt("id"), rs.getString("name"),
				rs.getInt("age"), rs.getString("sex"));
		return person;
	}
}

4)创建数据操作接口

public interface UserService {

	void save(User user);
}

5)创建数据操作接口实现类

public class UserServiceImpl implements UserService {

	private JdbcTemplate jdbcTemplate;

	/**
	 * 设置数据源
	 */
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

	@Override
	public void save(User user) {
		dbcTemplate.update(
			"INSERT INTO `user` (`name`, age, sex) VALUES (?, ? ,?)",
			new Object[]{user.getName(), user.getAge(), user.getSex()}, 
			new int[]{Types.VARCHAR, Types.INTEGER, Types.VARCHAR});

		// 事务测试,加上这句代码则数据不会保存到数据库中
		throw new RuntimeException("aa");
	}
}

6)创建 Spring 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:tx="http://www.springframework.org/schema/tx"
	   xsi:schemaLocation="
	   http://www.springframework.org/schema/beans
	   http://www.springframework.org/schema/beans/spring-beans.xsd
	   http://www.springframework.org/schema/tx
	   http://www.springframework.org/schema/tool/spring-tx.xsd">

	<tx:annotation-driven transaction-manager="transactionManager" />
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- 配置数据源-->
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
		<property name="driverClass" value="com.mysql.jdbc.Driver" />
		<property name="jdbcUrl" value="jdbc:mysql://localhost:3306/study" />
		<property name="user" value="root" />
		<property name="password" value="123456" />
		<!-- 连接池启动的初始值-->
		<property name="initialPoolSize" value="1" />
		<!-- 连接池最大值-->
		<property name="maxPoolSize" value="300" />
		<!-- 连接池最小值-->
		<property name="minPoolSize" value="1" />
	</bean>

	<!-- 配置业 bean-->
	<bean id="userService" class="org.springframework.test.tx.UserServiceImpl">
		<property name="dataSource" ref="dataSource" />
	</bean>
</beans>

7)测试

public class Main {

	public static void main(String[] args) {
		ClassPathXmlApplicationContext factory = new ClassPathXmlApplicationContext("test/tx/bean.xml");
		UserService userService = (UserService) factory.getBean("userService");

		User user = new User();
		user.setName("李四");
		user.setAge(20);
		user.setSex("男");
		userService.save(user);
	}
}

默认情况下 Spring 中的事务处理只对 RuntimeException 方法进行回滚。如果抛出异常,这段保存的代码不会生效。

2 事务自定义标签

在 Spring 配置文件中的<tx:annotation-driven />配置是事务的开关,如果没有此处配置,那么 Spring 中将不存在事务的功能,需要从这个配置开始分析。

关键是TxNamespaceHandler#init方法。

public void init() {
	registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
	registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
	registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}

根据自定义标签的使用规则及上面的代码,可以知道,在遇到诸如<tx:annotation-driven />为开头的配置后,Spring 都会使用AnnotationDrivenBeanDefinitionParser类的parse方法进行解析。

public BeanDefinition parse(Element element, ParserContext parserContext) {
	registerTransactionalEventListenerFactory(parserContext);
	String mode = element.getAttribute("mode");
	// 如果使用 AspectJ
	if ("aspectj".equals(mode)) {
		// mode="aspectj"
		registerTransactionAspect(element, parserContext);
		if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {
			registerJtaTransactionAspect(element, parserContext);
		}
	}
	else {
		// mode="proxy"
		AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
	}
	return null;
}	

在解析中存在对于 mode 属性的判断,根据代码需要使用 AspectJ 方式进行事务切入(Spring 中的书屋是以 AOP 为基础的),那么可以使用这个的配置:

<tx:annotation-driven transaction-manager="transactionManager" mode="aspectj"/>

2.1 注册 InfrastructureAdvisorAutoProxyCreator

以默认配置为例子进行分析,进入AopAutoProxyConfigurer#configureAutoProxyCreator的方法进行解析:

public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
	// 注册 InfrastructureAdvisorAutoProxyCreator
	AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

	// 事务增强器的名称 org.springframework.transaction.config.internalTransactionAdvisor
	String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
	if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
		Object eleSource = parserContext.extractSource(element);

		// Create the TransactionAttributeSource definition.
		// 创建 TransactionAttributeSource 的 bean
		RootBeanDefinition sourceDef = new RootBeanDefinition(
				"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
		sourceDef.setSource(eleSource);
		sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);

		// Create the TransactionInterceptor definition.
		// 创建 TransactionInterceptor 的 bean
		RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
		interceptorDef.setSource(eleSource);
		interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		registerTransactionManager(element, interceptorDef);
		interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
		String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);

		// Create the TransactionAttributeSourceAdvisor definition.
		// 创建 BeanFactoryTransactionAttributeSourceAdvisor 的 bean
		RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
		advisorDef.setSource(eleSource);
		advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		// 将 sourceName 的 bean 注入 advisorDef 的 transactionAttributeSource 属性中
		advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
		// 将 interceptorName 的 bean 注入 advisorDef 的 adviceBeanName 属性中
		advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
		// 如果配置了 order 属性
		if (element.hasAttribute("order")) {
			advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
		}
		//注册 beanDefinition
		parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);

		// 创建 CompositeComponentDefinition
		CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
		compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
		compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
		compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
		parserContext.registerComponent(compositeDef);
	}
}

以上代码注册了 3 个 bean,TransactionAttributeSourceTransactionInterceptorBeanFactoryTransactionAttributeSourceAdvisor。这 3 个 bean 支撑了整个的事务功能,那么这 3 个 bean 是怎么组织起来的呢?

其中TransactionAttributeSourceTransactionInterceptor作为属性注入到一个名为advisorDef的 bean 中,advisorDef使用BeanFactoryTransactionAttributeSourceAdvisor作为其 class 属性。也就是说BeanFactoryTransactionAttributeSourceAdvisor代表这当前 bean,如下图所示。.
在这里插入图片描述

还有上述方法的第一句代码注册了一个 InfrastructureAdvisorAutoProxyCreator 类型的 bean。

AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

进入该函数。

/**
 * 注册 InfrastructureAdvisorAutoProxyCreator
 */
public static void registerAutoProxyCreatorIfNecessary(
		ParserContext parserContext, Element sourceElement) {

	BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
			parserContext.getRegistry(), parserContext.extractSource(sourceElement));
	useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
	registerComponentIfNecessary(beanDefinition, parserContext);
}

public static BeanDefinition registerAutoProxyCreatorIfNecessary(
	BeanDefinitionRegistry registry, @Nullable Object source) {

	return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

查看InfrastructureAdvisorAutoProxyCreator的类层次结构。
在这里插入图片描述
从该结构中可以看出其间接实现了 BeanPostProcessor 接口,所有所有 bean 实例化都会调用其postProcessAfterInitialization方法,在其父类AbstractAutoProxyCreator中实现。

public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
	if (bean != null) {
		// 根据给定的 bean 的 class 和 name 构建出一个 key, 格式 beanClassName_beanName
		Object cacheKey = getCacheKey(bean.getClass(), beanName);
		if (this.earlyProxyReferences.remove(cacheKey) != bean) {
			// 如果它合适被代理,就需要封装指定 bean
			return wrapIfNecessary(bean, beanName, cacheKey);
		}
	}
	return bean;
}

这里实现的主要目的是对指定 bean 进行封装,检测以及封装的工作委托给 wrapIfNecessary 方法实现。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
	// 如果已经处理过
	if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
		return bean;
	}
	// 无须增强
	if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
		return bean;
	}
	// 给定的 bean 类是否代表一个基础设施类,基础设施类不应代理,或者配置了指定 bean 不需要自动代理
	if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
		this.advisedBeans.put(cacheKey, Boolean.FALSE);
		return bean;
	}

	//EmbeddedValueResolverAware
	// Create proxy if we have advice.
	// 如果存在增强方法则创建代理
	// Advisor @Before()等
	Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
	// 如果获取到了增强则需要针对增强类创建代理
	if (specificInterceptors != DO_NOT_PROXY) {
		this.advisedBeans.put(cacheKey, Boolean.TRUE);
		// 创建代理
		Object proxy = createProxy(
				bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
		this.proxyTypes.put(cacheKey, proxy.getClass());
		return proxy;
	}

	this.advisedBeans.put(cacheKey, Boolean.FALSE);
	return bean;
}

上述方法的主要逻辑为:

  • 找出指定 bean 对应的增强器。
  • 根据找出的增强器创建代理。

2.2 获取对应 class/method 的增强器

获取对应的增强器,即在getAdvicesAndAdvisorsForBean方法中,不但要找出增强器,还需要判断增强器是否满足要求。

// AbstractAdvisorAutoProxyCreator.java
protected Object[] getAdvicesAndAdvisorsForBean(
	Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {
	// 查找合适的增强器
	List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
	if (advisors.isEmpty()) {
		return DO_NOT_PROXY;
	}
	return advisors.toArray();
}

protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
	// 获取所有的候选增强器
	List<Advisor> candidateAdvisors = findCandidateAdvisors();
	// 筛选出合适当前 bean 的 advisors
	List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
	extendAdvisors(eligibleAdvisors);
	if (!eligibleAdvisors.isEmpty()) {
		eligibleAdvisors = sortAdvisors(eligibleAdvisors);
	}
	return eligibleAdvisors;
}

上述方法,Spring 将任务进行了拆分,分成了获取所有增强器与增强器是否匹配两个功能点。

1)寻找候选增强器
方法findCandidateAdvisors完成了该功能。

// AbstractAdvisorAutoProxyCreator.java
protected List<Advisor> findCandidateAdvisors() {
	Assert.state(this.advisorRetrievalHelper != null, "No BeanFactoryAdvisorRetrievalHelper available");
	return this.advisorRetrievalHelper.findAdvisorBeans();
}

// BeanFactoryAdvisorRetrievalHelper.java
public List<Advisor> findAdvisorBeans() {
	// Determine list of advisor bean names, if not cached already.
	String[] advisorNames = this.cachedAdvisorBeanNames;
	if (advisorNames == null) {
		// Do not initialize FactoryBeans here: We need to leave all regular beans
		// uninitialized to let the auto-proxy creator apply to them!
		// 获取所有对应 Advisor.class 的类,因为前面注册的 BeanFactoryTransactionAttributeSourceAdvisor
		// 也实现了 Advisor 接口,在获取所有增强器时也会将此 bean 提取出来,随其他增强器一起在后续的步骤中被织入
		advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
				this.beanFactory, Advisor.class, true, false);
		this.cachedAdvisorBeanNames = advisorNames;
	}
	if (advisorNames.length == 0) {
		return new ArrayList<>();
	}

	List<Advisor> advisors = new ArrayList<>();
	for (String name : advisorNames) {
		if (isEligibleBean(name)) {
			if (this.beanFactory.isCurrentlyInCreation(name)) {
				if (logger.isTraceEnabled()) {
					logger.trace("Skipping currently created advisor '" + name + "'");
				}
			}
			else {
				try {
					advisors.add(this.beanFactory.getBean(name, Advisor.class));
				}
				catch (BeanCreationException ex) {
					Throwable rootCause = ex.getMostSpecificCause();
					if (rootCause instanceof BeanCurrentlyInCreationException) {
						BeanCreationException bce = (BeanCreationException) rootCause;
						String bceBeanName = bce.getBeanName();
						if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
							if (logger.isTraceEnabled()) {
								logger.trace("Skipping advisor '" + name +
										"' with dependency on currently created bean: " + ex.getMessage());
							}
							// Ignore: indicates a reference back to the bean we're trying to advise.
							// We want to find advisors other than the currently created bean itself.
							continue;
						}
					}
					throw ex;
				}
			}
		}
	}
	return advisors;
}

上述方法首先通过BeanFactoryUtils#beanNamesForTypeIncludingAncestors方法获取所有对应的Advisor.class类的 bean 名称。

public static String[] beanNamesForTypeIncludingAncestors(
			ListableBeanFactory lbf, Class<?> type, boolean includeNonSingletons, boolean allowEagerInit);

当得知增强器在容器中的 beanName 时,通过BeanFactory#getBean获取对应的实例。

<T> T getBean(String name, Class<T> requiredType) throws BeansException;

由于一开始 Spring 在解析<tx:annotation-driven />标签时,注册了一个BeanFactoryTransactionAttributeSourceAdvisor的 bean,而在此 bean 中又注入了另外两个 bean,此时该 bean 就会被开始使用。因为其同样实现了 Advisor 接口,那么在获取所有增强器的是否自然也会将此 bean 提取并随着其他增强器一起在后续步骤中被织入代理。

2)候选增强器找到匹配项
当找出对应增强器后,需要查找 class/method 是否匹配。

protected List<Advisor> findAdvisorsThatCanApply(
	List<Advisor> candidateAdvisors, Class<?> beanClass, String beanName) {

	ProxyCreationContext.setCurrentProxiedBeanName(beanName);
	try {
		// 过滤已经得到的 advisors
		return AopUtils.findAdvisorsThatCanApply(candidateAdvisors, beanClass);
	}
	finally {
		ProxyCreationContext.setCurrentProxiedBeanName(null);
	}
}

// AopUtils.java
public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
	if (candidateAdvisors.isEmpty()) {
		return candidateAdvisors;
	}
	List<Advisor> eligibleAdvisors = new ArrayList<>();
	// 首先处理引介增强 DeclareParents 注解标识的 DeclareParentsAdvisor
	for (Advisor candidate : candidateAdvisors) {
		if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
			eligibleAdvisors.add(candidate);
		}
	}
	boolean hasIntroductions = !eligibleAdvisors.isEmpty();
	for (Advisor candidate : candidateAdvisors) {
		// 引介增强已经处理
		if (candidate instanceof IntroductionAdvisor) {
			// already processed
			continue;
		}
		// 对于普通 bean 的处理
		if (canApply(candidate, clazz, hasIntroductions)) {
			eligibleAdvisors.add(candidate);
		}
	}
	return eligibleAdvisors;
}

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
	// DeclareParent
	if (advisor instanceof IntroductionAdvisor) {
		// classFilter = TypePatternClassFilter
		return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
	}
	// 普通 advisor
	else if (advisor instanceof PointcutAdvisor) {
		PointcutAdvisor pca = (PointcutAdvisor) advisor;
		return canApply(pca.getPointcut(), targetClass, hasIntroductions);
	}
	else {
		// It doesn't have a pointcut so we assume it applies.
		return true;
	}
}

当分析对于 UserService 是否适用于此增强方法,那么当前的 advisor 就是之前查找出来的BeanFactoryTransactionAttributeSourceAdvisor
在这里插入图片描述
从类的层次就够中可以看到 BeanFactoryTransactionAttributeSourceAdvisor 间接实现了 PointcutAdvisor,所以在第二个 if 判断时就会通过判断。会将 BeanFactoryTransactionAttributeSourceAdvisor 中的 getPointcut() 方法返回的参数继续调用 canApply 方法,而其返回的是 TransactionAttributeSourcePointcut 的实例。而对于 transactionAttributeSource 属性,是在解析自定义标签时注入的AnnotationTransactionAttributeSource

// BeanFactoryTransactionAttributeSourceAdvisor.java
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
	@Override
	@Nullable
	protected TransactionAttributeSource getTransactionAttributeSource() {
		return transactionAttributeSource;
	}
};

public Pointcut getPointcut() {
	return this.pointcut;
}

继续使用 transactionAttributeSource 类型的实例作为函数参数跟踪 canApply。

public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
	Assert.notNull(pc, "Pointcut must not be null");
	if (!pc.getClassFilter().matches(targetClass)) {
		return false;
	}

	// pc = TransactionAttributeSourcePointcut
	// 返回是自身
	MethodMatcher methodMatcher = pc.getMethodMatcher();
	if (methodMatcher == MethodMatcher.TRUE) {
		// No need to iterate the methods if we're matching any method anyway...
		return true;
	}

	IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
	if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
		introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
	}

	Set<Class<?>> classes = new LinkedHashSet<>();
	if (!Proxy.isProxyClass(targetClass)) {
		classes.add(ClassUtils.getUserClass(targetClass));
	}
	// 获取目标类所有实现的接口
	classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));

	for (Class<?> clazz : classes) {
		Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
		for (Method method : methods) {
			// 如果有一个方法匹配上就返回 true
			if (introductionAwareMethodMatcher != null ?
					introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
					methodMatcher.matches(method, targetClass)) {
				return true;
			}
		}
	}

	return false;
}

首先获取对应类的所有接口并连同类本身一起遍历,遍历过程中又对类中的方法再次遍历,一旦匹配成功便认为这个类适用于当前增强器。

调用 methodMatcher.matches(method, targetClass) 该方法进行匹配,会调用 TransactionAttributeSourcePointcut 类的 matches 方法。

// TransactionAttributeSourcePointcut.java
public boolean matches(Method method, Class<?> targetClass) {
	if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
			PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
			PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
		return false;
	}
	// 自定义标签解析时注入, AnnotationTransactionAttributeSource
	TransactionAttributeSource tas = getTransactionAttributeSource();
	return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

此时的 tas 表示 AnnotationTransactionAttributeSource 类型。

// AbstractFallbackTransactionAttributeSource.java
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
	if (method.getDeclaringClass() == Object.class) {
		return null;
	}

	// First, see if we have a cached value.
	// 尝试从缓存中获取
	Object cacheKey = getCacheKey(method, targetClass);
	TransactionAttribute cached = this.attributeCache.get(cacheKey);
	if (cached != null) {
		// Value will either be canonical value indicating there is no transaction attribute,
		// or an actual transaction attribute.
		if (cached == NULL_TRANSACTION_ATTRIBUTE) {
			return null;
		}
		else {
			return cached;
		}
	}
	else {
		// We need to work it out.
		// 提取事务标签
		TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
		// Put it in the cache.
		if (txAttr == null) {
			this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
		}
		else {
			String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
			if (txAttr instanceof DefaultTransactionAttribute) {
				((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
			}
			if (logger.isTraceEnabled()) {
				logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
			}
			this.attributeCache.put(cacheKey, txAttr);
		}
		return txAttr;
	}
}

上述方法首先此时从缓存中加载,如果没有找到就委托computeTransactionAttribute方法获取事务标签。

3)提取事务标签

// AbstractFallbackTransactionAttributeSource.java
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
	// Don't allow no-public methods as required.
	if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
		return null;
	}

	// The method may be on an interface, but we need attributes from the target class.
	// If the target class is null, the method will be unchanged.
	// method 代表接口中的方法,specificMethod 代表实现类中的方法
	Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

	// First try is the method in the target class.
	// 查看方法中是否存在事务声明
	TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
	if (txAttr != null) {
		return txAttr;
	}

	// Second try is the transaction attribute on the target class.
	// 查找方法所在的类中是否存在事务声明
	txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
	if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
		return txAttr;
	}

	// 如果存在接口,则到接口中去寻找
	if (specificMethod != method) {
		// Fallback is to look at the original method.
		// 查找接口的方法
		txAttr = findTransactionAttribute(method);
		if (txAttr != null) {
			return txAttr;
		}
		// Last fallback is the class of the original method.
		// 到接口中的类中去寻找
		txAttr = findTransactionAttribute(method.getDeclaringClass());
		if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
			return txAttr;
		}
	}

	return null;
}

事务属性的获取规则:

  1. 如果方法中存在事务属性,则使用方法上的属性。
  2. 否则使用方法所在类上的属性。
  3. 如果方法所在类上没有,则搜索接口中的方法。
  4. 最后尝试搜索接口的类上的属性。

其中真正的搜索事务属性任务委托给了findTransactionAttribute方法。

// AnnotationTransactionAttributeSource.java
protected TransactionAttribute findTransactionAttribute(Method method) {
	return determineTransactionAttribute(method);
}

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
	// this.annotationParsers 是当前类 AnnotationTransactionAttributeSource 初始化的时候初始化的
	// 其中的值被加入了 SpringTransactionAnnotationParser
	for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
		TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
		if (attr != null) {
			return attr;
		}
	}
	return null;
}

this.annotationParsers 是当前类 AnnotationTransactionAttributeSource 初始化的时候初始化的,其中的值被加入了 SpringTransactionAnnotationParser,即进行属性获取时使用了SpringTransactionAnnotationParser#parseTransactionAnnotation方法解析。

public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
	// 获取 Transactional 注解信息
	AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
			element, Transactional.class, false, false);
	if (attributes != null) {
		return parseTransactionAnnotation(attributes);
	}
	else {
		return null;
	}
}

上述方法首先会判断当前类是否有Transactional注解,这是事务的基础,有的话继续调用parseTransactionAnnotation方法解析详细的属性。

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
	RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

	// 解析 propagation
	Propagation propagation = attributes.getEnum("propagation");
	rbta.setPropagationBehavior(propagation.value());
	// 解析 isolation
	Isolation isolation = attributes.getEnum("isolation");
	rbta.setIsolationLevel(isolation.value());
	// 解析 timeout
	rbta.setTimeout(attributes.getNumber("timeout").intValue());
	// 解析 readOnly
	rbta.setReadOnly(attributes.getBoolean("readOnly"));
	// 解析 value
	rbta.setQualifier(attributes.getString("value"));

	List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
	// 解析 rollbackFor
	for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	// 解析 rollbackForClassName
	for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
		rollbackRules.add(new RollbackRuleAttribute(rbRule));
	}
	// 解析 noRollbackFor
	for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	// 解析 noRollbackForClassName
	for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
		rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
	}
	rbta.setRollbackRules(rollbackRules);

	return rbta;
}

上面方法中实现了对对应类或者方法的事务属性解析,你会在这个类中看到任何你常用或者不常用的事务属性提取。

至此已经完成了事务标签的解析。一开始,我们的任务是找出某个增强器是否适合于对应的类,是否匹配的关键在于谁是否从指定的类或类中的方法找到对应的事务属性,已 UserServicemImpl 为例,已经在其接口 UserService 中找到了事务属性,所以,它是与事务增强器匹配的,也就是它会被事务功能修饰。

至此事务功能的初始化工作已结束,当判断某个 bean 适用于事务增强时,也就是适用于增强器BeanFactoryTractionAttributeSourceAdvisor

在代理类被调用时,会调用这个类的增强方法,就是 BeanFactoryTransactionAttributeSourceAdvisor 实例的 Advise,因为在 BeanFactoryTransactionAttributeSourceAdvisor 中注入了 TransactionInterceptor,所以在调用事务增强器的代理类时会首先执行 TransactionInterceptor 进行增强,同时在 TransactionInterceptor 实例中的 invoke 方法中完成了整个事务的逻辑。

3 事务增强器

TransactionInterceptor 这个类支撑整个事务功能的架构。TransactionInterceptor 实现了 MethodInterceptor,因此调用该类是从 invoke 方法开始的。

public Object invoke(MethodInvocation invocation) throws Throwable {
	// Work out the target class: may be {@code null}.
	// The TransactionAttributeSource should be passed the target class
	// as well as the method, which may be from an interface.
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
	return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

// TransactionAspectSupport.java
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
	final InvocationCallback invocation) throws Throwable {

	// If the transaction attribute is null, the method is non-transactional.

	TransactionAttributeSource tas = getTransactionAttributeSource();
	// 获取对应的事务属性
	final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
	// 获取 beanFactory 中的 transactionManager
	final PlatformTransactionManager tm = determineTransactionManager(txAttr);
	// 构造方法唯一标识(类.方法,如 service.UserServiceImpl.save)
	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

	// 声明式事务
	if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
		// Standard transaction demarcation with getTransaction and commit/rollback calls.
		// 创建事务信息
		TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

		Object retVal;
		try {
			// This is an around advice: Invoke the next interceptor in the chain.
			// This will normally result in a target object being invoked.
			// 执行被增强方法
			retVal = invocation.proceedWithInvocation();
		}
		catch (Throwable ex) {
			// target invocation exception
			// 异常回滚
			completeTransactionAfterThrowing(txInfo, ex);
			throw ex;
		}
		finally {
			// 清除信息
			cleanupTransactionInfo(txInfo);
		}
		// 提交事务
		commitTransactionAfterReturning(txInfo);
		return retVal;
	}
	// 编程式事务
	else {
		final ThrowableHolder throwableHolder = new ThrowableHolder();

		// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
		try {
			Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
				TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
				try {
					return invocation.proceedWithInvocation();
				}
				catch (Throwable ex) {
					if (txAttr.rollbackOn(ex)) {
						// A RuntimeException: will lead to a rollback.
						if (ex instanceof RuntimeException) {
							throw (RuntimeException) ex;
						}
						else {
							throw new ThrowableHolderException(ex);
						}
					}
					else {
						// A normal return value: will lead to a commit.
						throwableHolder.throwable = ex;
						return null;
					}
				}
				finally {
					cleanupTransactionInfo(txInfo);
				}
			});

			// Check result state: It might indicate a Throwable to rethrow.
			if (throwableHolder.throwable != null) {
				throw throwableHolder.throwable;
			}
			return result;
		}
		catch (ThrowableHolderException ex) {
			throw ex.getCause();
		}
		catch (TransactionSystemException ex2) {
			if (throwableHolder.throwable != null) {
				logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
				ex2.initApplicationException(throwableHolder.throwable);
			}
			throw ex2;
		}
		catch (Throwable ex2) {
			if (throwableHolder.throwable != null) {
				logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
			}
			throw ex2;
		}
	}
}

Sping 中支持声明式事务和编程式事务,以声明式事务为例。主要逻辑如下。

  1. 获取事务的属性 TransactionAttribute (@Transtractional)
  2. 加载配置文件中的 TransactionManager
  3. 不同的事务处理方式使用不同的逻辑
    声明式事务和编程式事务的区别在于,一是编程式事务是不需要事务属性的,第二点是 TransactionManager 的不同,CallbackPreferringPlatformTransactionManager 继承 PlatformTransactionManager 接口,暴露出一个方法用于执行事务处理中的回调。
  4. 在目标方法执行前获取事务并收集事务信息
    事务属性和事务信息并不相同,即 TransactionInfo 和 TransactionAttribute 不相同。TransactionInfo 包含 TransactionAttribute 以及 PlatformTransactionManager,TransactionStatus 等信息。
  5. 执行目标方法
  6. 出现异常,尝试异常处理
    默认只对 RuntimeException 异常进行回滚。
  7. 提交事务前的事务信息清理
  8. 提交事务

3.1 创建事务

事务的创建由createTransactionIfNecessary方法实现。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
	@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

	// If no name specified, apply method identification as transaction name.
	// 如果没有名称则使用方法唯一标识,并使用 DelegatingTransactionAttribute 封装 txAttr
	if (txAttr != null && txAttr.getName() == null) {
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}

	TransactionStatus status = null;
	if (txAttr != null) {
		if (tm != null) {
			// 获取 TransactionStatus
			status = tm.getTransaction(txAttr);
		}
		else {
			if (logger.isDebugEnabled()) {
				logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
						"] because no transaction manager has been configured");
			}
		}
	}
	//根据指定的属性于 status 准备一个 TransactionInfo
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

上述方法主要逻辑:

  1. 使用 DelegatingTransactionAttribute 封装 TransactionAttribute
    当前的实际类型为 RuleBasedTransactionAttribute。
  2. 获取事务,核心任务
  3. 构建事务信息

3.1.1 获取事务

使用 getTransaction 方法来处理事务的准备工作。

// AbstractPlatformTransactionManager.java
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
	// 事务获取
	Object transaction = doGetTransaction();

	// Cache debug flag to avoid repeated checks.
	boolean debugEnabled = logger.isDebugEnabled();

	if (definition == null) {
		// Use defaults if no transaction definition given.
		definition = new DefaultTransactionDefinition();
	}
	// 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中(connectionHolder)中的 transactionActive 属性不为空
	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		// 当前线程已经存在事务
		return handleExistingTransaction(definition, transaction, debugEnabled);
	}

	// Check definition settings for new transaction.
	// 事务超时验证
	if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
		throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
	}

	// No existing transaction found -> check propagation behavior to find out how to proceed.
	// 如果当前线程不存在线程,当时 propagation 却被声明为 PROPAGATION_MANDATORY 抛出异常
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
		throw new IllegalTransactionStateException(
				"No existing transaction found for transaction marked with propagation 'mandatory'");
	}
	else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		// 新建事务
		// 空挂起
		SuspendedResourcesHolder suspendedResources = suspend(null);
		if (debugEnabled) {
			logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
		}
		try {
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
			// 构造 transaction,包括设置 ConnectionHolder,隔离级别,timeout,如果是新连接,绑定到当前线程
			doBegin(transaction, definition);
			// 新同步事务的设置,针对于当前线程的设置
			prepareSynchronization(status, definition);
			return status;
		}
		catch (RuntimeException | Error ex) {
			resume(null, suspendedResources);
			throw ex;
		}
	}
	else {
		// Create "empty" transaction: no actual transaction, but potentially synchronization.
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
					"isolation level will effectively be ignored: " + definition);
		}
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
	}
}

上述方法主要逻辑:

  1. 获取事务
    创建对应的事务实例,这里使用的是 DataSourceTransactionManager 中的 doGetTransaction 方法,创建基于 JDBC 的事务实例。如果当前线程中存在关于 dataSource 的连接,就直接使用。这里有一个对于保存点的设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。
// DataSourceTransactionManager.java
protected Object doGetTransaction() {
	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
	txObject.setSavepointAllowed(isNestedTransactionAllowed());
	// 如果当前线程已经记录数据库则使用原有连接
	ConnectionHolder conHolder =
			(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
	// false 表示非新建连接
	txObject.setConnectionHolder(conHolder, false);
	return txObject;
}
  1. 当前线程是否存在事务,则转向嵌套事务的处理
  2. 事务超时设置验证
  3. 事务 propagationBehavior 属性的设置验证
  4. 构建 DefaultTransactionStatus
  5. 完善 transaction,包括设置 ConnectionHolder,隔离级别,timeout,如果是新连接,则绑定到当前线程

对于一些隔离级别,timeout 等功能的设置不是有 Spring 来完成的,而是委托给给底层的数据库连接去做的,而对于数据库连接的设置就是在 doBegin 函数中处理的。

protected void doBegin(Object transaction, TransactionDefinition definition) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;

	try {
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}

		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		// 设置隔离级别
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		//更改自动提交设置,由 Spring 控制提交
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			con.setAutoCommit(false);
		}

		// 设置判断当前线程是否存在事务的依据
		prepareTransactionalConnection(con, definition);
		txObject.getConnectionHolder().setTransactionActive(true);

		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}

		// Bind the connection holder to the thread.
		if (txObject.isNewConnectionHolder()) {
			// 将当前获取到的连接绑定到当前线程
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}

	catch (Throwable ex) {
		if (txObject.isNewConnectionHolder()) {
			DataSourceUtils.releaseConnection(con, obtainDataSource());
			txObject.setConnectionHolder(null, false);
		}
		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
	}
}

可以说事务是从这个方法开始的,因为i在这个方法中已经开始尝试了对数据库连接的获取,在获取数据库连接的同时,需要设置一些同步设置。

  • 尝试获取连接
    当前线程中 connectionHolder 已经存在,则没有必要再次获取,或者,对于事务同步表示设置为 true 的需要重新获取连接。
  • 设置隔离级别以及只读标识
    事务中的只读操作做了一些处理,但是核心的实现是设置 connection 上的 readOnly 属性。对于隔离级别的控制也是交由 connection 去控制的。
  • 更改默认的提交设置,将提交操作委托给 Spring 处理
  • 设置标志位,标识当前连接已经被事务激活
  • 设置过期时间
  • 将 connectionHolder 绑定到当前线程
    设置隔离级别的prepareConnectionForTransaction方法用于负责对底层数据库的连接设置,只是包含只读标识和隔离级别的设置。
// DataSourceUtils.java
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
	throws SQLException {

	Assert.notNull(con, "No Connection specified");

	// Set read-only flag.
	// 设置数据库只读标识
	if (definition != null && definition.isReadOnly()) {
		try {
			if (logger.isDebugEnabled()) {
				logger.debug("Setting JDBC Connection [" + con + "] read-only");
			}
			con.setReadOnly(true);
		}
		catch (SQLException | RuntimeException ex) {
			Throwable exToCheck = ex;
			while (exToCheck != null) {
				if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
					// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
					throw ex;
				}
				exToCheck = exToCheck.getCause();
			}
			// "read-only not supported" SQLException -> ignore, it's just a hint anyway
			logger.debug("Could not set JDBC Connection read-only", ex);
		}
	}

	// Apply specific isolation level, if any.
	// 设置数据库连接的隔离级别
	Integer previousIsolationLevel = null;
	if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
		if (logger.isDebugEnabled()) {
			logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
					definition.getIsolationLevel());
		}
		int currentIsolation = con.getTransactionIsolation();
		if (currentIsolation != definition.getIsolationLevel()) {
			previousIsolationLevel = currentIsolation;
			con.setTransactionIsolation(definition.getIsolationLevel());
		}
	}

	return previousIsolationLevel;
}
  1. 将事务信息记录在当前线程中
// AbstractPlatformTransactionManager.java
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
 if (status.isNewSynchronization()) {
		TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
		TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
				definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
						definition.getIsolationLevel() : null);
		TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
		TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
		TransactionSynchronizationManager.initSynchronization();
	}
}

3.1.2 处理已存在的事务

Spring 中支持多种事务的传播方式,比如PROPAGATION_NESTEDPROPAGATION_REQUIRES_NEW等,这些都是在已存在事务的基础上进行进一步的处理,对于已存在的事务是如何处理的?在 getTransaction 方法中判断如果已存在事务(isExistingTransaction 方法判断),会调用 handleExistingTransaction 方法处理已存在的事务。

private TransactionStatus handleExistingTransaction(
	TransactionDefinition definition, Object transaction, boolean debugEnabled)
		throws TransactionException {


	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}

	// 不支持的事务传播类型
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}

	// 新建事务的处理
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}

		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}

	// 嵌入事务处理
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			// 如果没有使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始建立保存点
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			status.createAndHoldSavepoint();
			return status;
		}
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			// 有些情况不能使用保存点操作,比如 JTA,那么建立新事务
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, null);
			doBegin(transaction, definition);
			prepareSynchronization(status, definition);
			return status;
		}
	}

	// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
	if (debugEnabled) {
		logger.debug("Participating in existing transaction");
	}
	if (isValidateExistingTransaction()) {
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
				Constants isoConstants = DefaultTransactionDefinition.constants;
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] specifies isolation level which is incompatible with existing transaction: " +
						(currentIsolationLevel != null ?
								isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
								"(unknown)"));
			}
		}
		if (!definition.isReadOnly()) {
			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] is not marked as read-only but existing transaction is");
			}
		}
	}
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

上述方法对于已存在的事务考虑两种情况:

  1. PROPAGATION_REQUIRES_NEW 表示当前事务必须在它自己的事务里运行,一个新的事务将被启动,而如果有一个事务正在允许的话,那么在这个方法允许运行期间被挂起。Spring 中对于此种传播方式处理与建立新事务的不同在于使用 suspend 方法将原事务挂起,并在当前事务处理完毕后将原事务还原。
  2. PROPAGATION_NESTED 表示如果当前正有一个事务在运行中,则该方法应该运行在一个嵌套事务中,被嵌套的事务可以独立于封装事务进行提交或回滚,如果封装事务不存在,行为就像 PROPAGATION_REQUIRES_NEW 。嵌入式事务主要考虑两种方式。
    • Spring 中允许嵌入事务的时候,则首选设置保存点的方式作为异常处理的回滚。
    • 对于其他方式,比如 JTA 无法使用保存点的方式,那么处理方式与 PROPAGATION_REQUIRES_NEW 相同,一旦出现异常,则由 Spring 的事务异常处理机制去完成后续操作。

对于挂起操作主要记录原有事务的状态,便于后续操作对事务的恢复。

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
	if (TransactionSynchronizationManager.isSynchronizationActive()) {
		List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
		try {
			Object suspendedResources = null;
			if (transaction != null) {
				suspendedResources = doSuspend(transaction);
			}
			String name = TransactionSynchronizationManager.getCurrentTransactionName();
			TransactionSynchronizationManager.setCurrentTransactionName(null);
			boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
			Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
			boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
			TransactionSynchronizationManager.setActualTransactionActive(false);
			return new SuspendedResourcesHolder(
					suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
		}
		catch (RuntimeException | Error ex) {
			// doSuspend failed - original transaction is still active...
			doResumeSynchronization(suspendedSynchronizations);
			throw ex;
		}
	}
	else if (transaction != null) {
		// Transaction active but no synchronization active.
		Object suspendedResources = doSuspend(transaction);
		return new SuspendedResourcesHolder(suspendedResources);
	}
	else {
		// Neither transaction nor synchronization active.
		return null;
	}
}

3.1.3 准备事务信息

当已经建立事务连接完成了事务信息的提取后,需要将所有的事务信息统一记录在 TransactionInfo 类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring 会通过 TransactionInfo 实例中的信息进行回滚等后续操作。

// TransactionAspectSupport.java
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
	@Nullable TransactionAttribute txAttr, String joinpointIdentification,
		@Nullable TransactionStatus status) {

	TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
	if (txAttr != null) {
		// We need a transaction for this method...
		if (logger.isTraceEnabled()) {
			logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
		}
		// The transaction manager will flag an error if an incompatible tx already exists.
		// 记录事务状态
		txInfo.newTransactionStatus(status);
	}
	else {
		// The TransactionInfo.hasTransaction() method will return false. We created it only
		// to preserve the integrity of the ThreadLocal stack maintained in this class.
		if (logger.isTraceEnabled()) {
			logger.trace("No need to create transaction for [" + joinpointIdentification +
					"]: This method is not transactional.");
		}
	}

	// We always bind the TransactionInfo to the thread, even if we didn't create
	// a new transaction here. This guarantees that the TransactionInfo stack
	// will be managed correctly even if no transaction was created by this aspect.
	txInfo.bindToThread();
	return txInfo;
}

3.2 回滚处理

当出现错误时,Spring 会进行回滚操作。

// TransactionAspectSupport.java
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
	// 当抛出异常时首先判断是否存在事务
	if (txInfo != null && txInfo.getTransactionStatus() != null) {
		if (logger.isTraceEnabled()) {
			logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
					"] after exception: " + ex);
		}
		// 判断是否回滚默认的依据是抛出的异常是否是 RuntimeException 或者是 Error 的类型
		// transactionAttribute 为 RuleBasedTransactionAttribute
		if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
			try {
				// 根据 transaction 信息进行回滚处理
				txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
			}
			catch (TransactionSystemException ex2) {
				logger.error("Application exception overridden by rollback exception", ex);
				ex2.initApplicationException(ex);
				throw ex2;
			}
			catch (RuntimeException | Error ex2) {
				logger.error("Application exception overridden by rollback exception", ex);
				throw ex2;
			}
		}
		else {
			// We don't roll back on this exception.
			// Will still roll back if TransactionStatus.isRollbackOnly() is true.
			// 当不满足回滚条件时及时抛出异常也会提交
			try {
				txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
			}
			catch (TransactionSystemException ex2) {
				logger.error("Application exception overridden by commit exception", ex);
				ex2.initApplicationException(ex);
				throw ex2;
			}
			catch (RuntimeException | Error ex2) {
				logger.error("Application exception overridden by commit exception", ex);
				throw ex2;
			}
		}
	}
}

对于执行目标方法,一旦出现 Throwable 就会被引导至此方法处理,当时不代表所有的 Throwable 都会被回滚处理,默认只处理 RuntimeException 和 Error。关键的判断在于txInfo.transactionAttribute.rollbackOn(ex)
1)回滚条件

// DefaultTransactionAttribute.java
public boolean rollbackOn(Throwable ex) {
	return (ex instanceof RuntimeException || ex instanceof Error);
}

默认情况下 Spring 中的事务异常处理机制只对 RuntimeException 和 Error 两种清空进行处理。可以通过扩展类进行处理,但是最常用的还是使用事务提供的属性设置,利用注解方式的使用,例如:

@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)

2)回滚处理
一旦符合回滚条件,Spring 就会将程序引导至回滚函数中。

// AbstractPlatformTransactionManager.java
public final void rollback(TransactionStatus status) throws TransactionException {
	// 如果事务已经完成,再次回滚会抛出异常
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}

	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
	try {
		boolean unexpectedRollback = unexpected;

		try {
			//  激活所有 TransactionSynchronization 中对应的方法
			triggerBeforeCompletion(status);

			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Rolling back transaction to savepoint");
				}
				// 如果有保存点,也就是当前事务为单独的线程则会退到保存点
				status.rollbackToHeldSavepoint();
			}
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction rollback");
				}
				// 如果是新事务,直接回滚
				doRollback(status);
			}
			else {
				// Participating in larger transaction
				if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						// 如果当前事务不是独立的事务,那么只能标记状态,等待事务执行完毕后统一回滚
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
				// Unexpected rollback only matters here if we're asked to fail early
				if (!isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = false;
				}
			}
		}
		catch (RuntimeException | Error ex) {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			throw ex;
		}
		// 激活所有 TransactionSynchronization 中对应的方法
		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

		// Raise UnexpectedRollbackException if we had a global rollback-only marker
		if (unexpectedRollback) {
			throw new UnexpectedRollbackException(
					"Transaction rolled back because it has been marked as rollback-only");
		}
	}
	finally {
		// 清空记录的资源并将挂起的资源恢复
		cleanupAfterCompletion(status);
	}
}

上述方法主要逻辑:

  1. 自定义触发器的调用,包括回滚前后的调用,触发器的注册通过 TransactionSynchronizationManager#registerSynchronization方法。
public static void registerSynchronization(TransactionSynchronization synchronization)
  1. 回滚逻辑的处理
  • 当之前已经保存的事务信息中有保存点信息时,使用保存点信息进行回滚。常用于嵌入式事务,对于嵌入式事务的处理,内嵌的事务异常并不会引起外部事务的回滚。根据保存点回滚的实现方式其实是根据底层的数据库连接进行的。
// AbstractTransactionStatus.java
public void rollbackToHeldSavepoint() throws TransactionException {
	Object savepoint = getSavepoint();
	if (savepoint == null) {
		throw new TransactionUsageException(
				"Cannot roll back to savepoint - no savepoint associated with current transaction");
	}
	// 回滚至保存点
	getSavepointManager().rollbackToSavepoint(savepoint);
	getSavepointManager().releaseSavepoint(savepoint);
	setSavepoint(null);
}

这里使用的是 JDBC 方式进行的数据库连接,那么getSavepointManager方法返回的就是JdbcTransactionObjectSupport,即上述方法会调用JdbcTransactionObjectSupport#rollbackToSavepoint方法。

// JdbcTransactionObjectSupport.java
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
	ConnectionHolder conHolder = getConnectionHolderForSavepoint();
	try {
		// 使用 Jdbc 来回滚
		conHolder.getConnection().rollback((Savepoint) savepoint);
		conHolder.resetRollbackOnly();
	}
	catch (Throwable ex) {
		throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
	}
}
  • 当之前已经保存的事务信息中的事务为新事务,那么直接回滚。常用于单独事务的处理。对于没有保存点的回滚,Spring 也是使用底层数据库连接提供的 API 来操作的。使用DataSourceTransactionManager#doRollback方法。
protected void doRollback(DefaultTransactionStatus status) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
	Connection con = txObject.getConnectionHolder().getConnection();
	if (status.isDebug()) {
		logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
	}
	try {
		// Jdbc
		con.rollback();
	}
	catch (SQLException ex) {
		throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
	}
}
  • 当前事务信息中表明是存在事务的,又不属于以上两种情况,一般是 JTA,只做回滚标识,等到提交的时候统一不提交。
  1. 回滚后的信息清除
    对于回滚逻辑执行结束后,无论回滚是否成功,都会进行事务结束后的清除操作。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
	// 设置完成状态
	status.setCompleted();
	// 如果是新的同步状态,需要清除绑定在当前线程的事务信息
	if (status.isNewSynchronization()) {
		TransactionSynchronizationManager.clear();
	}
	// 如果是新事务需要清除资源
	if (status.isNewTransaction()) {

		doCleanupAfterCompletion(status.getTransaction());
	}
	if (status.getSuspendedResources() != null) {
		if (status.isDebug()) {
			logger.debug("Resuming suspended transaction after completion of inner transaction");
		}
		Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
		// 结束之前事务的挂起状态
		resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
	}
}

上述方法主要逻辑:

  • 设置完成状态避免重复调用
  • 如果是新的同步状态,需要清除绑定在当前线程的事务信息
  • 如果是新事务需要清除一些资源
// DataSourceTransactionManager.java
protected void doCleanupAfterCompletion(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

	// Remove the connection holder from the thread, if exposed.
	if (txObject.isNewConnectionHolder()) {
		// 将数据库连接从当前线程中解除绑定
		TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

	// Reset connection.
	// 释放链接
	Connection con = txObject.getConnectionHolder().getConnection();
	try {
		if (txObject.isMustRestoreAutoCommit()) {
			// 恢复数据库自动提交属性
			con.setAutoCommit(true);
		}
		// 重置数据库连接
		DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
	}
	catch (Throwable ex) {
		logger.debug("Could not reset JDBC Connection after transaction", ex);
	}

	if (txObject.isNewConnectionHolder()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
		}
		// 如果当前事务是独立的新建事务则在事务完成时释放数据库连接
		DataSourceUtils.releaseConnection(con, this.dataSource);
	}

	txObject.getConnectionHolder().clear();
}
  • 如果在事务执行前有事务挂起,那么当前事务结束后需要将挂起事务恢复
// AbstractPlatformTransactionManager.java
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
		throws TransactionException {

	if (resourcesHolder != null) {
		Object suspendedResources = resourcesHolder.suspendedResources;
		if (suspendedResources != null) {
			
			doResume(transaction, suspendedResources);
		}
		List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
		if (suspendedSynchronizations != null) {
			TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
			TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
			doResumeSynchronization(suspendedSynchronizations);
		}
	}
}

// DataSourceTransactionManager.java
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
	TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

3.3 事务提交

当事务执行并没有出现异常,就可以进行事务的提交了。

// TransactionAspectSupport.java
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
	if (txInfo != null && txInfo.getTransactionStatus() != null) {
		if (logger.isTraceEnabled()) {
			logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
		}
		// DataSourceTransactionManager
		txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
	}
}

在事务真正提交前,还需要做个判断。在异常事务处理时,当某个事务既没有保存点又不是新事务,Spring 的处理是会设置一个回滚标识。主要应用场景:某个事务是另一个事务嵌入式事务,但是这些事务不在 Spring 的管理范围之内,或者无法设置保存点,那么会通过设置回滚标识的方式来禁止提交。当外部事务提交时,会判断当前事务流是否设置了回滚标识,由外部事务统一进行整体事务的回滚。所以,当事务没有被异常捕获时也并不意味着一定会执行提交。

// AbstractPlatformTransactionManager.java
public final void commit(TransactionStatus status) throws TransactionException {
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}

	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	// 当前事务链中已经标识回滚标识,直接回滚
	if (defStatus.isLocalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Transactional code has requested rollback");
		}
		processRollback(defStatus, false);
		return;
	}

	if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
		}
		processRollback(defStatus, true);
		return;
	}
	// 提交
	processCommit(defStatus);
}

当事务一切执行正常时。

// AbstractPlatformTransactionManager.java
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;

		try {
			boolean unexpectedRollback = false;
			// 预留
			prepareForCommit(status);
			// TransactionSynchronization 中对于方法的调用
			triggerBeforeCommit(status);
			// TransactionSynchronization 中对于方法的调用
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;

			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Releasing transaction savepoint");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				// 如果由保存点信息则清除
				status.releaseHeldSavepoint();
			}
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction commit");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				// 独立新事务直接提交 DataSourceTransactionManager
				doCommit(status);
			}
			else if (isFailEarlyOnGlobalRollbackOnly()) {
				unexpectedRollback = status.isGlobalRollbackOnly();
			}

			// Throw UnexpectedRollbackException if we have a global rollback-only
			// marker but still didn't get a corresponding exception from commit.
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction silently rolled back because it has been marked as rollback-only");
			}
		}
		catch (UnexpectedRollbackException ex) {
			// can only be caused by doCommit
			// TransactionSynchronization 中对于方法的调用
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			throw ex;
		}
		catch (TransactionException ex) {
			// can only be caused by doCommit
			if (isRollbackOnCommitFailure()) {
				// 出现异常回滚
				doRollbackOnCommitException(status, ex);
			}
			else {
				// TransactionSynchronization 中对于方法的调用
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			}
			throw ex;
		}
		catch (RuntimeException | Error ex) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, ex);
			throw ex;
		}

		// Trigger afterCommit callbacks, with an exception thrown there
		// propagated to callers but the transaction still considered as committed.
		try {
			triggerAfterCommit(status);
		}
		finally {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		}

	}
	finally {
		cleanupAfterCompletion(status);
	}
}

在提交中会考虑。

  • 当事务状态中有保存点信息不会提交事务
  • 当事务不是新事务也不会提交事务

此条件主要考虑内嵌事务的情况,对于内嵌事务,在 Spring 处理中会在内嵌事务开始前设置保存点,出现异常会根据保存点进行回滚,但是没有异常时,内嵌的事务不会单独提交,根据事务流由最外层事务负责提交,所以有保存点时标表示不是最外层事务,因此只是清除保存点信息,对于是否是新事务的判断也是基于此考虑。

最后会由数据库连接 API 进行提交。

// DataSourceTransactionManager.java
protected void doCommit(DefaultTransactionStatus status) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
	Connection con = txObject.getConnectionHolder().getConnection();
	if (status.isDebug()) {
		logger.debug("Committing JDBC transaction on Connection [" + con + "]");
	}
	try {
		// 提交
		con.commit();
	}
	catch (SQLException ex) {
		throw new TransactionSystemException("Could not commit JDBC transaction", ex);
	}
}

精彩评论(0)

0 0 举报