分布式事务-2PC实战

在了解分布式事务之前,先了解一下什么是事务的基本要素及事务属性:

一、 事务的基本要素

     事务的四个基本要素:ACID

     原子性:整个事务中的操作,要么全部完成, 要么全部不完成(全部撤销)。

     一致性:事务开始之前和结束之后,数据库的完整性没有遭到破坏。

     隔离性:在同一时间,只允许一个事务请求同一数据。

     持久性:事务完成以后,该事务对数据库所做的操作持久化在数据库中,并不会被回滚。

二、 事务的属性:

   1.传播行为(事务的传递):

   2.隔离级别(控制并发的弊端):

   3.只读(优化);

   4.超时(释放资源);

   5.回滚规则(指定要不要再出错后回滚事务);

使用Spring注解管理传播行为:

   // 如果有事务,那么加入事务,没有的话新建一个(默认)

        1) @Transactional(propagation=Propagation.REQUIRED) 

    // 容器不为这个方法开启事务(如果有事务将其挂起,执行方法)

        2) @Transactional(propagation=Propagation.NOT_SUPPORTED)    

     // 不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务

        3) @Transactional(propagation=Propagation.REQUIRES_NEW)  

    // 必须在一个已有的事务中执行,否则抛出异常

       4)@Transactional(propagation=Propagation.MANDATORY)   

   // 必须在一个没有的事务中执行,否则抛出异常(与Propagation.MANDATORY相反)

       5)@Transactional(propagation=Propagation.NEVER)   

    // 如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务.

       6) @Transactional(propagation=Propagation.SUPPORTS) 

    //内嵌到一个事务里面,当运行到内部Transaction时会停止,执行内部的Transaction 等其执行完了才执行外部的;

       7) @Transactional(propagation=Propagation.NESTED) 

    /*

    public void methodName(){

       // 本类的修改方法 1

       update();

       // 调用其他类的修改方法

       otherBean.update();

       // 本类的修改方法 2

       update();

    }

    other失败了不会影响 本类的修改提交成功

    本类update的失败,other也失败

    */

事务中经常出现的并发问题

 分析几个场景:

  脏读:一个事务读取了另一个事务操作但未提交的数据。

         比如A、B两个事务,都操作同一张表,A刚刚对数据进行了操作(插入、修改等)但还没有提交,这时B读取到了A刚刚操作的数据,因为A有可能回滚,所以这部分数据有可能只是临时的、无效的,即脏数据。

不可重复读:一个事务中的多个相同的查询返回了不同数据。

     比如A、B两个事务,A中先后有两次查询相同数据的操作,第一次查询完之后,B对相关数据进行了修改,造成A事务第二次查询出的数据与第一次不一致。

 幻读:事务并发执行时,其中一个事务对另一个事务中操作的结果集的影响。

      比如A、B两个事务,事务A操作表中符合条件的若干行。事务B插入符合A操作条件的数据行,然后再提交。后来发现事务A并没有如愿对“所有”符合条件的数据行做了修改~~

SQL规范定义的四个事务隔离级别

以上都是事务中经常发生的问题,所以为了兼顾并发效率和异常控制,SQL规范定义了四个事务隔离级别:

      Read uncommitted (读未提交):如果设置了该隔离级别,则当前事务可以读取到其他事务已经修改但还没有提交的数据。这种隔离级别是最低的,会导致上面所说的脏读

      Read committed (读已提交):如果设置了该隔离级别,当前事务只可以读取到其他事务已经提交后的数据,这种隔离级别可以防止脏读,但是会导致不可重复读和幻读。这种隔离级别最效率较高,并且不可重复读和幻读在一般情况下是可以接受的,所以这种隔离级别最为常用。

      Repeatable read (可重复读):如果设置了该隔离级别,可以保证当前事务中多次读取特定记录的结果相同。可以防止脏读、不可重复读,但是会导致幻读。

     Serializable (串行化):如果设置了该隔离级别,所有的事务会放在一个队列中执行,当前事务开启后,其他事务将不能执行,即同一个时间点只能有一个事务操作数据库对象。这种隔离级别对于保证数据完整性的能力是最高的,但因为同一时刻只允许一个事务操作数据库,所以大大降低了系统的并发能力。 

    引用一张很经典的表格来按隔离级别由弱到强来标示为:

