小言_互联网的博客

造个Python轮子,实现根据Excel生成Model和数据导入脚本

402人阅读  评论(0)

前言

最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目

这就需要每个Excel建个表,然后一个个导入了

这样的效率太低,不能忍

所以我造了个自动生成 Model 和导入脚本的轮子

思路

首先拿出 pandas,它的 DataFrame 用来处理数据很方便

pandas 加载 Excel 之后,提取表头,我们要通过表头来生成数据表的字段。有些 Excel 的表头是中文的,需要先做个转换。

一开始我是想用翻译API,全都翻译成英文,不过发现免费的很慢有限额,微软、DeepL都要申请,很麻烦。索性用个拼音转换库,全都转换成拼音得了~

然后字段的长度也要确定,或者全部用不限制长度的 TextField

权衡一下,我还是做一下字段长度判定的逻辑,遍历整个表,找出各个字段最长的数据,然后再加一个偏移量,作为最大长度。

接着生成 Model 类,这里我用 jinja2 模板语言,先把大概的模板写好,然后根据提取出来的字段名啥的生成。

最后生成 admin 配置和导入脚本,同理,也是用 jinja2 模板。

实现

简单介绍下思路,现在开始上代码。

就几行而已,Python很省代码~

模型

首先定义俩模型

字段模型


  
  1. class Field( object):
  2. def __init__( self, name: str, verbose_name: str, max_length: int = 128):
  3. self.name = name
  4. self.verbose_name = verbose_name
  5. self.max_length = max_length
  6. def __str__( self):
  7. return f'<Field>{self.name}:{self.verbose_name}'
  8. def __repr__( self):
  9. return self.__str__()

Model模型

