前言
最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目
这就需要每个Excel建个表,然后一个个导入了
这样的效率太低,不能忍
所以我造了个自动生成 Model 和导入脚本的轮子
思路
首先拿出 pandas,它的 DataFrame 用来处理数据很方便
pandas 加载 Excel 之后,提取表头,我们要通过表头来生成数据表的字段。有些 Excel 的表头是中文的,需要先做个转换。
一开始我是想用翻译API,全都翻译成英文,不过发现免费的很慢有限额,微软、DeepL都要申请,很麻烦。索性用个拼音转换库,全都转换成拼音得了~
然后字段的长度也要确定,或者全部用不限制长度的 TextField
权衡一下,我还是做一下字段长度判定的逻辑,遍历整个表,找出各个字段最长的数据,然后再加一个偏移量,作为最大长度。
接着生成 Model 类,这里我用 jinja2 模板语言,先把大概的模板写好,然后根据提取出来的字段名啥的生成。
最后生成 admin 配置和导入脚本,同理,也是用 jinja2 模板。
实现
简单介绍下思路,现在开始上代码。
就几行而已,Python很省代码~
模型
首先定义俩模型
字段模型
-
class
Field(
object):
-
def
__init__(
self, name: str, verbose_name: str, max_length: int = 128):
-
self.name = name
-
self.verbose_name = verbose_name
-
self.max_length = max_length
-
-
def
__str__(
self):
-
return
f'<Field>{self.name}:{self.verbose_name}'
-
-
def
__repr__(
self):
-
return self.__str__()
Model模型
为了符合Python关于变量的命名规范,snake_name
属性是用正则表达式实现驼峰命名转蛇形命名
-
class
Model(
object):
-
def
__init__(
self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
-
self.name = name
-
self.verbose_name = verbose_name
-
self.id_field = id_field
-
self.fields:
List[Field] = fields
-
-
@property
-
def
snake_name(
self):
-
import re
-
pattern = re.
compile(
r'(?<!^)(?=[A-Z])')
-
name = pattern.sub(
'_', self.name).lower()
-
return name
-
-
def
__str__(
self):
-
return
f'<Model>{self.name}:{self.verbose_name}'
-
-
def
__repr__(
self):
-
return self.__str__()
代码模板
使用 jinja2 实现。
本身 jinja2 是 Flask、Django 之类的框架用来渲染网页的。
不过单独使用的效果也不错,我的 DjangoStarter 框架也是用这个 jinja2 来自动生成 CRUD 代码~
Model模板
-
# -*- coding:utf-8 -*-
-
from django.db import models
-
-
class
{{ model.name }}
(models.Model):
-
"""
{{ model.verbose_name }}
"""
-
{% for field in model.fields -%}
-
{{ field.name }}
= models.CharField('
{{ field.verbose_name }}
', default='', null=True, blank=True, max_length=
{{ field.max_length }}
)
-
{% endfor %}
-
class Meta:
-
db_table = '
{{ model.snake_name }}
'
-
verbose_name = '
{{ model.verbose_name }}
'
-
verbose_name_plural = verbose_name
-
Admin配置模板
-
@admin.register({{ model.name }})
-
class {{ model.name }}Admin(admin.ModelAdmin):
-
list_display = [{%
for field
in model.fields %}
'{{ field.name }}', {% endfor %}]
-
list_display_links =
None
-
-
def
has_add_permission(
self, request):
-
return
False
-
-
def
has_delete_permission(
self, request, obj=None):
-
return
False
-
-
def
has_view_permission(
self, request, obj=None):
-
return
False
数据导入脚本
这里做了几件事:
- 使用 pandas 处理空值,填充空字符串
- 已有数据进行批量更新
- 新数据批量插入
更新逻辑麻烦一点,因为数据库一般都有每次最大更新数量的限制,所以我做了分批处理,通过 update_data_once_max_lines
控制每次最多同时更新多少条数据。
-
def import_
{{ model.snake_name }}
():
-
file_path = path_proc(r'
{{ excel_filepath }}
')
-
-
logger.info(f'读取文件: {file_path}')
-
xlsx = pd.ExcelFile(file_path)
-
df = pd.read_excel(xlsx, 0, header=
{{ excel_header }}
)
-
df.fillna('', inplace=True)
-
-
logger.info('开始处理数据')
-
-
id_field_list =
{{ model.name }}
.objects.values_list('
{{ model.id_field.name }}
', flat=True)
-
item_list = list(
{{ model.name }}
.objects.all())
-
-
def get_item(id_value):
-
for i in item_list:
-
if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
-
return i
-
return None
-
-
insert_data = []
-
update_data_once_max_lines = 100
-
update_data_sub_set_index = 0
-
update_data = [[]]
-
update_fields = set()
-
-
for index, row in df.iterrows():
-
if '
{{ model.id_field.verbose_name }}
' not in row:
-
logger.error('id_field {} is not existed'.format('
{{ model.id_field.verbose_name }}
'))
-
continue
-
-
if row['
{{ model.id_field.verbose_name }}
'] in id_field_list:
-
item = get_item(row['
{{ model.id_field.verbose_name }}
'])
-
{% for field in model.fields -%}
-
if '
{{ field.verbose_name }}
' in row:
-
if item.
{{ field.name }}
!= row['
{{ field.verbose_name }}
']:
-
item.
{{ field.name }}
= row['
{{ field.verbose_name }}
']
-
update_fields.add('
{{ field.name }}
')
-
{% endfor %}
-
if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
-
update_data_sub_set_index += 1
-
update_data.append([])
-
update_data[update_data_sub_set_index].append(item)
-
else:
-
# {% for field in model.fields -%}
{{ field.verbose_name }}
,{%- endfor %}
-
model_obj =
{{ model.name }}
()
-
{% for field in model.fields -%}
-
if '
{{ field.verbose_name }}
' in row:
-
model_obj.
{{ field.name }}
= row['
{{ field.verbose_name }}
']
-
{% endfor %}
-
insert_data.append(model_obj)
-
-
logger.info('开始批量导入')
-
{{ model.name }}
.objects.bulk_create(insert_data)
-
logger.info('导入完成')
-
-
if len(update_data[update_data_sub_set_index]) > 0:
-
logger.info('开始批量更新')
-
for index, update_sub in enumerate(update_data):
-
logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 条数据')
-
{{ model.name }}
.objects.bulk_update(update_sub, list(update_fields))
-
logger.info('更新完成')
-
主体代码
剩下的全是核心代码了
引用依赖
先把用到的库导入
-
import os
-
import re
-
from typing
import List, Optional
-
-
from pypinyin
import pinyin, lazy_pinyin, Style
-
from jinja2
import Environment, PackageLoader, FileSystemLoader
或者后面直接去我的完整代码里面拿也行~
类
老规矩,我封装了一个类。
构造方法需要指定 Excel 文件地址,还有表头的行索引。
-
class ExcelToModel(object):
-
def __init__(
self, filepath, header_index=
0):
-
self.filepath = filepath
-
self.header_index = header_index
-
self.columns = []
-
self.fields: List[Field] = []
-
-
self.base_dir =
os.
path.dirname(
os.
path.abspath(__file__))
-
self.template_path =
os.
path.join(
self.base_dir,
'templates')
-
self.jinja2_env = Environment(loader=FileSystemLoader(
self.template_path))
-
-
self.load_file()
这里面有个 self.load_file()
后面再贴。
字段名中文转拼音
用了 pypinyin
这个库,感觉还不错。
转换后用正则表达式,去除符号,只保留英文和数字。
代码如下,也是放在 ExcelToModel
类里边。
-
@staticmethod
-
def to_pinyin(
text: str) -> str:
-
pattern = r
'~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
-
text = re.
sub(r
"[%s]+" % pattern,
"",
text)
-
return
'_'.join(lazy_pinyin(text, style=Style.NORMAL))
加载文件
拿出万能的 pandas,按照前面说的思路,提取表头转换成字段,并且遍历数据确定每个字段的最大长度,我这里偏移值是32,即在当前数据最大长度基础上加上32个字符。
-
def
load_file(
self):
-
import pandas
as pd
-
xlsx = pd.ExcelFile(self.filepath)
-
df = pd.read_excel(xlsx,
0, header=self.header_index)
-
df.fillna(
'', inplace=
True)
-
self.columns =
list(df.columns)
-
for col
in self.columns:
-
field = Field(self.to_pinyin(col), col)
-
self.fields.append(field)
-
for index, row
in df.iterrows():
-
item_len =
len(
str(row[col]))
-
if item_len > field.max_length:
-
field.max_length = item_len +
32
-
-
print(field.verbose_name, field.name, field.max_length)
如果觉得这样生成表太慢,可以把确定最大长度的这块代码去掉,就下面这块代码
-
for index, row in df
.iterrows():
-
item_len =
len(
str(row[col]))
-
if item_len > field.max_length:
-
field.max_length = item_len +
32
手动指定最大长度或者换成不限制长度的 TextField
就行。
生成文件
先构造个 context 然后直接用 jinja2 的 render
功能生成代码。
为了在导入时判断数据存不存在,生成代码时要指定 id_field_verbose_name
,即Excel文件中类似“证件号码”、“编号”之类的列名,注意是Excel中的表头列名。
-
def
find_field_by_verbose_name(
self, verbose_name)
-> Optional[Field]:
-
for
field
in
self.fields:
-
if field.verbose_name == verbose_name:
-
return field
-
return
None
-
-
def
generate_file(
self, model_name:
str, verbose_name:
str, id_field_verbose_name:
str, output_filepath:
str):
-
template =
self.jinja2_env.
get_template(
'output.jinja2')
-
context = {
-
'model':
Model(
-
model_name, verbose_name,
-
self.
find_field_by_verbose_name(id_field_verbose_name),
-
self.fields
-
),
-
'excel_filepath':
self.filepath,
-
'excel_header':
self.header_index,
-
}
-
with
open(output_filepath,
'w+', encoding=
'utf-
8')
as f:
-
render_result = template.
render(context)
-
f.
write(render_result)
使用
看代码。
-
tool = ExcelToModel(
'file.xlsx')
-
tool.generate_file(
'CitizenFertility',
'房价与居民生育率',
'证件号码',
'output/citizen_fertility.py')
生成出来的代码都在一个文件里,请根据实际情况放到项目的各个位置。
完整代码
发布到Github了
地址: GitHub - Deali-Axy/excel_to_model: 通过Excel生成Django模型和数据导入脚本
转载:https://blog.csdn.net/jh035512/article/details/127896630