blob.png

 查看事务隔离级别

  命令行登录mysql,查看当前事务隔离级别:

   select @@tx_isolation; 或者  select @@session.tx_isolation;

 3.只读;

     // readOnly=true只读,不能更新,删除 

      @Transactional (propagation = Propagation.REQUIRED,readOnly=true) 

 4.超时;释放资源

      // 设置超时时间

      @Transactional (propagation = Propagation.REQUIRED,timeout=30)

  5.回滚规则;

       

 默认遇到throw new RuntimeException("...");会回滚
       需要捕获的throw new Exception("...");不会回滚
   // 指定回滚
     @Transactional(rollbackFor=Exception.class) 
    public void methodName() {
       // 不会回滚
       throw new Exception("...");
    } 
  //指定不回滚
     @Transactional(noRollbackFor=Exception.class)
    public ItimDaoImpl getItemDaoImpl() {
        // 会回滚
        throw new RuntimeException("注释");
    }

本地事务

以支付宝转账余额宝为例,假设有

  支付宝账户表:A(id,userId,amount)  

  余额宝账户表:B(id,userId,amount)

  用户的userId=1;

  从支付宝转账1万块钱到余额宝的动作分为两步:

      1)支付宝表扣除1万:update A set amount=amount-10000 where userId=1;

     2)余额宝表增加1万:update B set amount=amount+10000 where userId=1;

  如何确保支付宝余额宝收支平衡呢?有人说这个很简单嘛,可以用事务解决。

 blob.png

      非常正确!如果你使用spring的话一个注解就能搞定上述事务功能。  

blob.png

     如果系统规模较小,数据表都在一个数据库实例上,上述本地事务方式可以很好地运行,但是如果系统规模较大,比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。

   既然本地事务失效,分布式事务自然就登上舞台。

什么是分布式事务?

     分布式事务是指事务的参与者、支持事务的服务器、资源管理器以及事务管理器分别位于分布系统的不同节点之上,在两个或多个网络计算机资源上访问并且更新数据,将两个或多个网络计算机的数据进行的多次操作作为一个整体进行处理。如不同银行账户之间的转账。

分布式事务—两阶段提交协议

     两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器C和若干事务执行者Si两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。

blob.png

  1) 我们的应用程序(client)发起一个开始请求到TC;

  2) TC先将<prepare>消息写到本地日志,之后向所有的Si发起<prepare>消息。以支付宝转账到余额宝为例,TC给A的prepare消息是通知支付宝数据库相应账目扣款1万,TC给B的prepare消息是通知余额宝数据库相应账目增加1w。为什么在执行任务前需要先写本地日志,主要是为了故障后恢复用,本地日志起到现实生活中凭证 的效果,如果没有本地日志(凭证),出问题容易死无对证;

  3) Si收到<prepare>消息后,执行具体本机事务,但不会进行commit,如果成功返回<yes>,不成功返回<no>。同理,返回前都应把要返回的消息写到日志里,当作凭证。

  4) TC收集所有执行器返回的消息,如果所有执行器都返回yes,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。

注:TC或Si把发送或接收到的消息先写到日志里,主要是为了故障后恢复用。如某一Si从故障中恢复后,先检查本机的日志,如果已收到<commit >,则提交,如果<abort >则回滚。如果是<yes>,则再向TC询问一下,确定下一步。如果什么都没有,则很可能在<prepare>阶段Si就崩溃了,因此需要回滚。

现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/)来快速实现。

不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?

    1)两阶段提交涉及多次节点间的网络通信,通信时间太长!

    2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!

正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。在高并发的时候不建议使用。

   对于在项目中接触到JTA,大部分的原因是因为在项目中需要操作多个数据库,同时,可以保证操作的原子性,保证对多个数据库的操作一致性。

   下面我将基于Spring4.1.7+atomikos+mybaits 实现两阶段的分布式事务处理,通过AOP面向切面实现动态实现数据源的切换

   所需JAR:

<!-- atomikos分布式事务 -->
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jta</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>atomikos-util</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jms</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-osgi</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-api</artifactId>
			<version>4.0.4</version>
		</dependency>
		<dependency>
			<groupId>javax.transaction</groupId>
			<artifactId>jta</artifactId>
			<version>1.1</version>
		</dependency>
		<!-- atomikos分布式事务 -->

配置applicationContext-db.xml:

