失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 达梦数据库DM8支持Seata事务框架

达梦数据库DM8支持Seata事务框架

时间:2019-02-28 09:35:15

相关推荐

达梦数据库DM8支持Seata事务框架

1. 用例说明

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

仓储服务:对给定的商品扣除仓储数量。订单服务:根据采购需求创建订单。帐户服务:从用户帐户中扣除余额。

1.1 项目的架构图

1.2 初始项目搭建

1. 环境介绍

每个模块一个库,也就是需要4个库。

2. 初始表数据

-- 1. business-xa模块所在数据库 新建BIZ_LOG表CREATE TABLE "SYSDBA"."BIZ_LOG"("ID_" VARCHAR(64) NOT NULL,"OP_DATETIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,CLUSTER PRIMARY KEY("ID_")) STORAGE(ON "MAIN", CLUSTERBTR) ;-- 2. storage-xa模块所在数据库 新建STORAGE_TBL表并新增一条数据CREATE TABLE "SYSDBA"."STORAGE_TBL"("ID" INT NOT NULL,"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,"COUNT" INT DEFAULT '0',CLUSTER PRIMARY KEY("ID"),CONSTRAINT "STORAGE_TBL_COMMODITY_CODE" UNIQUE("COMMODITY_CODE")) STORAGE(ON "MAIN", CLUSTERBTR) ;insert into "SYSDBA"."STORAGE_TBL"("ID", "COMMODITY_CODE", "COUNT") VALUES(1,'C100000','10000');-- 3. order-xa模块所在数据库 新建ORDER_TBL表CREATE TABLE "SYSDBA"."ORDER_TBL"("ID" BIGINT NOT NULL,"USER_ID" VARCHAR(255) DEFAULT NULL,"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,"COUNT" INT DEFAULT '0',"MONEY" INT DEFAULT '0',"CREATE_TIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;-- 4. account-xa模块所在数据库 新建ACCOUNT_TBL表并新增一条数据CREATE TABLE "SYSDBA"."ACCOUNT_TBL"("ID" INT NOT NULL,"USER_ID" VARCHAR(255) DEFAULT NULL,"MONEY" INT DEFAULT '0',CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;insert into "SYSDBA"."ACCOUNT_TBL"("ID", "USER_ID", "MONEY") VALUES(1, 'U100000', 600);

3. 搭建项目

启动seata-xa-original项目配置项目中每个模块连接数据库的连接在application.properties文件中

#DMspring.datasource.url=jdbc:dm://127.0.0.1:5238/spring.datasource.driver-class-name=dm.jdbc.driver.DmDriverspring.datasource.username=SYSDBAspring.datasource.password=SYSDBA

4. 存在的问题

每个模块在不同的库,没法保证事务的一致性。所以打算采用Seata分布式事务框架

资料包中seata-xa-original.zip为这块的代码包

2.DM数据库支持Seata事务

2.1 流程分析

因Seata事务框架的AT模式还不支持Dm数据库但支持Oracle数据库,所以整合过程中需修改数据库添加对Oracle的支持。

2.2 修改DM数据库的配置

1.更新jdbc驱动

项目中使用dm的jdbc版本看图。不建议使用的jdbc版本比这个版本低。

2.修改dm.svc.conf配置文件

## 添加下面这两个属性 第一个是兼容mysql 第二个事屏蔽关键字COMPATIBLE_MODE=(oracle)KEY_WORDS=(context)

3.修改dm.ini文件

所有数据库实例修都需要改

COMPATIBLE_MODE= 2 #Server compatible mode, 0:none, 1:SQL92, 2:Oracle, 3:MS SQL Server, 4:MySQL, 5:DM6, 6:Teradata

以上配置修改完以后,需要重启数据库

2.3 搭建TC端

介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,⽽ TC 则是⼀个独⽴服务。

下载Server端

最新版本下载地址:/seata/seata/releases

官网下载:1.3.0版本的下载地址:/seata/seata/releases/tag/v1.3.0在资料包种已经下载好了seata-server-1.3.0.zip解压即可。

配置Server端

Server端存储模式(store.mode)现有file、db、redis三种(后续将引入raft,mongodb),file模式无需改动,直接启动即可,

file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件bin目录下的root.data,性能较高;

db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;

redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置.

File模式直连配置

主要关注conf文件夹下的registry.conf文件以及file.conf文件。

采用File直连模式registry.conf文件无需改动,需要在file.conf中添加事务分组。

registry.conf

registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "file"file {name = "file.conf"}}config {# file、nacos 、apollo、zk、consul、etcd3type = "file"file {name = "file.conf"}}

file.conf

在server属性中新增这段值vgroup_mapping.seata-xa=“default”。即添加事务分组。需与TM/RM端配置的一致。

transport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = false#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThreadPrefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"}## transaction log store, only used in server sidestore {## store mode: file、dbmode = "file"## file store propertyfile {## store location dirdir = "sessionStore"# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptionsmaxBranchSessionSize = 16384# globe session size , if exceeded throws exceptionsmaxGlobalSessionSize = 512# file buffer size , if exceeded allocate new bufferfileWriteBufferCacheSize = 16384# when recover batch read sizesessionReloadReadSize = 100# async, syncflushDiskMode = async}}## server configuration, only used in server sideserver {recovery {#schedule committing retry period in millisecondscommittingRetryPeriod = 1000#schedule asyn committing retry period in millisecondsasynCommittingRetryPeriod = 1000#schedule rollbacking retry period in millisecondsrollbackingRetryPeriod = 1000#schedule timeout retry period in millisecondstimeoutRetryPeriod = 1000}undo {logSaveDays = 7#schedule delete expired undo_log in millisecondslogDeletePeriod = 86400000}#check authenableCheckAuth = true#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanentmaxCommitRetryTimeout = "-1"maxRollbackRetryTimeout = "-1"rollbackRetryTimeoutUnlockEnable = false# 新增的这段,设置事务分组vgroup_mapping.seata-xa="default"}## metrics configuration, only used in server sidemetrics {enabled = falseregistryType = "compact"# multi exporters use comma dividedexporterList = "prometheus"exporterPrometheusPort = 9898}

2.4 TM/RM端整合Seata

Seata事务框架在AT 模式下在RM端需要UNDO_LOG 表,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执⾏.

CREATE TABLE "SYSDBA"."UNDO_LOG"("ID" BIGINT NOT NULL,"BRANCH_ID" BIGINT NOT NULL,"XID" VARCHAR(100) NOT NULL,"CONTEXT" VARCHAR(150) NOT NULL,"ROLLBACK_INFO" BLOB NOT NULL,"LOG_STATUS" INT NOT NULL,"LOG_CREATED" DATETIME(6) NOT NULL,"LOG_MODIFIED" DATETIME(6) NOT NULL,NOT CLUSTER PRIMARY KEY("ID"),CONSTRAINT "UX_UNDO_LOG" UNIQUE("XID", "BRANCH_ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;

RM(事务管理器)端整合Seata与TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程lagou_bussiness是事务的发起者,所以是TM端,其它⼯程为RM端. 所以我们只需要在lagou_common_db完成前4步骤即可

1.引入seata依赖

修改父pom.xml文件,锁定seata的版本

<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Greenwich.RELEASE</version><type>pom</type><scope>import</scope></dependency><!--SCA --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.1.0.RELEASE</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.22</version></dependency><!--seata版本管理, ⽤于锁定⾼版本的seata --><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>1.3.0</version></dependency></dependencies></dependencyManagement>

修改每个模块的pom.xml文件引入seata的依赖

因为原来的是旧版本,所以需要引入新的seata的版本依赖

<!--seata依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-seata</artifactId><!--排除低版本seata依赖--><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-all</artifactId></exclusion></exclusions></dependency><!--添加⾼版本seata依赖--><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>1.3.0</version></dependency>

2.引入注册中心文件

每个模块的resources目录下引入Seata事务Client客户端的registry.conf文件。又因为注册中心采用的直连模式,所以还需要引入file.conf

这两个文件可以参考资料包。可以看图:

3.配置连接事务组

新增每个模块的事务组,在每个项目的application.properties中添加以下信息

spring.cloud.alibaba.seata.txServiceGroup=seata-xalogging.level.io.seata=debuglogging.level.io.seata.core.rpc=warn

4.修改数据源url兼容Oracle

每个模块兼容Oracle数据库,所以需要修改数据源的Url连接。

修改每个模块的url连接。下面是一个例子,注意端口号

spring.datasource.url=jdbc:oracle:thin:@localhost:5237spring.datasource.driverClassName=dm.jdbc.driver.DmDriverspring.datasource.username=SYSDBAspring.datasource.password=SYSDBA

5.添加seata代理数据源

seata事务框架的AT模式需要操作数据源,所以我们把数据源对象代理给seata框架。

在每个模块Application启动类同目录下新建数据源对象,修改每个模块的Application类的扫描。

下面是修改Storage模块的例子

@Configurationpublic class StorageDataSourceConfiguration {/*** 使⽤druid连接池** @return*/@Bean@ConfigurationProperties(prefix = "spring.datasource")public DataSource druidDataSource() {return new DruidDataSource();}/*** 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作** @param druidDataSource* @return*/@Primary //设置⾸选数据源对象@Bean("dataSource")public DataSourceProxy dataSource(DataSource druidDataSource) {return new DataSourceProxy(druidDataSource);}}

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,scanBasePackages = "com.dameng")public class StorageXAApplication {public static void main(String[] args) {SpringApplication.run(StorageXAApplication.class, args);}}

6.修改seata源码对达梦的兼容

在每个模块中配置新建目录com.dameng.rm.datasource.util以及XAUtils类。修改每个模块的启动类,让其启动项目时替换掉源代码包中的XAUtils类。

package com.dameng.rm.datasource.util;import com.alibaba.druid.util.JdbcUtils;import com.alibaba.druid.util.MySqlUtils;import com.alibaba.druid.util.PGUtils;import io.seata.rm.BaseDataSourceResource;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.sql.XAConnection;import javax.transaction.xa.XAException;import java.lang.reflect.Constructor;import java.sql.Connection;import java.sql.Driver;import java.sql.SQLException;public class XAUtils {private static final Logger LOGGER = LoggerFactory.getLogger(XAUtils.class);public static String getDbType(String jdbcUrl, String driverClassName) {return JdbcUtils.getDbType(jdbcUrl, driverClassName);}public static XAConnection createXAConnection(Connection physicalConn, BaseDataSourceResource dataSourceResource) throws SQLException {return createXAConnection(physicalConn, dataSourceResource.getDriver(), dataSourceResource.getDbType());}public static XAConnection createXAConnection(Connection physicalConn, Driver driver, String dbType) throws SQLException {if (JdbcUtils.ORACLE.equals(dbType)) {try {// /alibaba/druid/issues/3707// before Druid issue fixed, just make ORACLE XA connection in my way.// return OracleUtils.OracleXAConnection(physicalConn);String physicalConnClassName = physicalConn.getClass().getName();if ("oracle.jdbc.driver.T4CConnection".equals(physicalConnClassName)) {return createOracleXAConnection(physicalConn, "oracle.jdbc.driver.T4CXAConnection");} else {return createOracleXAConnection(physicalConn, "oracle.jdbc.xa.client.OracleXAConnection");}} catch (XAException xae) {throw new SQLException("create xaConnection error", xae);}}if (JdbcUtils.DM.equals(dbType)) {try {//String physicalConnClassName = physicalConn.getClass().getName();//if ("dm.jdbc.driver.DmdbConnection".equals(physicalConnClassName)) {//return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbConnection");//} else {//return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");//}return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");} catch (XAException xae) {throw new SQLException("create xaConnection error", xae);}}if (JdbcUtils.MYSQL.equals(dbType) || JdbcUtils.MARIADB.equals(dbType)) {return MySqlUtils.createXAConnection(driver, physicalConn);}if (JdbcUtils.POSTGRESQL.equals(dbType)) {return PGUtils.createXAConnection(physicalConn);}throw new SQLException("xa not support dbType: " + dbType);}private static XAConnection createOracleXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {try {Class xaConnectionClass = Class.forName(xaConnectionClassName);Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);constructor.setAccessible(true);return constructor.newInstance(physicalConnection);} catch (Exception e) {LOGGER.warn("Failed to create Oracle XA Connection " + xaConnectionClassName + " on " + physicalConnection);if (e instanceof XAException) {throw (XAException) e;} else {throw new SQLException(e);}}}private static XAConnection createDMXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {try {Class xaConnectionClass = Class.forName(xaConnectionClassName);Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);constructor.setAccessible(true);return constructor.newInstance(physicalConnection);} catch (Exception e) {LOGGER.warn("Failed to create DM XA Connection " + xaConnectionClassName + " on " + physicalConnection);if (e instanceof XAException) {throw (XAException) e;} else {throw new SQLException(e);}}}}

在启动类上使用@ComponentScan注解,使其项目启动加载时使用我们本机修改的类

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,scanBasePackages = "com.dameng")@ComponentScan(excludeFilters = {@ComponentScan.Filter(type =FilterType.ASSIGNABLE_TYPE, classes = {XAUtils.class})})public class StorageXAApplication {public static void main(String[] args) {SpringApplication.run(StorageXAApplication.class, args);}}

7.添加注解@GlobalTransactional

Business为Seata事务的TM,所以在方法上添加@GlobalTransactional注解

@GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class)public void execWork(String USER_ID, String Storage_Code, Integer orderCount) {//记录本地事务int update = jdbcTemplate.update("insert into BIZ_LOG(id_) values(?)", UUID.randomUUID().toString());//扣减商品库存String storageResult = storageFeignClient.consumeStorage(Storage_Code, orderCount);if (FAIL.equals(storageResult)) {throw new RuntimeException("商品报错回滚...");}//扣减订单库存String orderResult = orderFeignClient.createOrder(USER_ID, Storage_Code, orderCount);if (FAIL.equals(orderResult)) {throw new RuntimeException("订单报错回滚...");}}

2.5 启动项目

1. 启动 Seata服务(TC端)

进入到seata的bin目录seata\bin下管理员执行seata-server.bat文件即可。

服务启动后默认端口为8091。

注意:

因为采用直连模式,会在bin目录下生成sessionStore文件,每次启动前建议删除。

有时候客户端会卡住,按一下回车键刷新下日志就好了

2. 启动business-xa服务(TM端)

启动 com.dameng.sample.BusinessXAApplication 服务。

3. 启动storage-xa服务 (RM端)

启动 com.dameng.sample.StorageXAApplication服务。

4. 启动order-xa服务(RM端)

启动 com.dameng.sample.OrderXAApplication服务。

5. 启动account-xa服务(RM端)

启动 com.dameng.sample.AccountXAApplication服务。

项目启动后,查看seata服务端日志,查看服务是否已经注册到Seata服务中

6. 测试

测试成功

在浏览器中输入http://localhost:8084/execWork?orderCount=1,查看库存,订单,金额是否正常。

测试回滚

修改order模块中service代码如图,使代码报异常。

资料包中seata-xa-final.zip为整合以后的包

问题整理

1. endpoint format should like ip:port

java.lang.IllegalArgumentException: endpoint format should like ip:portat io.seata.discovery.registry.FileRegistryServiceImpl.lookup(FileRegistryServiceImpl.java:95) ~[seata-all-1.3.0.jar:1.3.0]at io.seata.tyClientChannelManager.getAvailServerList(NettyClientChannelManager.java:217) ~[seata-all-1.3.0.jar:1.3.0]at io.seata.tyClientChannelManager.reconnect(NettyClientChannelManager.java:162) ~[seata-all-1.3.0.jar:1.3.0]at io.seata.ty.RmNettyRemotingClient.registerResource(RmNettyRemotingClient.java:181) [seata-all-1.3.0.jar:1.3.0]at io.seata.rm.AbstractResourceManager.registerResource(AbstractResourceManager.java:121) [seata-all-1.3.0.jar:1.3.0]at io.seata.rm.datasource.DataSourceManager.registerResource(DataSourceManager.java:146) [seata-all-1.3.0.jar:1.3.0]

解决办法

seata(TC端)事务组与java模块(RM端)事务组不同需自己检查。seata(TC端)的file.conf文件的server属性vgroupMapping配置名有问题自行检查修改。

2. io.seata.core.exception.TmTransactionException: RPC timeout

解决办法

即便已经按照顺序启动,seata也提示注册。因为电脑内存等原因实际情况还是没有注册上。重新启动一遍服务即可。seata配置有问题,seata控制台可能都没有注册上。检查seata的配置文件以及该服务的配置是否正确。

3.java.sql.SQLException: not support oracle driver 8.1

1.达梦数据库版本在DM8 1.2.38以下需要更换为8月以后的版本。

2. 数据库在linux,应用系统在window的IDEA中,会出现window项目启动的时候 驱动包识别的是本台机器上的,识别不到数据库服务器上的dm.svc.conf的配置内容,所以window需要放到需要在指定目录下放dmsvc.conf文件。

4.无法解析的成员访问表达式[UNDO_LOG_SEQ.NEXTVAL]

工具包中整理初始化sql的时候,初始化序列的SQL语句忘记整理了。

CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;

在数据库中在执行这句SQL语句就好了。

资料包

链接:/s/1Tspfh_AH_al_qTzOoQgm0Q 提取码:g03w

-- 查询blob字段的值select utl_raw.cast_to_varchar2(dbms_lob.substr(ROLLBACK_INFO)) from "SYSDBA"."UNDO_LOG";

达梦支持

=======================================

有任何问题请到技术社区反馈。

24小时免费服务热线:400 991 6599

达梦技术社区:

如果觉得《达梦数据库DM8支持Seata事务框架》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。