深圳惠源办公设备租赁
联系电话 :13713619827

Spring源码解析(五)-Spring事务原理分析之代理对象执行过程

2
发表时间:2021-11-22 21:42

前言

上一节分析了Spring怎么去创建事务代理对象,这一节分析下代理对象的执行过程。


一、CglibAopProxy

Spring创建的事务代理对象

1、intercept

代理对象拦截原方法执行

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {Object oldProxy = null;boolean setProxyContext = false;Object target = null;TargetSource targetSource = this.advised.getTargetSource();try {// 判断是否要将代理对象放到ThreadLocal,@EnableAspectJAutoProxy(exposeProxy = true)if (this.advised.exposeProxy) {
					oldProxy = AopContext.setCurrentProxy(proxy);
					setProxyContext = true;}// 获取被代理对象
				target = targetSource.getTarget();Class<?> targetClass = (target != null ? target.getClass() : null);// 获取切面方法调用链 ,对于Spring事务,我们可以拿到TransactionInteceporList<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);Object retVal;// 判断无有切面拦截并且时public方法直接调用被代理对象方法if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
					retVal = methodProxy.invoke(target, argsToUse);}else {// 需要进行代理
					retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();}
				retVal = processReturnType(proxy, target, method, retVal);return retVal;}finally {if (target != null && !targetSource.isStatic()) {
					targetSource.releaseTarget(target);}if (setProxyContext) {// Restore old proxy.AopContext.setCurrentProxy(oldProxy);}}}

2、proceed

遍历调用链,依次调用切面方法

public Object proceed() throws Throwable {// 判断调用链是否执行完,执行完则调用被代理对象的方法if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {return invokeJoinpoint();}// 获取当前该执行切面方法Object interceptorOrInterceptionAdvice =this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);// 对于切面类型为InterceptorAndDynamicMethodMatcher的,需要判断该方法是否需要被该切面拦截if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {InterceptorAndDynamicMethodMatcher dm =(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {return dm.interceptor.invoke(this);}else {// 不需要被拦截return proceed();}}else {// 切面时拦截器类型的,直接执行其invoke方法(事务切面拦截TransactionInterceptor)return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);}}

二、TransactionInterceptor

1、invoke

在TransactionInterceptor的invoke进行事务处理

public Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// 开始进行事务处理return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}

2、invokeWithinTransaction

进行事务处理

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {// 获取事务属性TransactionAttributeSource tas = getTransactionAttributeSource();final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);final TransactionManager tm = determineTransactionManager(txAttr);PlatformTransactionManager ptm = asPlatformTransactionManager(tm);final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {// 创建并开启事务TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);Object retVal;try {// 事务内调用被代理对象方法
				retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {// 出现异常回归事务completeTransactionAfterThrowing(txInfo, ex);throw ex;}finally {// 清理当前事务信息cleanupTransactionInfo(txInfo);}if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {TransactionStatus status = txInfo.getTransactionStatus();if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}}commitTransactionAfterReturning(txInfo);return retVal;}else {final ThrowableHolder throwableHolder = new ThrowableHolder();// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.try {Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);try {Object retVal = invocation.proceedWithInvocation();if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {// Set rollback-only in case of Vavr failure matching our rollback rules...
							retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}return retVal;}catch (Throwable ex) {if (txAttr.rollbackOn(ex)) {// A RuntimeException: will lead to a rollback.if (ex instanceof RuntimeException) {throw (RuntimeException) ex;}else {throw new ThrowableHolderException(ex);}}else {// A normal return value: will lead to a commit.
							throwableHolder.throwable = ex;return null;}}finally {cleanupTransactionInfo(txInfo);}});// Check result state: It might indicate a Throwable to rethrow.if (throwableHolder.throwable != null) {throw throwableHolder.throwable;}return result;}catch (ThrowableHolderException ex) {throw ex.getCause();}catch (TransactionSystemException ex2) {if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
					ex2.initApplicationException(throwableHolder.throwable);}throw ex2;}catch (Throwable ex2) {if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);}throw ex2;}}}

3、createTransactionIfNecessary

创建事务

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {// If no name specified, apply method identification as transaction name.if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {@Overridepublic String getName() {return joinpointIdentification;}};}TransactionStatus status = null;if (txAttr != null) {if (tm != null) {// 从AbstractPlatformTransactionManager获取事务
				status = tm.getTransaction(txAttr);}else {if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +"] because no transaction manager has been configured");}}}return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}

