失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Python多进程导入CSV数据到

Python多进程导入CSV数据到

时间:2024-01-20 13:11:14

相关推荐

Python多进程导入CSV数据到

后端开发|Python教程

Python 多进程 导入MySQL

后端开发-Python教程

前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和 7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入 会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:

易语言ce源码下载,vscode 不可打印字符,ubuntu自毁,tomcat单机并发量,爬虫管子油,php外卖订餐系统,北京常规seo优化什么价格,像学校官网的网站源码,论坛网站程序模板下载不了lzw

批量插入而不是逐条插入

ios 通信录源码,lts版ubuntu,怎么找tomcat是什么,猴子爬虫子,如何在命令行运行php,河间关键词seo优化哪家好lzw

为了加快插入速度,先不要建索引

板块池 源码,vscode怎么输入源码,ubuntu 安装elk,tomcat启动参数编码,python爬虫解密,php导出excel样式,seo外包合同要写哪些,h5模板网站代码下载,最好的网站模板下载网站lzw

生产者和消费者模型,主进程读文件,多个 worker 进程执行插入

注意控制 worker 的数量,避免对 MySQL 造成太大的压力

注意处理脏数据导致的异常

原始数据是 GBK 编码,所以还要注意转换成 UTF-8

用 click 封装命令行工具

具体的代码实现如下:

#!/usr/bin/env python# -*- coding: utf-8 -*-import codecsimport csvimport loggingimport multiprocessingimport osimport warningsimport clickimport MySQLdbimport sqlalchemywarnings.filterwarnings(ignore, category=MySQLdb.Warning)# 批量插入的记录数量BATCH = 5000DB_URI = mysql://root@localhost:3306/example?charset=utf8engine = sqlalchemy.create_engine(DB_URI)def get_table_cols(table): sql = SELECT * FROM `{table}` LIMIT 0.format(table=table) res = engine.execute(sql) return res.keys()def insert_many(table, cols, rows, cursor): sql = INSERT INTO `{table}` ({cols}) VALUES ({marks}).format(table=table,cols=, .join(cols),marks=, .join([\%s] * len(cols))) cursor.execute(sql, *rows) logging.info(process %s inserted %s rows into table %s, os.getpid(), len(rows), table)def insert_worker(table, cols, queue): rows = [] # 每个子进程创建自己的 engine 对象 cursor = sqlalchemy.create_engine(DB_URI) while True: row = queue.get() if row is None:if rows: insert_many(table, cols, rows, cursor)break rows.append(row) if len(rows) == BATCH:insert_many(table, cols, rows, cursor)rows = []def insert_parallel(table, reader, w=10): cols = get_table_cols(table) # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据 # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存 queue = multiprocessing.Queue(maxsize=w*BATCH*2) workers = [] for i in range(w): p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue)) p.start() workers.append(p) logging.info(starting # %s worker process, pid: %s..., i + 1, p.pid) dirty_data_file = ./{}_dirty_rows.csv.format(table) xf = open(dirty_data_file, w) writer = csv.writer(xf, delimiter=reader.dialect.delimiter) for line in reader: # 记录并跳过脏数据: 键值数量不一致 if len(line) != len(cols):writer.writerow(line)continue # 把 None 值替换为 NULL clean_line = [None if x == NULL else x for x in line] # 往队列里写数据 queue.put(tuple(clean_line)) if reader.line_num % 500000 == 0:logging.info(put %s tasks into queue., reader.line_num) xf.close() # 给每个 worker 发送任务结束的信号 logging.info(send close signal to worker processes) for i in range(w): queue.put(None) for p in workers: p.join()def convert_file_to_utf8(f, rv_file=None): if not rv_file: name, ext = os.path.splitext(f) if isinstance(name, unicode):name = name.encode(utf8) rv_file = {}_utf8{}.format(name, ext) logging.info(start to process file %s, f) with open(f) as infd: with open(rv_file, w) as outfd:lines = []loop = 0chunck = 200000first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + \ lines.append(first_line)for line in infd: clean_line = line.decode(gb18030).encode(utf8) clean_line = clean_line.rstrip() + \ lines.append(clean_line) if len(lines) == chunck:outfd.writelines(lines)lines = []loop += 1logging.info(processed %s lines., loop * chunck)outfd.writelines(lines)logging.info(processed %s lines., loop * chunck + len(lines))@click.group()def cli(): logging.basicConfig(level=logging.INFO, format=\%(asctime)s - %(levelname)s - %(name)s - %(message)s)@mand(gbk_to_utf8)@click.argument(f)def convert_gbk_to_utf8(f): convert_file_to_utf8(f)@mand(load)@click.option(-t, --table, required=True, help=表名)@click.option(-i, --filename, required=True, help=输入文件)@click.option(-w, --workers, default=10, help=worker 数量,默认 10)def load_fac_day_pro_nos_sal_table(table, filename, workers): with open(filename) as fd: fd.readline() # skip header reader = csv.reader(fd) insert_parallel(table, reader, w=workers)if __name__ == \__main__: cli()

以上就是本文给大家分享的全部没人了,希望大家能够喜欢

更多Python多进程导入CSV数据到 相关文章请关注PHP中文网!

如果觉得《Python多进程导入CSV数据到》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。