- 概述
随着数据仓库的开发,ETL作业会越来越多,怎么把这些作业有序的运行起来,就需要一个健壮的调度系统来保证数据能够准确、及时的提供给BI应用程序。
- 调度系统设计目标
- 调度系统架构
- ETL作业
数据仓库的ETL作业可能不至一种,需要把各种作业再次进行封装,建立作业的标准格式,统一作业的输入参数、输出参数和参数格式,达到所有的作业调度方式一致。
- 作业管理
提供便捷的作业配置和依赖关系配置页面。
- 作业调度
根据作业调度算法进行自动执行,如果发生异常情况可以进行人工重启和停止。
- 作业监控
对作业运行情况进行监控,发送邮件或者提供监控页面。
- 作业逻辑模型
- 作业调度状态流转图
- 初始化:把新添加的作业加入到执行计划中。
- 待运行:根据时间和依赖关系把可执行的任务更新为待运行。
- 运行中:把待运行的任务执行,更新为运行中。
- 结束:作业运行完成,分为成功和失败。
- python实现调度工具
- 准备环境
1.安装mysql数据库,执行建表脚本。
-
CREATE
TABLE
`task_info` (
-
`task_id`
int(
11)
NOT
NULL AUTO_INCREMENT
COMMENT
'任务主键',
-
`task_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务名称',
-
`task_dir`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务所在路径',
-
`task_cmd`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务启动命令',
-
`task_argv`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务参数',
-
`task_level`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务层级',
-
`target_table_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'目标表名称',
-
`start_time` datetime
DEFAULT
NULL
COMMENT
'任务启动日期',
-
`task_fq`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务执行频率:季-quarters,月-months,周-weeks,天-days,小时-hours,分钟-minutes',
-
`time_interval`
int(
11)
DEFAULT
NULL
COMMENT
'时间间隔',
-
`create_user`
varchar(
255)
DEFAULT
NULL
COMMENT
'创建人',
-
`create_time` datetime
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间',
-
`update_user` datetime
DEFAULT
NULL
COMMENT
'更新人',
-
`update_time` datetime
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'更新时间',
-
`is_active` tinyint(
4)
DEFAULT
NULL
COMMENT
'是否有效',
-
PRIMARY
KEY (
`task_id`),
-
UNIQUE
KEY
`uq_task_name` (
`task_name`)
USING BTREE
-
)
ENGINE=
InnoDB AUTO_INCREMENT=
1427
DEFAULT
CHARSET=utf8;
-
CREATE
TABLE
`task_dpd` (
-
`id`
int(
11)
NOT
NULL AUTO_INCREMENT
COMMENT
'主键',
-
`task_id`
int(
11)
DEFAULT
NULL
COMMENT
'任务外键',
-
`task_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务名称',
-
`pre_task_id`
int(
11)
DEFAULT
NULL
COMMENT
'依赖任务外键',
-
`is_active` tinyint(
4)
DEFAULT
NULL
COMMENT
'是否有效',
-
`create_time` datetime
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间',
-
`update_time` datetime
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'更新时间',
-
PRIMARY
KEY (
`id`),
-
KEY
`fk_task_dpd_info_id` (
`task_id`),
-
CONSTRAINT
`fk_task_dpd_info_id` FOREIGN
KEY (
`task_id`)
REFERENCES
`task_info` (
`task_id`)
-
)
ENGINE=
InnoDB AUTO_INCREMENT=
2064
DEFAULT
CHARSET=utf8;
-
CREATE
TABLE
`task_plan` (
-
`id`
int(
11)
NOT
NULL AUTO_INCREMENT
COMMENT
'主键',
-
`task_id`
int(
11)
DEFAULT
NULL
COMMENT
'外键',
-
`task_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务名称',
-
`begtime` datetime
DEFAULT
NULL
COMMENT
'开始时间',
-
`endtime` datetime
DEFAULT
NULL
COMMENT
'结束时间',
-
`task_status` tinyint(
4)
DEFAULT
NULL
COMMENT
'任务状态 1-初始化,2-待运行,3-运行中,4-成功,5-失败',
-
`retry_count` tinyint(
4)
DEFAULT
'0'
COMMENT
'重试次数',
-
`failed_desc`
text
COMMENT
'失败原因描述',
-
`next_time` datetime
DEFAULT
NULL
COMMENT
'下次运行时间',
-
`task_ppid`
int(
11)
DEFAULT
NULL
COMMENT
'父进程id',
-
`task_pid`
int(
11)
DEFAULT
NULL
COMMENT
'子进程id',
-
`create_time` datetime
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间',
-
`update_time` datetime
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'更新时间',
-
`task_result`
text
COMMENT
'执行结果信息',
-
PRIMARY
KEY (
`id`),
-
KEY
`fk_task_info_task_id` (
`task_id`),
-
CONSTRAINT
`fk_task_info_task_id` FOREIGN
KEY (
`task_id`)
REFERENCES
`task_info` (
`task_id`)
-
)
ENGINE=
InnoDB AUTO_INCREMENT=
6401
DEFAULT
CHARSET=utf8;
-
-
CREATE
TABLE
`task_log` (
-
`id`
int(
11)
NOT
NULL AUTO_INCREMENT
COMMENT
'主键',
-
`task_id`
int(
11)
NOT
NULL
COMMENT
'任务外键',
-
`task_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'任务名称',
-
`begtime` datetime
DEFAULT
NULL
COMMENT
'开始时间',
-
`endtime` datetime
DEFAULT
NULL
COMMENT
'结束时间',
-
`task_status` tinyint(
4)
DEFAULT
NULL
COMMENT
'任务状态 4-成功,5-失败',
-
`task_result`
text
COMMENT
'执行结果信息',
-
`failed_desc`
text
COMMENT
'失败信息',
-
`create_time` datetime
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间',
-
`update_time` datetime
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'更新时间',
-
PRIMARY
KEY (
`id`),
-
KEY
`fk_task_info_id` (
`task_id`),
-
CONSTRAINT
`fk_task_info_id` FOREIGN
KEY (
`task_id`)
REFERENCES
`task_info` (
`task_id`)
-
)
ENGINE=
InnoDB AUTO_INCREMENT=
2382
DEFAULT
CHARSET=utf8;
-
CREATE
TABLE
`task_param` (
-
`id`
int(
11)
NOT
NULL AUTO_INCREMENT
COMMENT
'主键',
-
`param_type`
varchar(
255)
DEFAULT
NULL
COMMENT
'参数类型',
-
`param_value`
varchar(
255)
DEFAULT
NULL
COMMENT
'参数值',
-
`create_time` datetime
DEFAULT
CURRENT_TIMESTAMP
COMMENT
'创建时间',
-
`update_time` datetime
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'更新时间',
-
`param_name`
varchar(
255)
DEFAULT
NULL
COMMENT
'参数名称',
-
PRIMARY
KEY (
`id`)
-
)
ENGINE=
InnoDB AUTO_INCREMENT=
2
DEFAULT
CHARSET=utf8;
-
INSERT
INTO
`task_param`(
`id`,
`param_type`,
`param_value`,
`create_time`,
`update_time`,
`param_name`)
VALUES (
1,
'IS_START_TASK',
'1',
'2020-04-17 17:13:59',
'2020-04-22 16:57:40',
'IS_START_TASK');
2.安装python3,安装包:pymysql,dateutil,concurrent_log_handler,DBUtils。
3.下载工具包,解压到ETL服务器上。
4.初始化配置文件settings,加载任务和依赖数据。
- 目录结构描述
文件名称 | 描述 |
log | 任务执行日志,按照文件大小切分。 |
taskmodels | 任务模型,主要有关于数据库操作的方法。 |
utils | 主要有数据连接日志,发送邮件的公共方法。 |
settings.py | 调度工具配置信息。 |
taskinit.py | 任务初始化执行文件。 |
taskschedule.py | 任务调度执行文件。 |
sendmail.py | 发送邮箱执行文件。 |
- 运行调度工具
运行 taskinit.py、taskschedule.py、sendmail.py三个文件
转载:https://blog.csdn.net/mark_wu2000/article/details/105726247
查看评论