失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 多线程异步处理时的事务管理

多线程异步处理时的事务管理

时间:2020-01-12 09:07:38

相关推荐

多线程异步处理时的事务管理

分布式事务介绍:/p/183753774

前言:项目中在保证数据一致性的前提下还想提高执行效率,有什么好办法么?使用多线程肯定是首先想到的,但多线程之间的事务怎么保持一致呢?下面的代码就是在单个项目中使用多线程异步处理时的事务管理的方法。

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.TransactionDefinition;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.atomic.AtomicReference;import java.util.function.Supplier;/*** 多线程异步处理时的事务管理* 1.addFunction 添加要异步执行的方法* 2.execute方法中,使用全局的计数器和异常标记字段,统计个异步线程执行的结果*当所有异步线程执行完之后,根据异常标记字段判断是回滚还是提交事务。*/public class MultiThreadTransactionComponent {Logger logger= LoggerFactory.getLogger(this.getClass());private PlatformTransactionManager platformTransactionManager;private ThreadPoolExecutor threadPoolExecutor;private List<Supplier> supplierList=new ArrayList();// 创建执行计数器private CountDownLatch countDownLatch;// 是否存在异常AtomicReference<Boolean> isError = new AtomicReference<>(false);public MultiThreadTransactionComponent(PlatformTransactionManager transactionManager,ThreadPoolExecutor threadPoolExecutor){this.platformTransactionManager=transactionManager;this.threadPoolExecutor=threadPoolExecutor;}/*** 添加要异步执行的方法程序* @param supplier*/public void addFunction(Supplier supplier){supplierList.add(supplier);}public void execute(){countDownLatch=new CountDownLatch(supplierList.size());logger.info("【多线程事务】开始...");for(Supplier supplier:supplierList){this.threadPoolExecutor.submit(new TransactionRunnable(platformTransactionManager,supplier));}try {countDownLatch.await();if(isError.get()) {logger.error("【多线程事务】多线程执行失败,事务已回滚");// 主线程抛出自定义的异常throw new RuntimeException("多线程执行失败");}logger.info("【多线程事务】多线程执行完成,事务已提交");} catch (InterruptedException e) {logger.error("多线程执行失败");// 主线程抛出自定义的异常throw new RuntimeException("多线程执行失败"+e.getMessage());}}class TransactionRunnable implements Runnable{private PlatformTransactionManager platformTransactionManager;private Supplier supplier;public TransactionRunnable(PlatformTransactionManager platformTransactionManager, Supplier supplier) {this.platformTransactionManager=platformTransactionManager;this.supplier=supplier;}@Overridepublic void run() {DefaultTransactionDefinition def = new DefaultTransactionDefinition();def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);TransactionStatus transaction = this.platformTransactionManager.getTransaction(def);try {this.supplier.get();} catch (Exception e) {//设置错误标记isError.set(true);logger.error("【多线程事务】执行失败{}",e.getMessage());}countDownLatch.countDown();try{countDownLatch.await();if(isError.get()){//logger.info("【多线程事务-子线程】事务回滚");//事务回滚platformTransactionManager.rollback(transaction);}else {//logger.info("【多线程事务-子线程】事务提交");//事务提交mit(transaction);}}catch (InterruptedException e){e.printStackTrace();}}}}

使用示例:

@AutowiredPlatformTransactionManager platformTransactionManager;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;@Testpublic void testTransaction(){MultiThreadTransactionComponent mttc = new MultiThreadTransactionComponent(platformTransactionManager,threadPoolExecutor);for(int k=0;k<10;k++){int i = RandomUtils.nextInt(0, 5);int y=RandomUtils.nextInt(0,5);//添加要执行的业务代码mttc.addFunction(()->{System.out.println("当前线程:" + Thread.currentThread().getName());System.out.println(i%y); //除数为0时 执行失败MarketGeomUpLog marketGeomUpLog=new MarketGeomUpLog();marketGeomUpLog.setContent(i+"--"+y);marketGeomUpLogMapper.addLog(marketGeomUpLog);return 0;});}mttc.execute();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}

如果觉得《多线程异步处理时的事务管理》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。