失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 炸锅了 Java 多线程批量操作 居然有人不做事务控制?

炸锅了 Java 多线程批量操作 居然有人不做事务控制?

时间:2024-05-19 00:51:49

相关推荐

炸锅了 Java 多线程批量操作 居然有人不做事务控制?

点击下方“Java编程鸭”关注并标星

更多精彩 第一时间直达

来源:/qq273766764/article/

details/119972911

项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis

开发语言为:Java8

项目代码:/john273766764/springboot-mybatis-threads

文章目录

前言

循环操作的代码

使用手动事务的操作代码

尝试多线程进行数据修改

基于两个CountDownLatch控制多线程事务提交

基于TransactionStatus集合来控制多线程事务提交

使用union连接多个select实现批量update

总结

前言

公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。

具体操作如下:

一、循环操作的代码

先写一个最简单的for循环代码,看看耗时情况怎么样。

/****一条一条依次对50000条数据进行更新操作*耗时:2m27s,1m54s*/@TestvoidupdateStudent(){List<Student>allStudents=studentMapper.getAll();allStudents.forEach(s->{//更新教师信息Stringteacher=s.getTeacher();StringnewTeacher="TNO_"+newRandom().nextInt(100);s.setTeacher(newTeacher);studentMapper.update(s);});}

循环修改整体耗时约 1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。

二、使用手动事务的操作代码

修改后的代码如下:

@AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;@AutowiredprivateTransactionDefinitiontransactionDefinition;/***由于希望更新操作一次性完成,需要手动控制添加事务*耗时:24s*从测试结果可以看出,添加事务后插入数据的效率有明显的提升*/@TestvoidupdateStudentWithTrans(){List<Student>allStudents=studentMapper.getAll();TransactionStatustransactionStatus=dataSourceTransactionManager.getTransaction(transactionDefinition);try{allStudents.forEach(s->{//更新教师信息Stringteacher=s.getTeacher();StringnewTeacher="TNO_"+newRandom().nextInt(100);s.setTeacher(newTeacher);studentMapper.update(s);});mit(transactionStatus);}catch(Throwablee){dataSourceTransactionManager.rollback(transactionStatus);throwe;}}

添加手动事务操控制后,整体耗时约 24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

三、尝试多线程进行数据修改

添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。

先添加一个Service将批量修改操作整合一下,具体代码如下:

StudentServiceImpl.java

@ServicepublicclassStudentServiceImplimplementsStudentService{@AutowiredprivateStudentMapperstudentMapper;@AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;@AutowiredprivateTransactionDefinitiontransactionDefinition;@OverridepublicvoidupdateStudents(List<Student>students,CountDownLatchthreadLatch){TransactionStatustransactionStatus=dataSourceTransactionManager.getTransaction(transactionDefinition);System.out.println("子线程:"+Thread.currentThread().getName());try{students.forEach(s->{//更新教师信息//Stringteacher=s.getTeacher();StringnewTeacher="TNO_"+newRandom().nextInt(100);s.setTeacher(newTeacher);studentMapper.update(s);});mit(transactionStatus);threadLatch.countDown();}catch(Throwablee){e.printStackTrace();dataSourceTransactionManager.rollback(transactionStatus);}}}

批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:

@AutowiredprivateDataSourceTransactionManagerdataSourceTransactionManager;@AutowiredprivateTransactionDefinitiontransactionDefinition;@AutowiredprivateStudentServicestudentService;/***对用户而言,27s任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度*预计创建10个线程,每个线程进行5000条数据修改操作*耗时统计* 1 线程数:1 耗时:25s* 2 线程数:2 耗时:14s* 3 线程数:5 耗时:15s* 4 线程数:10耗时:15s* 5 线程数:100耗时:15s* 6 线程数:200耗时:15s* 7 线程数:500耗时:17s* 8 线程数:1000耗时:19s* 8 线程数:2000耗时:23s* 8 线程数:5000耗时:29s*/@TestvoidupdateStudentWithThreads(){//查询总数据List<Student>allStudents=studentMapper.getAll();//线程数量finalIntegerthreadCount=100;//每个线程处理的数据量finalIntegerdataPartionLength=(allStudents.size()+threadCount-1)/threadCount;//创建多线程处理任务ExecutorServicestudentThreadPool=Executors.newFixedThreadPool(threadCount);CountDownLatchthreadLatchs=newCountDownLatch(threadCount);for(inti=0;i<threadCount;i++){//每个线程处理的数据List<Student>threadDatas=allStudents.stream().skip(i*dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());studentThreadPool.execute(()->{studentService.updateStudents(threadDatas,threadLatchs);});}try{//倒计时锁设置超时时间30sthreadLatchs.await(30,TimeUnit.SECONDS);}catch(Throwablee){e.printStackTrace();}System.out.println("主线程完成");}

多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格,

多线程修改50000条数据时 不同线程数耗时对比(秒)

根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在2-5个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

四、基于两个CountDownLatch控制多线程事务提交

由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务,

这里我们使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。我们对代码进行了一点修改:

@OverridepublicvoidupdateStudentsThread(List<Student>students,CountDownLatchthreadLatch,CountDownLatchmainLatch,StudentTaskErrortaskStatus){TransactionStatustransactionStatus=dataSourceTransactionManager.getTransaction(transactionDefinition);System.out.println("子线程:"+Thread.currentThread().getName());try{students.forEach(s->{//更新教师信息//Stringteacher=s.getTeacher();StringnewTeacher="TNO_"+newRandom().nextInt(100);s.setTeacher(newTeacher);studentMapper.update(s);});}catch(Throwablee){taskStatus.setIsError();}finally{threadLatch.countDown();//切换到主线程执行}try{mainLatch.await();//等待主线程执行}catch(Throwablee){taskStatus.setIsError();}//判断是否有错误,如有错误就回滚事务if(taskStatus.getIsError()){dataSourceTransactionManager.rollback(transactionStatus);}else{mit(transactionStatus);}}

/***由于每个线程都是单独的事务,需要添加对线程事务的统一控制*我们这边使用两个CountDownLatch对子线程的事务进行控制*/@TestvoidupdateStudentWithThreadsAndTrans(){//查询总数据List<Student>allStudents=studentMapper.getAll();//线程数量finalIntegerthreadCount=4;//每个线程处理的数据量finalIntegerdataPartionLength=(allStudents.size()+threadCount-1)/threadCount;//创建多线程处理任务ExecutorServicestudentThreadPool=Executors.newFixedThreadPool(threadCount);CountDownLatchthreadLatchs=newCountDownLatch(threadCount);//用于计算子线程提交数量CountDownLatchmainLatch=newCountDownLatch(1);//用于判断主线程是否提交StudentTaskErrortaskStatus=newStudentTaskError();//用于判断子线程任务是否有错误for(inti=0;i<threadCount;i++){//每个线程处理的数据List<Student>threadDatas=allStudents.stream().skip(i*dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());studentThreadPool.execute(()->{studentService.updateStudentsThread(threadDatas,threadLatchs,mainLatch,taskStatus);});}try{//倒计时锁设置超时时间30sbooleanawait=threadLatchs.await(30,TimeUnit.SECONDS);if(!await){//等待超时,事务回滚taskStatus.setIsError();}}catch(Throwablee){e.printStackTrace();taskStatus.setIsError();}mainLatch.countDown();//切换到子线程执行studentThreadPool.shutdown();//关闭线程池System.out.println("主线程完成");}

本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:

Exceptioninthread"pool-1-thread-2"org.springframework.transaction.CannotCreateTransactionException:CouldnotopenJDBCConnectionfortransaction;nestedexceptionisjava.sql.SQLTransientConnectionException:HikariPool-1-Connectionisnotavailable,requesttimedoutafter30055ms.atorg.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)atorg.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)atorg.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)atcom.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)atcom.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)atjava.lang.Thread.run(Thread.java:748)Causedby:java.sql.SQLTransientConnectionException:HikariPool-1-Connectionisnotavailable,requesttimedoutafter30055ms.atcom.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)atcom.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)atcom.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)atcom.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)atorg.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)...7more

错误的大致意思时,不能为数据库事务打开jdbc Connection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。

看错误日志中错误的来源是HikariPool,我们来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:

#连接池中允许的最小连接数。缺省值:10spring.datasource.hikari.minimum-idle=10#连接池中允许的最大连接数。缺省值:10spring.datasource.hikari.maximum-pool-size=100#自动提交spring.datasource.hikari.auto-commit=true#一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟spring.datasource.hikari.idle-timeout=30000#一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒spring.datasource.hikari.max-lifetime=1800000#等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException,缺省:30秒

再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。

五、基于TransactionStatus集合来控制多线程事务提交

在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下

@ServicepublicclassStudentsTransactionThread{@AutowiredprivateStudentMapperstudentMapper;@AutowiredprivateStudentServicestudentService;@AutowiredprivatePlatformTransactionManagertransactionManager;List<TransactionStatus>transactionStatuses=Collections.synchronizedList(newArrayList<TransactionStatus>());@Transactional(propagation=Propagation.REQUIRED,rollbackFor={Exception.class})publicvoidupdateStudentWithThreadsAndTrans()throwsInterruptedException{//查询总数据List<Student>allStudents=studentMapper.getAll();//线程数量finalIntegerthreadCount=2;//每个线程处理的数据量finalIntegerdataPartionLength=(allStudents.size()+threadCount-1)/threadCount;//创建多线程处理任务ExecutorServicestudentThreadPool=Executors.newFixedThreadPool(threadCount);CountDownLatchthreadLatchs=newCountDownLatch(threadCount);AtomicBooleanisError=newAtomicBoolean(false);try{for(inti=0;i<threadCount;i++){//每个线程处理的数据List<Student>threadDatas=allStudents.stream().skip(i*dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());studentThreadPool.execute(()->{try{try{studentService.updateStudentsTransaction(transactionManager,transactionStatuses,threadDatas);}catch(Throwablee){e.printStackTrace();isError.set(true);}finally{threadLatchs.countDown();}}catch(Exceptione){e.printStackTrace();isError.set(true);}});}//倒计时锁设置超时时间30sbooleanawait=threadLatchs.await(30,TimeUnit.SECONDS);//判断是否超时if(!await){isError.set(true);}}catch(Throwablee){e.printStackTrace();isError.set(true);}if(!transactionStatuses.isEmpty()){if(isError.get()){transactionStatuses.forEach(s->transactionManager.rollback(s));}else{transactionStatuses.forEach(s->mit(s));}}System.out.println("主线程完成");}}

@Override@Transactional(propagation=Propagation.REQUIRED,rollbackFor={Exception.class})publicvoidupdateStudentsTransaction(PlatformTransactionManagertransactionManager,List<TransactionStatus>transactionStatuses,List<Student>students){//使用这种方式将事务状态都放在同一个事务里面DefaultTransactionDefinitiondef=newDefaultTransactionDefinition();def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);//事物隔离级别,开启新事务,这样会比较安全些。TransactionStatusstatus=transactionManager.getTransaction(def);//获得事务状态transactionStatuses.add(status);students.forEach(s->{//更新教师信息//Stringteacher=s.getTeacher();StringnewTeacher="TNO_"+newRandom().nextInt(100);s.setTeacher(newTeacher);studentMapper.update(s);});System.out.println("子线程:"+Thread.currentThread().getName());}

由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,

六、使用union连接多个select实现批量update

有些情况写不支持,批量update,但支持insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条select 语句,然后使用union 连接起来,再使用update 关联这个数据进行update,具体代码演示如下:

updatestudent,((select1asid,'teacher_A'asteacher)union(select2asid,'teacher_A'asteacher)union(select3asid,'teacher_A'asteacher)union(select4asid,'teacher_A'asteacher)/*....moredata...*/)asnew_teachersetstudent.teacher=new_teacher.teacherwherestudent.id=new_teacher.id

这种方式在Mysql 数据库没有配置allowMultiQueries=true也可以实现批量更新。

总结

对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率

多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在2-5个线程时操作时间最快。

对于多线程阻塞事务提交时,线程数量不能过多。

如果能有办法实现批量更新那是最好

END

看完本文有收获?请转发分享给更多人关注「Java编程鸭」,提升Java技能关注Java编程鸭微信公众号,后台回复:码农大礼包可以获取最新整理的技术资料一份。涵盖Java框架学习、架构师学习等!文章有帮助的话,在看,转发吧。谢谢支持哟 (*^__^*)

如果觉得《炸锅了 Java 多线程批量操作 居然有人不做事务控制?》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。