为了符合Python关于变量的命名规范,snake_name 属性是用正则表达式实现驼峰命名转蛇形命名


  
  1. class Model( object):
  2. def __init__( self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
  3. self.name = name
  4. self.verbose_name = verbose_name
  5. self.id_field = id_field
  6. self.fields: List[Field] = fields
  7. @property
  8. def snake_name( self):
  9. import re
  10. pattern = re. compile( r'(?<!^)(?=[A-Z])')
  11. name = pattern.sub( '_', self.name).lower()
  12. return name
  13. def __str__( self):
  14. return f'<Model>{self.name}:{self.verbose_name}'
  15. def __repr__( self):
  16. return self.__str__()

代码模板

使用 jinja2 实现。

本身 jinja2 是 Flask、Django 之类的框架用来渲染网页的。

不过单独使用的效果也不错,我的 DjangoStarter 框架也是用这个 jinja2 来自动生成 CRUD 代码~

Model模板


  
  1. # -*- coding:utf-8 -*-
  2. from django.db import models
  3. class {{ model.name }} (models.Model):
  4. """ {{ model.verbose_name }} """
  5. {% for field in model.fields -%}
  6. {{ field.name }} = models.CharField(' {{ field.verbose_name }} ', default='', null=True, blank=True, max_length= {{ field.max_length }} )
  7. {% endfor %}
  8. class Meta:
  9. db_table = ' {{ model.snake_name }} '
  10. verbose_name = ' {{ model.verbose_name }} '
  11. verbose_name_plural = verbose_name

Admin配置模板


  
  1. @admin.register({{ model.name }})
  2. class {{ model.name }}Admin(admin.ModelAdmin):
  3. list_display = [{% for field in model.fields %} '{{ field.name }}', {% endfor %}]
  4. list_display_links = None
  5. def has_add_permission( self, request):
  6. return False
  7. def has_delete_permission( self, request, obj=None):
  8. return False
  9. def has_view_permission( self, request, obj=None):
  10. return False

数据导入脚本

这里做了几件事:

  • 使用 pandas 处理空值,填充空字符串
  • 已有数据进行批量更新
  • 新数据批量插入

更新逻辑麻烦一点,因为数据库一般都有每次最大更新数量的限制,所以我做了分批处理,通过 update_data_once_max_lines 控制每次最多同时更新多少条数据。


  
  1. def import_ {{ model.snake_name }} ():
  2. file_path = path_proc(r' {{ excel_filepath }} ')
  3. logger.info(f'读取文件: {file_path}')
  4. xlsx = pd.ExcelFile(file_path)
  5. df = pd.read_excel(xlsx, 0, header= {{ excel_header }} )
  6. df.fillna('', inplace=True)
  7. logger.info('开始处理数据')
  8. id_field_list = {{ model.name }} .objects.values_list(' {{ model.id_field.name }} ', flat=True)
  9. item_list = list( {{ model.name }} .objects.all())
  10. def get_item(id_value):
  11. for i in item_list:
  12. if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
  13. return i
  14. return None
  15. insert_data = []
  16. update_data_once_max_lines = 100
  17. update_data_sub_set_index = 0
  18. update_data = [[]]
  19. update_fields = set()
  20. for index, row in df.iterrows():
  21. if ' {{ model.id_field.verbose_name }} ' not in row:
  22. logger.error('id_field {} is not existed'.format(' {{ model.id_field.verbose_name }} '))
  23. continue
  24. if row[' {{ model.id_field.verbose_name }} '] in id_field_list:
  25. item = get_item(row[' {{ model.id_field.verbose_name }} '])
  26. {% for field in model.fields -%}
  27. if ' {{ field.verbose_name }} ' in row:
  28. if item. {{ field.name }} != row[' {{ field.verbose_name }} ']:
  29. item. {{ field.name }} = row[' {{ field.verbose_name }} ']
  30. update_fields.add(' {{ field.name }} ')
  31. {% endfor %}
  32. if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
  33. update_data_sub_set_index += 1
  34. update_data.append([])
  35. update_data[update_data_sub_set_index].append(item)
  36. else:
  37. # {% for field in model.fields -%} {{ field.verbose_name }} ,{%- endfor %}
  38. model_obj = {{ model.name }} ()
  39. {% for field in model.fields -%}
  40. if ' {{ field.verbose_name }} ' in row:
  41. model_obj. {{ field.name }} = row[' {{ field.verbose_name }} ']
  42. {% endfor %}
  43. insert_data.append(model_obj)
  44. logger.info('开始批量导入')
  45. {{ model.name }} .objects.bulk_create(insert_data)
  46. logger.info('导入完成')
  47. if len(update_data[update_data_sub_set_index]) > 0:
  48. logger.info('开始批量更新')
  49. for index, update_sub in enumerate(update_data):
  50. logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 条数据')
  51. {{ model.name }} .objects.bulk_update(update_sub, list(update_fields))
  52. logger.info('更新完成')

主体代码

剩下的全是核心代码了

引用依赖

先把用到的库导入


  
  1. import os
  2. import re
  3. from typing import List, Optional
  4. from pypinyin import pinyin, lazy_pinyin, Style
  5. from jinja2 import Environment, PackageLoader, FileSystemLoader

或者后面直接去我的完整代码里面拿也行~

老规矩,我封装了一个类。

构造方法需要指定 Excel 文件地址,还有表头的行索引。


  
  1. class ExcelToModel(object):
  2. def __init__( self, filepath, header_index= 0):
  3. self.filepath = filepath
  4. self.header_index = header_index
  5. self.columns = []
  6. self.fields: List[Field] = []
  7. self.base_dir = os. path.dirname( os. path.abspath(__file__))
  8. self.template_path = os. path.join( self.base_dir, 'templates')
  9. self.jinja2_env = Environment(loader=FileSystemLoader( self.template_path))
  10. self.load_file()

这里面有个 self.load_file() 后面再贴。

字段名中文转拼音

用了 pypinyin 这个库,感觉还不错。

转换后用正则表达式,去除符号,只保留英文和数字。

代码如下,也是放在 ExcelToModel 类里边。


  
  1. @staticmethod
  2. def to_pinyin( text: str) -> str:
  3. pattern = r '~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
  4. text = re. sub(r "[%s]+" % pattern, "", text)
  5. return '_'.join(lazy_pinyin(text, style=Style.NORMAL))

加载文件

拿出万能的 pandas,按照前面说的思路,提取表头转换成字段,并且遍历数据确定每个字段的最大长度,我这里偏移值是32,即在当前数据最大长度基础上加上32个字符。


  
  1. def load_file( self):
  2. import pandas as pd
  3. xlsx = pd.ExcelFile(self.filepath)
  4. df = pd.read_excel(xlsx, 0, header=self.header_index)
  5. df.fillna( '', inplace= True)
  6. self.columns = list(df.columns)
  7. for col in self.columns:
  8. field = Field(self.to_pinyin(col), col)
  9. self.fields.append(field)
  10. for index, row in df.iterrows():
  11. item_len = len( str(row[col]))
  12. if item_len > field.max_length:
  13. field.max_length = item_len + 32
  14. print(field.verbose_name, field.name, field.max_length)

如果觉得这样生成表太慢,可以把确定最大长度的这块代码去掉,就下面这块代码


  
  1. for index, row in df .iterrows():
  2. item_len = len( str(row[col]))
  3. if item_len > field.max_length:
  4. field.max_length = item_len + 32

手动指定最大长度或者换成不限制长度的 TextField 就行。

生成文件

先构造个 context 然后直接用 jinja2 的 render 功能生成代码。

为了在导入时判断数据存不存在,生成代码时要指定 id_field_verbose_name,即Excel文件中类似“证件号码”、“编号”之类的列名,注意是Excel中的表头列名。


  
  1. def find_field_by_verbose_name( self, verbose_name) -> Optional[Field]:
  2. for field in self.fields:
  3. if field.verbose_name == verbose_name:
  4. return field
  5. return None
  6. def generate_file( self, model_name: str, verbose_name: str, id_field_verbose_name: str, output_filepath: str):
  7. template = self.jinja2_env. get_template( 'output.jinja2')
  8. context = {
  9. 'model': Model(
  10. model_name, verbose_name,
  11. self. find_field_by_verbose_name(id_field_verbose_name),
  12. self.fields
  13. ),
  14. 'excel_filepath': self.filepath,
  15. 'excel_header': self.header_index,
  16. }
  17. with open(output_filepath, 'w+', encoding= 'utf- 8') as f:
  18. render_result = template. render(context)
  19. f. write(render_result)

使用

看代码。


  
  1. tool = ExcelToModel( 'file.xlsx')
  2. tool.generate_file( 'CitizenFertility', '房价与居民生育率', '证件号码', 'output/citizen_fertility.py')

生成出来的代码都在一个文件里,请根据实际情况放到项目的各个位置。

完整代码

发布到Github了

地址: GitHub - Deali-Axy/excel_to_model: 通过Excel生成Django模型和数据导入脚本


转载:https://blog.csdn.net/jh035512/article/details/127896630
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场