<!-- 公共的数据源 -->
	<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close" abstract="true">
		<property name="xaDataSourceClassName" value="${jdbc.xaDataSourceClassName}" />
		<!-- SQLErrorCodes loaded: [DB2, Derby, H2, HSQL, Informix, MS-SQL, MySQL, 
			Oracle, PostgreSQL, Sybase, Hana] -->
		<property name="poolSize" value="10" />
		<property name="minPoolSize" value="10" />
		<property name="maxPoolSize" value="30" />
		<property name="borrowConnectionTimeout" value="60" />
		<property name="reapTimeout" value="20" />
		<property name="maxIdleTime" value="60" />
		<property name="maintenanceInterval" value="60" />
		<property name="loginTimeout" value="60" />
		<property name="testQuery" value="SELECT 'x' FROM DUAL" />
	</bean>

	<bean id="orderDataSource" parent="abstractXADataSource">
		<property name="uniqueResourceName" value="orderDataSource" />
		<property name="xaProperties">
			<props>
				<prop key="driverClassName">${jdbc.driverClassName}</prop>
				<prop key="url">${order.jdbc.url}</prop>
				<prop key="password">${order.jdbc.password}</prop>
				<prop key="username">${order.jdbc.username}</prop>   <!-- durid -->
				<prop key="initialSize">${jdbc.initialSize}</prop>
				<prop key="maxActive">${jdbc.maxActive}</prop> 
				<!-- 若不配置则代码执行"{dataSource-1} inited"此处停止 -->
				<prop key="minIdle">${jdbc.minIdle}</prop>
				<prop key="maxWait">${jdbc.maxWait}</prop>
				<prop key="validationQuery">SELECT 'x' FROM DUAL</prop>
				<prop key="testOnBorrow">${jdbc.testOnBorrow}</prop>
				<prop key="testOnReturn">${jdbc.testOnReturn}</prop>
				<prop key="testWhileIdle">${jdbc.testWhileIdle}</prop>
				<prop key="removeAbandoned">true</prop>
				<prop key="removeAbandonedTimeout">1800</prop>
				<prop key="logAbandoned">true</prop>
				<prop key="filters">log4j</prop>
			</props>
		</property>
	</bean>

   	<bean id="dataSource" parent="abstractXADataSource">
		<property name="uniqueResourceName" value="dataSource" />
		<property name="xaProperties">
			<props>
				<prop key="driverClassName">${jdbc.driverClassName}</prop>
				<prop key="url">${common.jdbc.url}</prop>
				<prop key="password">${common.jdbc.password}</prop>
				<prop key="username">${common.jdbc.username}</prop>   <!-- durid -->
				<prop key="initialSize">${jdbc.initialSize}</prop>
				<prop key="maxActive">${jdbc.maxActive}</prop> <!-- 若不配置则代码执行"{dataSource-1} inited"此处停止 -->
				<prop key="minIdle">${jdbc.minIdle}</prop>
				<prop key="maxWait">${jdbc.maxWait}</prop>
				<prop key="validationQuery">SELECT 'x' FROM DUAL</prop>
				<prop key="testOnBorrow">${jdbc.testOnBorrow}</prop>
				<prop key="testOnReturn">${jdbc.testOnReturn}</prop>
				<prop key="testWhileIdle">${jdbc.testWhileIdle}</prop>
				<prop key="removeAbandoned">true</prop>
				<prop key="removeAbandonedTimeout">1800</prop>
				<prop key="logAbandoned">true</prop>
				<prop key="filters">log4j</prop>
			</props>
		</property>
	</bean>


	<bean id="order_sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="orderDataSource" />
		<property name="mapperLocations" value="classpath*:/atomikos/*/*.xml" />
	</bean>
	
	<bean id="common_sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="dataSource" />
		<property name="mapperLocations" value="classpath*:/atomikos/*/*.xml" />
	</bean>
	
	<!-- 扫描basePackage下所有以@Repository标识的 接口 -->
	<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage" value="com.viemall.atomikos.mapper" />
		<!-- <property name="annotationClass" value="org.springframework.stereotype.Repository" /> -->
		<property name="sqlSessionTemplateBeanName" value="routingSqlSessionTemplate" />
	</bean>


	<bean id="routingSqlSessionTemplate" class="com.viemall.atomikos.datasource.RoutingSqlSessionTemplate">
		<constructor-arg ref="common_sqlSessionFactory" />
		<property name="targetSqlSessionFactorys">
			 <!-- 注册所有的数据源 -->
			<map>
				<entry key="dataSource" value-ref="common_sqlSessionFactory" />
				<entry key="orderDataSource" value-ref="order_sqlSessionFactory" />
			</map>
		</property>
	</bean>

