目录
Spring的事务管理是Spring AOP最为经典的代表。以下着重分析一下Spring对于事务属性的解析、事务创建、事务回滚、事务提交的核心原理。
事务属性解析优先级
在AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute()方法中可以看到如下逻辑
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
// 不允许非public的方法
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// Ignore CGLIB subclasses - introspect the actual user class.
Class<?> userClass = (targetClass != null ? ClassUtils.getUserClass(targetClass) : null);
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
// If we are dealing with method with generic parameters, find the original method.
specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// First try is the method in the target class.
// 1、首先查看方法中是否存在事务声明
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// 2、查看方法所在的类是否存在事务声明
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
// 3、如果存在接口,去接口中寻找
if (specificMethod != method) {
// Fallback is to look at the original method.
// 首先查找接口方法
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
// 最后在接口的类中查找
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
通过源码可以得出事务属性解析的优先级为:实现类方法 > 实现类 > 接口方法 > 接口
代码里反复调用了findTransactionAttribute()方法,这个方法实现了具体事务属性的提取。
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {
return determineTransactionAttribute(method);
}
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) {
for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
// 解析事务注解
TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae);
if (attr != null) {
return attr;
}
}
return null;
}
parseTransactionAnnotation()方法实现了对@Transactional注解属性的解析。
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
// 解析传播属性
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
// 解析隔离级别
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
// 解析事务超时时间
rbta.setTimeout(attributes.getNumber("timeout").intValue());
// 解析readOnly属性
rbta.setReadOnly(attributes.getBoolean("readOnly"));
// 解析value属性
rbta.setQualifier(attributes.getString("value"));
ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<>();
// 解析事务回滚的异常类型
Class<?>[] rbf = attributes.getClassArray("rollbackFor");
for (Class<?> rbRule : rbf) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
// 解析事务回滚的ClassName
String[] rbfc = attributes.getStringArray("rollbackForClassName");
for (String rbRule : rbfc) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
// 解析不回滚的异常类型
Class<?>[] nrbf = attributes.getClassArray("noRollbackFor");
for (Class<?> rbRule : nrbf) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
// 解析不回滚的className
String[] nrbfc = attributes.getStringArray("noRollbackForClassName");
for (String rbRule : nrbfc) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
rbta.getRollbackRules().addAll(rollBackRules);
return rbta;
}
事务的执行过程
Spring事务管理是AOP最典型的应用,事务的执行在TransactionInterceptor拦截器中的invoke方法开始。
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器对象
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 获取连接点的唯一标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 定义事务范围
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行被增强的目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除事务信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 后面是编程式事务忽略
......
}
创建事务
事务的创建由createTransactionIfNecessary方法开启:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
// 1、用DelegatingTransactionAttribute装载事务属性,供后续使用
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 2、获取事务
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 3、准备事务信息并返回
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
它主要完成三件事:
1、事务属性的装载;2、获取事务;3、准备事务信息并返回。
下面主要分析一下2和3。
获取事务
获取事务在getTransaction()方法中
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 判断当前线程是否存在事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 如果存在事务,根据事务的不同传播性进行相应的处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// 事务超时验证
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 如果当前线程不存在事务,但传播属性设置成了PROPAGATION_MANDATORY,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 三种需要新建事务的传播属性
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起事务,主要目的是记录原有事务的状态,以便后续操作对事务的恢复
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 构造事务,包括设置连接、隔离级别、超时时间等
doBegin(transaction, definition);
// 针对当前线程的事务同步设置
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
总结一下获取事务的过程
1、首先判断当前线程是否存在事务,如果存在,则根据事务的传播属性(七种属性)进行相应的处理并返回;如果不存在,则继续后面创建事务的逻辑。
2、事务超时验证。
3、如果当前线程不存在事务,但传播属性设置成了PROPAGATION_MANDATORY,抛出异常。
4、如果传播属性设置成了PROPAGATION_REQUIRED 或 PROPAGATION_REQUIRES_NEW 或 PROPAGATION_NESTED,执行新建事务的逻辑。
5、新建事务包括连接绑定、事务构建、状态同步等,而隔离级别、超时时间会委托底层数据库完成。
准备事务信息
当已经建立事务连接并完成了事务信息的提取后,需要将所有事务信息统一记录在TransactionInfo类型实例中,这个实例包含了目标方法开始执行前的所有状态信息,一旦事务执行失败,Spring会通过TransactionInfo实例中的信息来进行回滚等后续工作。
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
// 记录事务状态
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// 将事务信息绑定到线程
txInfo.bindToThread();
return txInfo;
}
事务回滚
在invokeWithinTransaction方法执行整个事务过程中可以看到,如果发生异常,会将异常捕获,同时执行completeTransactionAfterThrowing()方法。
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 首先判断是否存在事务,这是基础依据
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 默认回滚RuntimeException或Error异常
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 根据事务状态进行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// 如果异常类型不满足,依然执行事务提交逻辑
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
默认的回滚条件
@Override
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
默认情况下,Spring只会对RuntimeException类型或Error类型的异常进行回滚,其它异常仍然会被提交,除非单独设置, 例如
@Transactional(rollbackFor = Exception.class)
一旦满足回滚条件就会进入回滚处理函数rollback()中
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
// 如果事务已经完成,再次回滚会抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 激活前置触发器,即激活所有TransactionSynchronization中对应的方法
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 如果有保存点,则回滚到保存点(处理嵌套事务)
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 如果当前线程为独立的新事物,直接执行回滚处理
doRollback(status);
}
else {
// Participating in larger transaction
// 如果当前事务不是独立的新事务,那么只能进行标记状态,等事务链执行完毕后统一回滚
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 激活后置触发器
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清理并恢复挂起的资源
cleanupAfterCompletion(status);
}
}
总结一下回滚的逻辑
首先是自定义触发器的调用,包括回滚前的处理,以及回滚后的处理。
然后就是具体的回滚逻辑,其中包括对于嵌套事务保存点的回滚处理,独立新事务的回滚处理,以及非独立事务的标记处理。
最后是回滚后的清除工作,无论前面回滚是否成功,都需要进行收尾,例如设置事务状态、清除线程绑定的事务信息、恢复挂起的线程、数据库资源释放等。
事务提交
在invokeWithinTransaction方法的最后会调用commitTransactionAfterReturning()进行事务提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果当前事务状态被标记成rollback,直接回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// 如果全局的事务链已经被标记为rollback,直接回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 执行提交
processCommit(defStatus);
}
在commit方法中会先检查整个事务链的回滚标识,如果发现被标记为rollback,直接执行回滚。
最后看一下processCommit()的逻辑
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预留
prepareForCommit(status);
// 事务提交的前置处理
triggerBeforeCommit(status);
// 事务完成的前置处理
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 如果存在保存点,说明是内嵌事务,如果内嵌事务出现异常或者外层事务决定回滚时,内嵌事务会根据保存点回滚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 清除保存点信息
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 如果是独立事务,直接提交
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
// 提交过程中发现异常
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
当事务状态中有保存点信息的话(嵌套事务的内层事务),不会直接提交事务。
当事务属于非新事务的时候也不会直接执行提交。
如果前面的验证没有问题,Spring将通过底层操作数据库的API进行事务提交。
转载:https://blog.csdn.net/u011212394/article/details/101993512