SpringCloud整合Seata 实现分布式事务
Seata 简介和工作原理
看seata官网吧
安装Seata
使用浏览器访问“/seata/seata/releases/tag/v1.4.2”,在 Seata Server 下载页面分别下载“seata-server-1.4.2.zip”,如下图。 解压到本地,目录如下修改conf目录下的file.conf和registry.conf
registry 我这里采用的是nacos,所以值粘贴了naocs的内容
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos"nacos {application = "seata-server"serverAddr = "127.0.0.1:8848"group = "SEATA_GROUP"namespace = "" #可以不填写cluster = "default"username = ""#必填password = ""#必填}}
file (事务日志存储,仅在seata-server中使用 ),我这里使用的db,所以值粘贴了db的内容
TC Server运行环境部署
我们先部署单机环境的 Seata TC Server,用于学习或测试,在生产环境中要部署集群环境;
因为TC需要进行全局事务和分支事务的记录,所以需要对应的存储,目前,TC有三种存储模式( store.mode ):
file模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高;
db模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点;
redis模式:解决db存储的性能问题;
这里选择db模式
store {## store mode: file、db、redismode = "db"## rsa decryption public keypublicKey = ""## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection paramurl = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"user = "root"password = "root"minConn = 5maxConn = 100globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}}
因为采用的db所以需要创建seata所需的数据库
-- 数据库名要与file.conf中设置数据库名保持一致 CREATE TABLE IF NOT EXISTS `global_table`(`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status`TINYINTNOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(32),`transaction_name`VARCHAR(128),`timeout` INT,`begin_time`BIGINT,`application_data`VARCHAR(2000),`gmt_create`DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),KEY `idx_transaction_id` (`transaction_id`)) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store BranchSession dataCREATE TABLE IF NOT EXISTS `branch_table`(`branch_id` BIGINT NOT NULL,`xid`VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified`DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`)) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store lock dataCREATE TABLE IF NOT EXISTS `lock_table`(`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(128),`transaction_id` BIGINT,`branch_id`BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name`VARCHAR(32),`pk` VARCHAR(36),`gmt_create`DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_branch_id` (`branch_id`)) ENGINE = InnoDBDEFAULT CHARSET = utf8;CREATE TABLE IF NOT EXISTS `distributed_lock`(`lock_key` CHAR(20) NOT NULL,`lock_value`VARCHAR(20) NOT NULL,`expire` BIGINT,primary key (`lock_key`)) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
上面都配置好之后运行脚本即可(nacos已经启动好)
Windows双击 bat
Linux 运行 sh,需要传参数(官网上有)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qIie7Kxb-1642733971994)(assets/微信图片_101439.png)]
nacos上就可以看到seata服务了
整合Java使用
需求现在有两个服务,一个是订单服务(orderServer) 和 库存服务(stockServer);由订单服务调用库存服务,分别往不同的两个库中插入数据。
准备工作
他采用的是nacos当做配置中心来使用,我这里只用了nacos当做注册中,未使用配置中心
调入依赖
<--版本需要跟seata服务版本一致,我的是1.4.2--><properties><alibaba.seata.version>2.2.0.RELEASE</alibaba.seata.version><seata.version>1.4.2</seata.version></properties><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-seata</artifactId><version>${alibaba.seata.version}</version><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>${seata.version}</version></dependency>
编些application.yml文件
需要使用seata的服务都需要配置,基本上都一样,只是这个tx-service-group
不同。这里只是order服务的
# order 服务的seata:enabled: true# 事务协调器组(下文中resources下`file.conf`中`tx-service-group`保持一致)tx-service-group: order-server-groupapplication-id: order-id# 默认开启数据源代理enable-auto-data-source-proxy: true# 代理模式默认ATdata-source-proxy-mode: AT
编写配置文件
在resources下创建file.conf
和registry.conf
registry.conf
跟seata服务中的一样,复制过来就可以了
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos"nacos {application = "seata-server"serverAddr = "127.0.0.1:8848"group = "SEATA_GROUP"namespace = ""cluster = "default"username = ""password = ""}}
file.conf
这个就不同
需要注意的地方这个vgroupmapping
后面的内容跟上文application.yml文件中的tx-service-group
一致。"="
号后面的需要跟registry.conf
中nacos下面的cluster
相同
transport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = true#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThread-prefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"}service {#这里注意,等号前后都是配置,前面是yml里配置的事务组,后面是register.conf里定义的seata-servervgroupMapping.order-server-group = "default"#only support when registry.type=file, please don't set multiple addressesseata_tc_server.grouplist = "127.0.0.1:8091"#degrade, current not supportenableDegrade = false#disable seatadisableGlobalTransaction = false}client {rm {asyncCommitBufferLimit = 10000lock {retryInterval = 10retryTimes = 30retryPolicyBranchRollbackOnConflict = true}reportRetryCount = 5tableMetaCheckEnable = falsereportSuccessEnable = false}tm {commitRetryCount = 5rollbackRetryCount = 5}undo {dataValidation = truelogSerialization = "jackson"logTable = "undo_log"}log {exceptionRate = 100}}
各个微服务的数据库添加undo_log
表
CREATE TABLE IF NOT EXISTS `undo_log`(`branch_id`BIGINT NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOBNOT NULL COMMENT 'rollback info',`log_status` INT(11)NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
在启动类上添加@EnableAutoDataSourceProxy
在调用方加上@GlobalTransactional
服务启动后seata中会有响应的显示
代码
order服务service代码@Override@GlobalTransactionalpublic void createOrder() {System.out.println("事务id:"+RootContext.getXID());//调用添加方法,往数据库插入一条数据addOrder();//远程调用stock服务stockFeign.add();}
stock服务service代码
@Override@Transactionalpublic void addStock() throws TransactionException {System.out.println("事务ID:"+ RootContext.getXID());//往数据库插入一条数据Stock stock = new Stock();stock.setSkuId(1);stock.setUserName("wsl");baseMapper.insert(stock);//模拟异常//throw new RuntimeException("阿巴阿巴...");}
未出现异常效果
事务执行结果为Committed
,查看两个数据库都已经添加成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W8GdvZfb-164273397)(assets/printle-result-image.png)]
当stock服务出现异常(有 BUG)
@Override@Transactionalpublic void addStock() throws TransactionException {System.out.println("事务ID:"+ RootContext.getXID());Stock stock = new Stock();stock.setSkuId(1);stock.setUserName("wsl");baseMapper.insert(stock);//模拟异常throw new RuntimeException("阿巴阿巴...");}
查询order服务居然发现事务状态是提交的,查看数据库发现order服务正常插入了数据,stock服务因抛出了异常未插入数据是正常的。
修复BUG
百度了一下,发现seata使用feign调用的时候,如果服务中有@ControllerAdvice,异常统一处理类,或者feign降级就会出现失效的情况,但异常处理也是很重要的。所以这里采用AOP的方式来进行手动回滚,在stock服务中添加了如下代码
@Component@Aspectpublic class RollBackAspect {@Pointcut(value = "@annotation(org.springframework.transaction.annotation.Transactional)")public void pointcut(){}@AfterThrowing(pointcut = "pointcut()",throwing = "e")public void rollBack(Throwable e) throws TransactionException {if(StringUtils.isNotEmpty(RootContext.getXID())) {//回滚事务GlobalTransactionContext.reload(RootContext.getXID()).rollback();System.out.println("事务回滚了");}}}
再次查看效果
这次就正常了,两边数据库都没有插入数据
如果觉得《最详细SpringCloud+nacos整合Seata1.4.2 实现分布式事务》对你有帮助,请点赞、收藏,并留下你的观点哦!