配置applicationContext-aop.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" 
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:oxm="http://www.springframework.org/schema/oxm"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                http://www.springframework.org/schema/context 
                http://www.springframework.org/schema/context/spring-context-4.1.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-4.1.xsd">

	<aop:config>
		<aop:advisor id="managerTx" advice-ref="txAdvice"
			pointcut="execution(* com..*Service.*(..))" order="1" />
		
		<aop:advisor id="dsAdvice"  
		              advice-ref="dataSourceAdvice"  
		              pointcut="execution(* com.viemall.atomikos.mapper.*.*(..))"   order="2"  /> 
		              
	</aop:config>
	
	<tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
		<tx:attributes>
			<tx:method name="load*" propagation="SUPPORTS" read-only="true" />
			<tx:method name="list*" propagation="SUPPORTS" read-only="true" />
			<tx:method name="get*" propagation="SUPPORTS" read-only="true" />
			<tx:method name="is*" propagation="SUPPORTS" read-only="true" />
			<!-- REQUIRED:支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。  -->
			<tx:method name="*" propagation="REQUIRED" />
		</tx:attributes>
	</tx:advice>
	
	
	<!-- atomikos事务管理器 -->  
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
         init-method="init" 
         destroy-method="close">  
        <property name="forceShutdown">  
            <value>true</value>  
        </property>  
    </bean>

	<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
		<property name="transactionTimeout" value="3000" />
	</bean>  
    
    
    <!-- spring 事务管理器 -->    
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
        <property name="transactionManager" ref="atomikosTransactionManager" />  
        <property name="userTransaction" ref="atomikosUserTransaction" />  
        <!-- 必须设置,否则程序出现异常 JtaTransactionManager does not support custom isolation levels by default -->  
        <property name="allowCustomIsolationLevels" value="true"/>   
    </bean> 
    
     <!-- 切换数据源 -->  
    <bean id="dataSourceAdvice" class="com.viemall.atomikos.datasource.DataSourceAdvice" >
    	<property name="supportReady" value="false" />
    </bean>
</beans>

   

部分Java代码:

/**
 * 使用AOP思想切换数据源
 * @author Tony
 *
 */
public class DataSourceAdvice implements MethodBeforeAdvice, AfterReturningAdvice, ThrowsAdvice {

	/** The log. */
	final Log log = LogFactory.getLog(getClass());
	
	boolean supportReady = false; //是否支持只读实例

	/**
	 * service方法执行完之后被调用  后置通知
	 */ 
	@Override
	public void afterReturning(Object returnValue, Method method,
			Object[] args, Object target) throws Throwable {
		DataSourceContextHolder.clearDataSourceType();
	}

	/**
	 *  前置通知 
	 */
	@Override
	public void before(Method method, Object[] args, Object target)
			throws Throwable {
		DataSource dataSource = resolveDataSourceFromClass(target);
		if(log.isInfoEnabled()){
			log.info("Enter point cut " + target.getClass().getName() + ", method " +  method.getName());
		}
		if(dataSource==null){ //
			//设置默认的数据源
			DataSourceContextHolder.setDataSourceType(DataSourceGroup.dataSource.name());
			return;
		}
		String dsName = dataSource.source().name();
		if(supportReady && isReadyOnlyMethod(method.getName())){//是否只读方法
			//切换读库
			//DataSourceContextHolder.setDataSourceType(readOnlyDataSourceCalculator.
			getDataSource(dsName, dataSource.readOnlyNum()));
		}
		else {
			if(log.isInfoEnabled()){
				log.info("Switch to  datasource " + dsName);
			}
			DataSourceContextHolder.setDataSourceType(dsName);
		}
	}
	
	/**
	 * 获取方法数据源
	 * @param target
	 * @return
	 */
	@SuppressWarnings({ "rawtypes" })
	private DataSource resolveDataSourceFromClass(Object target) {
		Class[] interfaces = target.getClass().getInterfaces();
		for (Class<?> clz : interfaces) {
            if (clz.isInterface()) {
            	DataSource dataSource=(DataSource) clz.getAnnotation(DataSource.class);
            	if(dataSource!=null){
        			return dataSource;
        		}
            }
        }
		return null;
	}
	 
	
	/**
	 * 是否是只读函数,决定是否用只读的数据源
	 * @param methodName
	 * @return
	 */
	private boolean isReadyOnlyMethod(String methodName){
		if(methodName.startsWith("get") || methodName.startsWith("query") ||
				methodName.startsWith("is") || methodName.startsWith("load")){
			return true;
		}
		return false;
	}
	
	
	/**
	 * 抛出Exception之后被调用
	 */
	public void afterThrowing(Method method, Object[] args, Object target, Exception ex) throws Throwable {
		DataSourceContextHolder.clearDataSourceType();
	}

	public void setSupportReady(boolean supportReady) {
		this.supportReady = supportReady;
	}
	
	public static void main(String[] args) {
		DataSource dataSource=SubMapper.class.getAnnotation(DataSource.class);
		System.out.println(dataSource.source());
		
	}
}

源码文件下载:

  http://download.csdn.net/detail/tang06211015/9727004

后续将会出常用的分布式事务的处理解决方案和实战演练;

参考资料:

    https://yq.aliyun.com/articles/10#

发表评论