三、AbstractPlatformTransactionManager

1、getTransaction

获取事务

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException {// Use defaults if no transaction definition given.TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// 获取当前线程的事务Object transaction = doGetTransaction();boolean debugEnabled = logger.isDebugEnabled();// 已经存在事务,则以当前存在事务去处理if (isExistingTransaction(transaction)) {// Existing transaction found -> check propagation behavior to find out how to behave.return handleExistingTransaction(def, transaction, debugEnabled);}// Check definition settings for new transaction.if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());}// 设置了事务隔离级别PROPAGATION_MANDATORY,并且当前无事务,报错if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");}// 设置了事务隔离级别PROPAGATION_REQUIRED 、PROPAGATION_REQUIRES_NEW 、PROPAGATION_NESTED,开启新事务,设置挂起的事务为nullelse if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);}try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);// 开启新事务DefaultTransactionStatus status = newTransactionStatus(
						def, transaction, true, newSynchronization, debugEnabled, suspendedResources);// 获取新链接,启动新事务doBegin(transaction, def);prepareSynchronization(status, def);return status;}catch (RuntimeException | Error ex) {resume(null, suspendedResources);throw ex;}}else {// Create "empty" transaction: no actual transaction, but potentially synchronization.if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + def);}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);}}

2、doBegin(DataSourceTransactionManager)

获取新的connection,启动新事务

 protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;Connection con = null;try {if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {// 获取新的conectionConnection newCon = this.obtainDataSource().getConnection();if (this.logger.isDebugEnabled()) {this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");}// 将connection放到当前线程txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);if (this.logger.isDebugEnabled()) {this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");}con.setAutoCommit(false);}this.prepareTransactionalConnection(con, definition);txObject.getConnectionHolder().setTransactionActive(true);int timeout = this.determineTimeout(definition);if (timeout != -1) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());}} catch (Throwable var7) {if (txObject.isNewConnectionHolder()) {DataSourceUtils.releaseConnection(con, this.obtainDataSource());txObject.setConnectionHolder((ConnectionHolder)null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);}}

3、handleExistingTransaction

分析下如果当前有事务存在的情况

// 当前事务隔离级别为PROPAGATION_NEVER,抛出异常if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}// 当前事务隔离级别为PROPAGATION_NEVERif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {if (debugEnabled) {
				logger.debug("Suspending current transaction");}// 挂起当前事务Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);}// 当前事务隔离级别为PROPAGATION_REQUIRES_NEWif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");}// 挂起当前事务SuspendedResourcesHolder suspendedResources = suspend(transaction);try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);// 启动新事务doBegin(transaction, definition);prepareSynchronization(status, definition);return status;}catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}}if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (!isNestedTransactionAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");}if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");}if (useSavepointForNestedTransaction()) {// Create savepoint within existing Spring-managed transaction,// through the SavepointManager API implemented by TransactionStatus.// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();return status;}else {// Nested transaction through nested begin and commit/rollback calls.// Usually only for JTA: Spring synchronization might get activated here// in case of a pre-existing JTA transaction.boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);doBegin(transaction, definition);prepareSynchronization(status, definition);return status;}}// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.if (debugEnabled) {
			logger.debug("Participating in existing transaction");}if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :"(unknown)"));}}if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");}}}boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);

四、TransactionInterceptor

1、cleanupTransactionInfo

清理当前事务信息

/**
 * Puts the previously recorded transaction info ({@code oldTransactionInfo}) back
 * into {@code transactionInfoHolder}.
 * NOTE(review): the holder is presumably a ThreadLocal, judging by the method name;
 * its declaration is not shown here.
 */
private void restoreThreadLocalStatus() {// Restore the transaction info that was current before this one.
			transactionInfoHolder.set(this.oldTransactionInfo);}

总结

事务的原理其实就是将被代理对象的方法执行放在try-catch里面,捕获到异常后进行事务回滚。事务的传播行为(例如PROPAGATION_REQUIRES_NEW)通过挂起旧事务来实现,即保存旧事务信息,获取新的Connection。
至此,Spring事务分析到此结束。

分享到:
产品推荐 理光mpC6001/5504/4504数码复印机 理光mpC5503/4503数码复印机 理光mpC3503/3303
服务支持


厂家直接供货
24小时客户服务
办公方案支持
名字:
*
邮箱:
*
电话:
*
留言:
*
验证码
 换一张
*
提交留言
在线留言 :
会员登录
登录
其他帐号登录:
留言
回到顶部