Spring XML形式整合Seata

776

1、背景

​ 项目中为传统的Spring项目,全XML配置,接手时服务与服务之间的调用没有做事务控制,遇网络问题等会偶发数据不一致问题。故开始改造,计划引入分布式事务,经过调研实际可使用的方案不多,Atomikos、TX-LCN、Seata、本地消息表 + 定时任务、包括RocketMQ的半消息事务等,此项目需要较实时的事务方案,故通过本地消息表、消息队列来实现事务的最终一致性的不太能够接受,TX-LCN Gayhub上已经停更很久了,故只能选择Atomikos和Seata,之前有尝试过使用Atomikos来做单体服务的多库事务的控制,效果还是很OK的,但国内公司应该更倾向于Seata,包括现在的公司总部在杭州,所有的组件都很偏向阿里系,同时Seata相关的帖子也较多,故选择Seata,但整合进项目还是费了一点点力气,千篇一律的Spring Boot、Cloud,加上Seata官网的教程也写的一言难尽,最后浅尝则止的研究了Spring Boot下部分核心源码完成了接入。

2、导包

引入seata-all的pom坐标,我这边采用的是1.5.2,这包里什么都有了包括和Spring的适配模块。

⚠️注意如果你项目中对Json处理的框架是Jackson,需要保证>=2.6.3的版本,具体是会在undo_log(seata的)表的数据拿出来进行反序列化的时候会报错,具体错误MapperFeature缺少PROPAGATE_TRANSIENT_MARKER

⚠️其次,如果数据库为Oracle且数据库连接池用的Druid,如果版本互不匹配还会有问题

1、Cause: java.sql.SQLException: 索引中丢失 IN 或 OUT 参数:: 5 换成ojdbc8,druid在seata中有使用其sqlparser故版本保持不变

2、java.sql.SQLException: 关闭的语句 druid的问题,我看在2.0版本似乎才修复,具体问题复现可参见 seata的issue

    <dependency>
      <groupId>io.seata</groupId>
      <artifactId>seata-all</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.3</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.3</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.6.3</version>
    </dependency>

		    <!-- oracle -->
    <dependency>
      <groupId>com.oracle.database.jdbc</groupId>
      <artifactId>ojdbc8</artifactId>
      <version>19.3.0.0</version>
    </dependency>

3、整合

启动数据源代理的注解,可以明显看到Import导入的配置类中会向Spring 的BeanDefinitionRegistry 工厂里注入一个SeataAutoDataSourceProxyCreator.class的bean定义信息

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(AutoDataSourceProxyRegistrar.class)
@Documented
public @interface EnableAutoDataSourceProxy {
    /**
     * Whether use JDK proxy instead of CGLIB proxy
     *
     * @return useJdkProxy
     */
    boolean useJdkProxy() default false;

    /**
     * Specifies which datasource bean are not eligible for auto-proxying
     *
     * @return excludes
     */
    String[] excludes() default {};

    /**
     * Data source proxy mode, AT or XA
     *
     * @return dataSourceProxyMode
     */
    String dataSourceProxyMode() default "AT";
}

AutoDataSourceProxyRegistrar 中进行beandefinition注册的时候也会判断工厂中是否已经有注册过,所以我们也可以不用注解EnableAutoDataSourceProxy 直接在Spring XML中配置一个SeataAutoDataSourceProxyCreator.class的bean定义就好。

@Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());

        boolean useJdkProxy = Boolean.parseBoolean(annotationAttributes.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
        String[] excludes = (String[]) annotationAttributes.get(ATTRIBUTE_KEY_EXCLUDES);
        String dataSourceProxyMode = (String) annotationAttributes.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);

      	// 会判断是否已经存在了
        //register seataAutoDataSourceProxyCreator bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition(SeataAutoDataSourceProxyCreator.class)
                .addConstructorArgValue(useJdkProxy)
                .addConstructorArgValue(excludes)
                .addConstructorArgValue(dataSourceProxyMode)
                .getBeanDefinition();
            registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR, beanDefinition);
        }
    }

​ SeataAutoDataSourceProxyCreator.class 继承了AbstractAutoProxyCreator ,这又是一个BeanPostProcessor后置处理器,它会在创建bean的时候在Object getEarlyBeanReference(Object bean, String beanName) throws BeansException方法(是不是很眼熟,就是老八股三级缓存里面的)中调用wrapIfNecessary然后完成对默认数据源的动态代理,不要去Spring Dao层配置中手动定义数据源的配置,因为在SeataAutoDataSourceProxyAdvice会去检查每次调用上下文是否处于一个全局事务中,或者是否加了@GlobalLock注解,如果是则调用seata的数据源包装对象DataSourceProxy对应的方法,如果不是,则放行调用原始的数据源方法。

