失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > JAVA多线程+事务进行批量新增

JAVA多线程+事务进行批量新增

时间:2024-03-16 09:03:44

相关推荐

JAVA多线程+事务进行批量新增

线程池的配置:

@Configuration@EnableAsync@Slf4jpublic class ThreadPoolConfig {@Bean("threadPoolTaskExecutor")public ThreadPoolTaskExecutor buildThreadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 设置核心线程数threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());// 设置最大线程数threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);// 设置队列容量threadPoolTaskExecutor.setQueueCapacity(0);// 设置线程活跃时间(秒)threadPoolTaskExecutor.setKeepAliveSeconds(20);// 设置默认线程名称threadPoolTaskExecutor.setThreadNamePrefix("ThreadPool-");// 设置拒绝策略threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 等待所有任务结束后再关闭线程池threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}}

代码实现:

//根据数据库不同,现定义每次最大插入量2000通用private static final int INSERT_LIMIT_NUMBER = 2000;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;*** 异步新增** @param getAllShiftsResponseList* @param monthFullDay*/private void threadInsert(List<GetAllShiftsVo> getAllShiftsResponseList, List<String> monthFullDay) {int eachInsertSize = INSERT_LIMIT_NUMBER;int maxInsertPoolSize = threadPoolTaskExecutor.getMaxPoolSize();if (getAllShiftsResponseList.size() > eachInsertSize * maxInsertPoolSize) {eachInsertSize = BigDecimal.valueOf(getAllShiftsResponseList.size()).divide(BigDecimal.valueOf(maxInsertPoolSize), BigDecimal.ROUND_UP).intValue();}List<List<GetAllShiftsVo>> partition = Lists.partition(getAllShiftsResponseList, eachInsertSize);int threadNum = partition.size();// 是否存在异常AtomicReference<Boolean> rollbackFlag = new AtomicReference<>(false);CountDownLatch transactionLatch = new CountDownLatch(threadNum);partition.forEach(item -> executeInsert(item, rollbackFlag, transactionLatch));try {if (!transactionLatch.await(200L, TimeUnit.SECONDS)) {log.info("执行时间过长!!");rollbackFlag.set(true);}} catch (Exception e) {log.info("批量新增每月排班失败!");throw new BusinessException(Emsg.BATCH_SHIFT_INSERT_ERROR);}}

private void executeInsert(List<GetAllShiftsVo> list, AtomicReference<Boolean> rollbackFlag,CountDownLatch transactionLatch) {threadPoolTaskExecutor.execute(() -> {log.info(Thread.currentThread().getName() + "线程启动");if (rollbackFlag.get()) {return;}DefaultTransactionDefinition def = new DefaultTransactionDefinition();//事物隔离级别,开启新事务,这样会比较安全些。def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);//获得事务状态TransactionStatus status = transactionManager.getTransaction(def);try {log.info("开始插入");if (list.size() <= INSERT_LIMIT_NUMBER) {batchInsertAllShiftList(list);} else {List<List<GetAllShiftsVo>> listList = Lists.partition(list, INSERT_LIMIT_NUMBER);listList.forEach(item -> batchInsertAllShiftList(item));}log.info(Thread.currentThread().getName() + "线程等待");} catch (Exception e) {log.error("批量插入异常", e);rollbackFlag.set(true);transactionManager.rollback(status);}// 事务结束,计数阀计数减1transactionLatch.countDown();try {transactionLatch.await();if (rollbackFlag.get()) {//事务回滚transactionManager.rollback(status);} else {//事务提交mit(status);}} catch (InterruptedException e) {e.printStackTrace();}log.info(Thread.currentThread().getName() + "线程结束");});}

大致实现逻辑就是这样

如果觉得《JAVA多线程+事务进行批量新增》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。