失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 多线程处理十万百万级List(大list处理)

多线程处理十万百万级List(大list处理)

时间:2019-06-15 23:54:31

相关推荐

多线程处理十万百万级List(大list处理)

一、背景

对于java开发中,十万百万级别的list进行各种操作,然后入库等操作好多时候会遇到。普通单线程处理,处理时间长,还经常报gc问题。针对此问题,查阅了网上很多资料,好多都使用多线程来处理。跟着好多的博客进行处理,要么是线程安全问题,要么根本速度就提高不了。我针对我项目中的使用场景,结合资料进行了修改,特提交此文,为有共同需求的小伙伴提供一点建议。

本次使用场景为:从数据库中查阅数十万数据生成list,将list进行遍历转json,拼接属性,组转成新list批量入库es.

二、代码如下:

//从数据库中查询需要同步的航班List<Map> fvaluesList = abss.queryListFvaluesOfDays(start, end);long beginTime = System.currentTimeMillis();// 开始时间long start1 = System.currentTimeMillis();// 每500条数据开启一条线程int threadSize = 5000;// 总数据条数int dataSize = fvaluesList.size();// 线程数int threadNum = dataSize / threadSize + 1;// 定义标记,过滤threadNum为整数boolean special = dataSize % threadSize == 0;// 创建一个线程池ExecutorService exec = Executors.newFixedThreadPool(threadNum);// 定义一个任务集合List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();Callable<Integer> task = null;CopyOnWriteArrayList<Map> cutList = null;final int[] num = {0};// 确定每条线程的数据for (int n = 0; n < threadNum; n++) {if (n == threadNum - 1) {if (special) {break;}cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, dataSize).toArray()) ;} else {cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, threadSize * (n + 1)).toArray());}final CopyOnWriteArrayList<Map> listStr = cutList;task = new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println(Thread.currentThread().getName());CopyOnWriteArrayList<SqaEsFvalues> esfvalues = new CopyOnWriteArrayList<SqaEsFvalues>();for (Map fvalues : listStr) {if (fvalues.get("items") != null || !fvalues.get("items").equals("")) {JSONObject items = JSONObject.parseObject(fvalues.get("items").toString());//items转MapMap<String, Object> map = (Map<String, Object>) items;//当map1和map中有相同字段,则保留map中的,丢弃map1中的for (Object key : fvalues.keySet()) {if (map.containsKey(key)) {map.remove(key);}}fvalues.putAll(map);fvalues.remove("items");}String jsonString = JSON.toJSONString(fvalues);//把map系列化到对象中SqaEsFvalues esfvalue = JSON.parseObject(jsonString, SqaEsFvalues.class);esfvalues.add(esfvalue);}synchronized (fvaluesRepository){if (!esfvalues.isEmpty()) {num[0] +=esfvalues.size();System.out.println(num[0]);fvaluesRepository.saveAll(esfvalues);}}return 1;}};// 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系tasks.add(task);}System.out.println("tasksSize=" + tasks.size());List<Future<Integer>> results = exec.invokeAll(tasks);for (Future<Integer> future : results) {System.out.println("results.size=" + results.size());// System.out.println(future.get());}// 关闭线程池exec.shutdown();System.out.println("线程任务执行结束");System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start1) + "毫秒");

三、特别说明

1.使用CopyOnWriteArrayList替代Arraylist,因为ArrayList是线程不安全的。

2.在存入es时候,会出现线程安全问题,使用synchronized代码块处理。

四、效果

本机实验,二百万数据花半小时成功进入es.数据有待提高,其中从数据库中一次查询几百万数据用时较长,可以进一步优化。

如果觉得《多线程处理十万百万级List(大list处理)》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。