3.1 我的部分配置

Spring-Seata.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ssdev="http://www.bsoft.com.cn/schema/ssdev"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">

		<!-- 我采用了直接申明bean实例,而没有用注解,推荐注解 -->
    <bean class="io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyCreator">
        <constructor-arg index="0" value="false"/>
        <constructor-arg index="1" value=""/>
        <constructor-arg index="2" value="AT"/>
    </bean>

  	<!-- 核心 -->
  	<!-- 1、初始化TM(事务管理器) -->
  	<!-- 2、初始化RM(资源管理器) -->
  	<!-- 3、扫描Spring容器里所有的Bean,如果Bean上有指定的注解(@GlobalTransactional/@GlobalLock),则,对这个Bean进行AOP代理 -->
    <bean id="globalTransactionScanner" class="io.seata.spring.annotation.GlobalTransactionScanner">
      	<!-- 每个服务唯一Id -->
        <constructor-arg value="remote-service"/>
      	<!-- 事务组 -->
        <constructor-arg value="default-tx-group"/>
    </bean>
</beans>

​ 其实以上的配置都是比较简单的,但是这个不同模块版本兼容是个比较麻烦的问题,费了一些时间进行处理。当然以上配置完成之后咋们的分布式事务配置依然还是没完成,仍然无法使用,因为不同微服务模块之间如服务A的方法通过RPC去调用服务B的方法,此时就算A、B都做了上面的seata的所有配置,A中的服务上也标记上了@GlobalTransactional注解,seata也无法识别这次调用应该是一次分布式事务的调用,故这两个方法的事务还是各玩各的,都是本地事务。

​ 关键点就是需要把seata需要的TX_XID从分布式事务的发起方A要传递给B并绑定到执行B模块方法的那个线程上,具体实现要依据自己使用的RPC框架来做变更,我这边使用的Hessian做的封装,故直接在Header中添加上这个全局事务Id然后在我RPC远端拿到这个Header中的Id然后在方法的调用前和finally中分别通过反射调用doSomeThingBeforeMethodExecutedoSomeThingFinal即可完成xid的绑定,这样就能把A对B的调用纳入到一个事务中。

public class EnhanceMethod {
    private static final Logger LOGGER = LoggerFactory.getLogger(EnhanceMethod.class);

    private boolean bind = false;
    private String rpcXid = null;

    public void doSomeThingBeforeMethodExecute() {
        // 判断Context中是否有分布式框架seata的全局事务xid
        // 有就需要绑定给当前线程
        // 获取当前事务 XID
        String xid = RootContext.getXID();

        rpcXid = Objects.toString(ContextUtils.getContext().get(RootContext.KEY_XID), "");
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[ {} ] xid in RpcContext[ {} ]", xid, rpcXid);
        }
        bind = false;
        // Consumer:把 XID 置入 RPC 的 attachment 中
        if (xid != null) {
            ContextUtils.getContext().put(RootContext.KEY_XID, xid);
        } else {
            // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[ {} ] to RootContext", rpcXid);
                }
            }
        }
    }

    public void doSomeThingFinal() {
        // Provider:调用完成后,对 XID 的清理
        if (bind) {
            String unbindXid = RootContext.unbind();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("unbind[ {} ] from RootContext", unbindXid);
            }
            if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                LOGGER.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                // 调用过程有新的事务上下文开启,则不能清除
                if (unbindXid != null) {
                    RootContext.bind(unbindXid);
                    LOGGER.warn("bind [ {} ] back to RootContext", unbindXid);
                }
            }
        }
    }

    public String getSeataXid() {
        String xid = RootContext.getXID();
        if (xid != null && !xid.isEmpty()) {
            return xid;
        }

        return null;
    }
}

​ 我是基于公司现有RPC模块进行的一个增强改造,会单独把上面的EnhanceMethod等打为一个rpc-seata的支持包,然后在原有的RPC中在类加载过程中来判断是否引入次模块的关键class用来决定是否开启seata相关的功能,基本能够满足业务需求,可能还有更好的解法,有更深入的需求可再深入研究。