探讨 | SpringBoot + MyBatis 多数据源事物问题 - mySoul

/ 0评 / 0

这是小小本周的第二篇,本篇将会着重讲解关于SpringBoot + MyBatis 多数据源的事物的问题。

多数据源

此处模拟创建订单和扣减库存。
先创建订单表和库存表

CREATE TABLE `t_storage` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order` (
  `id` bigint(16) NOT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  `amount` double(14,2) DEFAULT '0.00',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

数据库连接

这里使用yal文件把数据库进行配置

spring:
  datasource:
    ds1:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db1
      username: root
      password: root
    ds2:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db2
      username: root
      password: root

基本的目录结构

切换数据源的抽象类

这里切换数据源的抽象类为 AbstractRoutingDataSource
这里看一下相关的源码

public abstract class AbstractRoutingDataSource{
    //数据源的集合
    @Nullable
    private Map<Object, Object> targetDataSources;
    //默认的数据源
    @Nullable
    private Object defaultTargetDataSource;

    //返回当前的路由键,根据该值返回不同的数据源
    @Nullable
    protected abstract Object determineCurrentLookupKey();

    //确定一个数据源
    protected DataSource determineTargetDataSource() {
        //抽象方法 返回一个路由键
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.targetDataSources.get(lookupKey);
        return dataSource;
    }
}

对于该源码来说,核心为用Map保存多个数据源信息,根据key获取不同的数据源。

修改数据源的核心

那么这里修改数据源的核心就在于重写determineCurrentLookupKey方法,让其返回一个数据源名称

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
        return dataBaseType;
    }
}

此时的目录结构为

再添加一个工具类

这里再添加一个工具类,用来保存当前线程的数据源类型

public class DataSourceType {

    public enum DataBaseType {
        ds1, ds2
    }
    // 使用ThreadLocal保证线程安全
    private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
    // 往当前线程里设置数据源类型
    public static void setDataBaseType(DataBaseType dataBaseType) {
        if (dataBaseType == null) {
            throw new NullPointerException();
        }
        TYPE.set(dataBaseType);
    }
    // 获取数据源类型
    public static DataBaseType getDataBaseType() {
        DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
        return dataBaseType;
    }
}

此时目录结构为

实现数据源的注入

@Configuration
public class DataSourceConfig {

    /**
     * 创建多个数据源 ds1 和 ds2
     * 此处的Primary,是设置一个Bean的优先级
     * @return
     */
    @Primary
    @Bean(name = "ds1")
    @ConfigurationProperties(prefix = "spring.datasource.ds1")
    public DataSource getDateSource1() {
        return DataSourceBuilder.create().build();
    }
    @Bean(name = "ds2")
    @ConfigurationProperties(prefix = "spring.datasource.ds2")
    public DataSource getDateSource2() {
        return DataSourceBuilder.create().build();
    }


    /**
     * 将多个数据源注入到DynamicDataSource
     * @param dataSource1
     * @param dataSource2
     * @return
     */
    @Bean(name = "dynamicDataSource")
    public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
                                        @Qualifier("ds2") DataSource dataSource2) {
        Map<Object, Object> targetDataSource = new HashMap<>();
        targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
        targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
        DynamicDataSource dataSource = new DynamicDataSource();
        dataSource.setTargetDataSources(targetDataSource);
        dataSource.setDefaultTargetDataSource(dataSource1);
        return dataSource;
    }


    /**
     * 将动态数据源注入到SqlSessionFactory
     * @param dynamicDataSource
     * @return
     * @throws Exception
     */
    @Bean(name = "SqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dynamicDataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
        bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
        return bean.getObject();
    }
}

  1. 创建多个数据源DataSource,ds1 和 ds2;
  2. 将ds1 和 ds2 数据源放入动态数据源DynamicDataSource;
  3. 将DynamicDataSource注入到SqlSessionFactory。

设置路由键

创建相关的mapper接口

public interface OrderMapper {
    void createOrder(Order order);
}
public interface StorageMapper {
    void decreaseStorage(Order order);
}

设置切面

这里设置相关的切面

@Component
@Aspect
public class DataSourceAop {
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
    public void setDataSource1() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
    }
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
    public void setDataSource2() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
    }
}

在执行订单的操作时,切到数据源ds1,执行库存操作时,切到数据源ds2。

测试

public class OrderServiceImpl implements OrderService {
    @Override
    public void createOrder(Order order) {
        storageMapper.decreaseStorage(order);
        logger.info("库存已扣减,商品代码:{},购买数量:{}。创建订单中...",order.getCommodityCode(),order.getCount());
        orderMapper.createOrder(order);
    }
}

总目录结构


经过测试两个数据表都已经发生了变化

事物模式

这里事物模式,不能切换数据源,可以在,service上添加Transactional上实现。

为什么

这里为什么不能切换数据源
首先查看相关的注解

public class TransactionInterceptor{
    public Object invoke(MethodInvocation invocation) throws Throwable {
        //获取目标类
        Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
        //事务调用
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}

创建事物

这里首先执行的是创建事物

protected Object doGetTransaction() {
    //DataSource的事务对象
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    //设置事务自动保存
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    //给事务对象设置ConnectionHolder
    ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

重点是给事务对象设置了ConnectionHolder属性,不过此时还是为空。

开启事物

这里主要是通过ThreadLocal将资源和当前的事务对象绑定,然后设置一些事务状态。

protected void doBegin(Object txObject, TransactionDefinition definition) {

    Connection con = null;
    //从数据源中获取一个连接
    Connection newCon = obtainDataSource().getConnection();
    //重新设置事务对象中的connectionHolder,此时已经引用了一个连接
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    //将这个connectionHolder标记为与事务同步
    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    con = txObject.getConnectionHolder().getConnection();
    con.setAutoCommit(false);
    //激活事务活动状态
    txObject.getConnectionHolder().setTransactionActive(true);
    //将connection holder绑定到当前线程,通过threadlocal
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    }
    //事务管理器,激活事务同步状态
    TransactionSynchronizationManager.initSynchronization();
}

执行Mapper接口

这里执行mapper接口

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
                PersistenceExceptionTranslator exceptionTranslator) {

    //从ThreadLocal中获取SqlSessionHolder,第一次获取不到为空
    SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);

    //如果SqlSessionHolder为空,那也肯定获取不到SqlSession;
    //如果SqlSessionHolder不为空,直接通过它来拿到SqlSession
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
        return session;
    }
    //创建一个新的SqlSession
    session = sessionFactory.openSession(executorType);
    //如果当前线程的事务处于激活状态,就将SqlSessionHolder绑定到ThreadLocal
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
    return session;
}

执行SQL之前,会先获取到SqlSession对象。

拿到SqlSession之后,就开始调用Mybatis的执行器,准备执行SQL语句。在执行SQL之前呢,当然需要先拿到Connection连接。

public Connection getConnection() throws SQLException {
    //通过数据源获取连接
    //比如我们配置了多数据源,此时还会正常切换
    if (this.connection == null) {
        openConnection();
    }
    return this.connection;
}

至此可以看openConnection方法,作用是获取一个连接,如果我们配置了多数据源,此时可以切换,如果添加事物,此时不能切换,因为多了if判断,此时返回的还是上一次保存的链接。

两道小题
1: Spring 如何保证事物:把业务放在同一个数据库连接中,一起提交,一起回滚。
2: 这枚做到在一个连接中,在线程中,通过数据库资源和当前事物相绑定实现。

事物模式,切换数据源

创建SqlSessionFactory

在文件中,添加,如下的两个方法

@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

这里再添加 CustomSqlSessionTemplate 用来代替原来的SqlSessionTemplate,把SqlSessionFactory注入
在。同一个文件中添加该方法

@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
    Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("ds1",factory1);
    sqlSessionFactoryMap.put("ds2",factory2);
    CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
    customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
    customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
    return customSqlSessionTemplate;
}

补充上自定义的CustomSqlSessionTemplate

public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        //当前数据源的名称
        String currentDsName = DataSourceType.getDataBaseType().name();
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        }
        return this.sqlSessionFactory;
    }
}

核心在于根据是否为空,获取到sqlsessionfactory

 if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        }

测试

修改完配置之后,我们把Service方法加上事务的注解,此时数据也是可以正常更新的。

@Transactional
@Override
public void createOrder(Order order) {
    storageMapper.decreaseStorage(order);
    orderMapper.createOrder(order);
}

XA协议分布式事务

这里借助 Atomikos 框架

引入maven

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

更改getDataSource

把DataSource对象改成AtomikosDataSourceBean。

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){
    Properties prop = build(env,prefix);
    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
    ds.setUniqueResourceName(dataSourceName);
    ds.setXaProperties(prop);
    return ds;
}

测试

这样配完之后,获取Connection连接的时候,拿到的其实是MysqlXAConnection对象。在提交或者回滚的时候,走的就是MySQL的XA协议了。

public void commit(Xid xid, boolean onePhase) throws XAException {
    //封装 XA COMMIT 请求
    StringBuilder commandBuf = new StringBuilder(300);
    commandBuf.append("XA COMMIT ");
    appendXid(commandBuf, xid);
    try {
        //交给MySQL执行XA事务操作
        dispatchCommand(commandBuf.toString());
    } finally {
        this.underlyingConnection.setInGlobalTx(false);
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注