Spring XML形式整合Seata
1、背景
项目中为传统的Spring项目,全XML配置,接手时服务与服务之间的调用没有做事务控制,遇网络问题等会偶发数据不一致问题。故开始改造,计划引入分布式事务,经过调研实际可使用的方案不多,Atomikos、TX-LCN、Seata、本地消息表 + 定时任务、包括RocketMQ的半消息事务等,此项目需要较实时的事务方案,故通过本地消息表、消息队列来实现事务的最终一致性的不太能够接受,TX-LCN Gayhub上已经停更很久了,故只能选择Atomikos和Seata,之前有尝试过使用Atomikos来做单体服务的多库事务的控制,效果还是很OK的,但国内公司应该更倾向于Seata,包括现在的公司总部在杭州,所有的组件都很偏向阿里系,同时Seata相关的帖子也较多,故选择Seata,但整合进项目还是费了一点点力气,千篇一律的Spring Boot、Cloud,加上Seata官网的教程也写的一言难尽,最后浅尝则止的研究了Spring Boot下部分核心源码完成了接入。
2、导包
引入seata-all的pom坐标,我这边采用的是1.5.2,这包里什么都有了包括和Spring的适配模块。
⚠️注意如果你项目中对Json处理的框架是Jackson,需要保证>=2.6.3的版本,具体是会在undo_log(seata的)表的数据拿出来进行反序列化的时候会报错,具体错误MapperFeature缺少PROPAGATE_TRANSIENT_MARKER
⚠️其次,如果数据库为Oracle且数据库连接池用的Druid,如果版本互不匹配还会有问题
1、Cause: java.sql.SQLException: 索引中丢失 IN 或 OUT 参数:: 5
换成ojdbc8,druid在seata中有使用其sqlparser故版本保持不变
2、java.sql.SQLException: 关闭的语句
druid的问题,我看在2.0版本似乎才修复,具体问题复现可参见 seata的issue
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.6.3</version>
</dependency>
<!-- oracle -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>19.3.0.0</version>
</dependency>
3、整合
启动数据源代理的注解,可以明显看到Import导入的配置类中会向Spring 的BeanDefinitionRegistry 工厂里注入一个SeataAutoDataSourceProxyCreator.class的bean定义信息
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(AutoDataSourceProxyRegistrar.class)
@Documented
public @interface EnableAutoDataSourceProxy {
/**
* Whether use JDK proxy instead of CGLIB proxy
*
* @return useJdkProxy
*/
boolean useJdkProxy() default false;
/**
* Specifies which datasource bean are not eligible for auto-proxying
*
* @return excludes
*/
String[] excludes() default {};
/**
* Data source proxy mode, AT or XA
*
* @return dataSourceProxyMode
*/
String dataSourceProxyMode() default "AT";
}
AutoDataSourceProxyRegistrar
中进行beandefinition注册的时候也会判断工厂中是否已经有注册过,所以我们也可以不用注解EnableAutoDataSourceProxy
直接在Spring XML中配置一个SeataAutoDataSourceProxyCreator.class的bean定义就好。
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());
boolean useJdkProxy = Boolean.parseBoolean(annotationAttributes.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
String[] excludes = (String[]) annotationAttributes.get(ATTRIBUTE_KEY_EXCLUDES);
String dataSourceProxyMode = (String) annotationAttributes.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);
// 会判断是否已经存在了
//register seataAutoDataSourceProxyCreator bean def
if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
.genericBeanDefinition(SeataAutoDataSourceProxyCreator.class)
.addConstructorArgValue(useJdkProxy)
.addConstructorArgValue(excludes)
.addConstructorArgValue(dataSourceProxyMode)
.getBeanDefinition();
registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR, beanDefinition);
}
}
SeataAutoDataSourceProxyCreator.class 继承了AbstractAutoProxyCreator ,这又是一个BeanPostProcessor后置处理器,它会在创建bean的时候在Object getEarlyBeanReference(Object bean, String beanName) throws BeansException
方法(是不是很眼熟,就是老八股三级缓存里面的)中调用wrapIfNecessary然后完成对默认数据源的动态代理,不要去Spring Dao层配置中手动定义数据源的配置,因为在SeataAutoDataSourceProxyAdvice
会去检查每次调用上下文是否处于一个全局事务中,或者是否加了@GlobalLock注解,如果是则调用seata的数据源包装对象DataSourceProxy对应的方法,如果不是,则放行调用原始的数据源方法。
3.1 我的部分配置
Spring-Seata.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ssdev="http://www.bsoft.com.cn/schema/ssdev"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 我采用了直接申明bean实例,而没有用注解,推荐注解 -->
<bean class="io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyCreator">
<constructor-arg index="0" value="false"/>
<constructor-arg index="1" value=""/>
<constructor-arg index="2" value="AT"/>
</bean>
<!-- 核心 -->
<!-- 1、初始化TM(事务管理器) -->
<!-- 2、初始化RM(资源管理器) -->
<!-- 3、扫描Spring容器里所有的Bean,如果Bean上有指定的注解(@GlobalTransactional/@GlobalLock),则,对这个Bean进行AOP代理 -->
<bean id="globalTransactionScanner" class="io.seata.spring.annotation.GlobalTransactionScanner">
<!-- 每个服务唯一Id -->
<constructor-arg value="remote-service"/>
<!-- 事务组 -->
<constructor-arg value="default-tx-group"/>
</bean>
</beans>
其实以上的配置都是比较简单的,但是这个不同模块版本兼容是个比较麻烦的问题,费了一些时间进行处理。当然以上配置完成之后咋们的分布式事务配置依然还是没完成,仍然无法使用,因为不同微服务模块之间如服务A的方法通过RPC去调用服务B的方法,此时就算A、B都做了上面的seata的所有配置,A中的服务上也标记上了@GlobalTransactional
注解,seata也无法识别这次调用应该是一次分布式事务的调用,故这两个方法的事务还是各玩各的,都是本地事务。
关键点就是需要把seata需要的TX_XID
从分布式事务的发起方A要传递给B并绑定到执行B模块方法的那个线程上,具体实现要依据自己使用的RPC框架来做变更,我这边使用的Hessian做的封装,故直接在Header中添加上这个全局事务Id然后在我RPC远端拿到这个Header中的Id然后在方法的调用前和finally中分别通过反射调用doSomeThingBeforeMethodExecute
与doSomeThingFinal
即可完成xid的绑定,这样就能把A对B的调用纳入到一个事务中。
public class EnhanceMethod {
private static final Logger LOGGER = LoggerFactory.getLogger(EnhanceMethod.class);
private boolean bind = false;
private String rpcXid = null;
public void doSomeThingBeforeMethodExecute() {
// 判断Context中是否有分布式框架seata的全局事务xid
// 有就需要绑定给当前线程
// 获取当前事务 XID
String xid = RootContext.getXID();
rpcXid = Objects.toString(ContextUtils.getContext().get(RootContext.KEY_XID), "");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[ {} ] xid in RpcContext[ {} ]", xid, rpcXid);
}
bind = false;
// Consumer:把 XID 置入 RPC 的 attachment 中
if (xid != null) {
ContextUtils.getContext().put(RootContext.KEY_XID, xid);
} else {
// Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
if (rpcXid != null) {
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[ {} ] to RootContext", rpcXid);
}
}
}
}
public void doSomeThingFinal() {
// Provider:调用完成后,对 XID 的清理
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[ {} ] from RootContext", unbindXid);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
// 调用过程有新的事务上下文开启,则不能清除
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [ {} ] back to RootContext", unbindXid);
}
}
}
}
public String getSeataXid() {
String xid = RootContext.getXID();
if (xid != null && !xid.isEmpty()) {
return xid;
}
return null;
}
}
我是基于公司现有RPC模块进行的一个增强改造,会单独把上面的EnhanceMethod
等打为一个rpc-seata的支持包,然后在原有的RPC中在类加载过程中来判断是否引入次模块的关键class用来决定是否开启seata相关的功能,基本能够满足业务需求,可能还有更好的解法,有更深入的需求可再深入研究。