飞道的博客

数仓作业调度

400人阅读  评论(0)
  • 概述

随着数据仓库的开发,ETL作业会越来越多,怎么把这些作业有序的运行起来,就需要一个健壮的调度系统来保证数据能够准确、及时的提供给BI应用程序。

  • 调度系统设计目标 

                   

  • 调度系统架构

                                            

 

  • ETL作业

数据仓库的ETL作业可能不至一种,需要把各种作业再次进行封装,建立作业的标准格式,统一作业的输入参数、输出参数和参数格式,达到所有的作业调度方式一致。

  • 作业管理

提供便捷的作业配置和依赖关系配置页面。

  • 作业调度

根据作业调度算法进行自动执行,如果发生异常情况可以进行人工重启和停止。

  • 作业监控

对作业运行情况进行监控,发送邮件或者提供监控页面。

  • 作业逻辑模型

                                        

  • 作业调度状态流转图

                                                                      

  1. 初始化:把新添加的作业加入到执行计划中。
  2. 待运行:根据时间和依赖关系把可执行的任务更新为待运行。
  3. 运行中:把待运行的任务执行,更新为运行中。
  4. 结束:作业运行完成,分为成功和失败。
  • python实现调度工具 
  • 准备环境

    1.安装mysql数据库,执行建表脚本。


  
  1. CREATE TABLE `task_info` (
  2. `task_id` int( 11) NOT NULL AUTO_INCREMENT COMMENT '任务主键',
  3. `task_name` varchar( 255) DEFAULT NULL COMMENT '任务名称',
  4. `task_dir` varchar( 255) DEFAULT NULL COMMENT '任务所在路径',
  5. `task_cmd` varchar( 255) DEFAULT NULL COMMENT '任务启动命令',
  6. `task_argv` varchar( 255) DEFAULT NULL COMMENT '任务参数',
  7. `task_level` varchar( 255) DEFAULT NULL COMMENT '任务层级',
  8. `target_table_name` varchar( 255) DEFAULT NULL COMMENT '目标表名称',
  9. `start_time` datetime DEFAULT NULL COMMENT '任务启动日期',
  10. `task_fq` varchar( 255) DEFAULT NULL COMMENT '任务执行频率:季-quarters,月-months,周-weeks,天-days,小时-hours,分钟-minutes',
  11. `time_interval` int( 11) DEFAULT NULL COMMENT '时间间隔',
  12. `create_user` varchar( 255) DEFAULT NULL COMMENT '创建人',
  13. `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  14. `update_user` datetime DEFAULT NULL COMMENT '更新人',
  15. `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  16. `is_active` tinyint( 4) DEFAULT NULL COMMENT '是否有效',
  17. PRIMARY KEY ( `task_id`),
  18. UNIQUE KEY `uq_task_name` ( `task_name`) USING BTREE
  19. ) ENGINE= InnoDB AUTO_INCREMENT= 1427 DEFAULT CHARSET=utf8;
  20. CREATE TABLE `task_dpd` (
  21. `id` int( 11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  22. `task_id` int( 11) DEFAULT NULL COMMENT '任务外键',
  23. `task_name` varchar( 255) DEFAULT NULL COMMENT '任务名称',
  24. `pre_task_id` int( 11) DEFAULT NULL COMMENT '依赖任务外键',
  25. `is_active` tinyint( 4) DEFAULT NULL COMMENT '是否有效',
  26. `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  27. `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  28. PRIMARY KEY ( `id`),
  29. KEY `fk_task_dpd_info_id` ( `task_id`),
  30. CONSTRAINT `fk_task_dpd_info_id` FOREIGN KEY ( `task_id`) REFERENCES `task_info` ( `task_id`)
  31. ) ENGINE= InnoDB AUTO_INCREMENT= 2064 DEFAULT CHARSET=utf8;
  32. CREATE TABLE `task_plan` (
  33. `id` int( 11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  34. `task_id` int( 11) DEFAULT NULL COMMENT '外键',
  35. `task_name` varchar( 255) DEFAULT NULL COMMENT '任务名称',
  36. `begtime` datetime DEFAULT NULL COMMENT '开始时间',
  37. `endtime` datetime DEFAULT NULL COMMENT '结束时间',
  38. `task_status` tinyint( 4) DEFAULT NULL COMMENT '任务状态 1-初始化,2-待运行,3-运行中,4-成功,5-失败',
  39. `retry_count` tinyint( 4) DEFAULT '0' COMMENT '重试次数',
  40. `failed_desc` text COMMENT '失败原因描述',
  41. `next_time` datetime DEFAULT NULL COMMENT '下次运行时间',
  42. `task_ppid` int( 11) DEFAULT NULL COMMENT '父进程id',
  43. `task_pid` int( 11) DEFAULT NULL COMMENT '子进程id',
  44. `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  45. `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  46. `task_result` text COMMENT '执行结果信息',
  47. PRIMARY KEY ( `id`),
  48. KEY `fk_task_info_task_id` ( `task_id`),
  49. CONSTRAINT `fk_task_info_task_id` FOREIGN KEY ( `task_id`) REFERENCES `task_info` ( `task_id`)
  50. ) ENGINE= InnoDB AUTO_INCREMENT= 6401 DEFAULT CHARSET=utf8;
  51. CREATE TABLE `task_log` (
  52. `id` int( 11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  53. `task_id` int( 11) NOT NULL COMMENT '任务外键',
  54. `task_name` varchar( 255) DEFAULT NULL COMMENT '任务名称',
  55. `begtime` datetime DEFAULT NULL COMMENT '开始时间',
  56. `endtime` datetime DEFAULT NULL COMMENT '结束时间',
  57. `task_status` tinyint( 4) DEFAULT NULL COMMENT '任务状态 4-成功,5-失败',
  58. `task_result` text COMMENT '执行结果信息',
  59. `failed_desc` text COMMENT '失败信息',
  60. `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  61. `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  62. PRIMARY KEY ( `id`),
  63. KEY `fk_task_info_id` ( `task_id`),
  64. CONSTRAINT `fk_task_info_id` FOREIGN KEY ( `task_id`) REFERENCES `task_info` ( `task_id`)
  65. ) ENGINE= InnoDB AUTO_INCREMENT= 2382 DEFAULT CHARSET=utf8;
  66. CREATE TABLE `task_param` (
  67. `id` int( 11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  68. `param_type` varchar( 255) DEFAULT NULL COMMENT '参数类型',
  69. `param_value` varchar( 255) DEFAULT NULL COMMENT '参数值',
  70. `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  71. `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  72. `param_name` varchar( 255) DEFAULT NULL COMMENT '参数名称',
  73. PRIMARY KEY ( `id`)
  74. ) ENGINE= InnoDB AUTO_INCREMENT= 2 DEFAULT CHARSET=utf8;
  75. INSERT INTO `task_param`( `id`, `param_type`, `param_value`, `create_time`, `update_time`, `param_name`) VALUES ( 1, 'IS_START_TASK', '1', '2020-04-17 17:13:59', '2020-04-22 16:57:40', 'IS_START_TASK');

2.安装python3,安装包:pymysql,dateutil,concurrent_log_handler,DBUtils。

3.下载工具包,解压到ETL服务器上。

4.初始化配置文件settings,加载任务和依赖数据。

  • 目录结构描述

                                                        

文件名称 描述
log 任务执行日志,按照文件大小切分。
taskmodels 任务模型,主要有关于数据库操作的方法。
utils 主要有数据连接日志,发送邮件的公共方法。
settings.py 调度工具配置信息。
taskinit.py 任务初始化执行文件。
taskschedule.py 任务调度执行文件。
sendmail.py 发送邮箱执行文件。
  • 运行调度工具 

运行 taskinit.py、taskschedule.py、sendmail.py三个文件

 

 

 

 


转载:https://blog.csdn.net/mark_wu2000/article/details/105726247
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场