作业类型
xxl-job
支持七种作业类型:Bean
、GLUE(Java)
、GLUE(Shell)
、GLUE(Python)
、GLUE(PHP)
、GLUE(Nodejs)
、GLUE(PowerShell)
。其中,GLUE类型作业
都是在admin管理端
编辑业务代码,而Bean类型作业
是将用户业务代码逻辑集成到xxl-job
进行调度,源码位于用户项目中,而非xxl-job
的admin模块
。
xxl-job
抽象IJobHandler
组件,用于执行作业,其实现有三种(见下图):
MethodJobHandler
:Bean
类型作业处理器,Bean类型作业
逻辑实际上封装在带有@XxlJob
注解的Method
中;
ScriptJobHandler
:脚本类型作业处理器,如Shell
、Python
、PHP
、Nodejs
、PowerShell
等都可以看出脚本类型作业,使用该处理器;
GlueJobHandler
:该种作业处理器专门用于处理Glue(Java)
类型作业,上节分析过Java
类型作业会被GlueFactory
编译、初始化成实例,然后封装到GlueJobHandler
中进行执行;
执行流程
服务端流程
服务端作业执行触发入口见JobTriggerPoolHelper#addTrigger
:
-
public
void addTrigger(
final
int jobId,
-
final TriggerTypeEnum triggerType,
-
final
int failRetryCount,
-
final
String executorShardingParam,
-
final
String executorParam,
-
final
String addressList) {
-
-
// 这里根据一定规则将触发任务从两个线程池中选取一个进行投递
-
// fastTriggerPool:默认投递线程池
-
// slowTriggerPool:慢作业投递到该线程池
-
// 慢作业定义:投递超过500ms,且累计一分钟超过10次(每分钟重置缓存重新计算),则该作业就是慢作业,后续执行时使用slowTriggerPool
-
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
-
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
-
if (jobTimeoutCount!=
null && jobTimeoutCount.get() >
10) {
// job-timeout 10 times in 1 min
-
triggerPool_ = slowTriggerPool;
-
}
-
-
// trigger
-
triggerPool_.execute(
new Runnable() {
-
@Override
-
public
void run() {
-
-
long start = System.currentTimeMillis();
-
-
try {
-
// 触发作业
-
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
-
}
catch (
Exception e) {
-
logger.
error(e.getMessage(), e);
-
}
finally {
-
-
// 每分钟清空慢作业累计缓存
-
long minTim_now = System.currentTimeMillis()/
60000;
-
if (minTim != minTim_now) {
-
minTim = minTim_now;
-
jobTimeoutCountMap.clear();
-
}
-
-
// 超过500ms则慢作业执行次数累计+1,
-
// 执行端采用异步模式:作业下发到执行端放入到队列中即返回,所以,这个时间是不包括作业本身执行时间
-
long cost = System.currentTimeMillis()-start;
-
if (cost >
500) {
// ob-timeout threshold 500ms
-
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId,
new AtomicInteger(
1));
-
if (timeoutCount !=
null) {
-
timeoutCount.incrementAndGet();
-
}
-
}
-
}
-
-
}
-
});
-
}
继续向下跟踪XxlJobTrigger#trigger
:
-
private
static
void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo,
int finalFailRetryCount, TriggerTypeEnum triggerType,
int index,
int total){
-
-
// 阻塞处理策略
-
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
-
// 路由策略
-
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
null);
// route strategy
-
// 分片参数
-
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?
String.valueOf(index).concat(
"/").concat(
String.valueOf(total)):
null;
-
-
// 1、save log-id
-
XxlJobLog jobLog =
new XxlJobLog();
-
jobLog.setJobGroup(jobInfo.getJobGroup());
-
jobLog.setJobId(jobInfo.getId());
-
jobLog.setTriggerTime(
new Date());
-
// xxl_job_log插入运行日志
-
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
-
logger.debug(
">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
-
-
// 2、init trigger-param
-
TriggerParam triggerParam =
new TriggerParam();
-
triggerParam.setJobId(jobInfo.getId());
-
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
-
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
-
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
-
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
-
triggerParam.setLogId(jobLog.getId());
-
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
-
triggerParam.setGlueType(jobInfo.getGlueType());
-
triggerParam.setGlueSource(jobInfo.getGlueSource());
-
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
-
triggerParam.setBroadcastIndex(index);
-
triggerParam.setBroadcastTotal(total);
-
-
// 初始化执行器地址
-
String address =
null;
-
ReturnT<
String> routeAddressResult =
null;
-
if (group.getRegistryList()!=
null && !group.getRegistryList().isEmpty()) {
-
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
-
// 分片广播模式
-
if (index < group.getRegistryList().size()) {
-
address = group.getRegistryList().get(index);
-
}
else {
-
address = group.getRegistryList().get(
0);
-
}
-
}
else {
-
//路由策略选取执行器地址
-
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
-
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
-
address = routeAddressResult.getContent();
-
}
-
}
-
}
else {
-
routeAddressResult =
new ReturnT<
String>(ReturnT.FAIL_CODE, I18nUtil.getString(
"jobconf_trigger_address_empty"));
-
}
-
-
// 4、trigger remote executor
-
ReturnT<
String> triggerResult =
null;
-
if (address !=
null) {
-
// 作业执行
-
triggerResult = runExecutor(triggerParam, address);
-
}
else {
-
triggerResult =
new ReturnT<
String>(ReturnT.FAIL_CODE,
null);
-
}
-
-
// 收集执行信息
-
StringBuffer triggerMsgSb =
new StringBuffer();
-
triggerMsgSb.append(I18nUtil.getString(
"jobconf_trigger_type")).append(
":").append(triggerType.getTitle());
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobconf_trigger_admin_adress")).append(
":").append(IpUtil.getIp());
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobconf_trigger_exe_regtype")).append(
":")
-
.append( (group.getAddressType() ==
0)?I18nUtil.getString(
"jobgroup_field_addressType_0"):I18nUtil.getString(
"jobgroup_field_addressType_1") );
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobconf_trigger_exe_regaddress")).append(
":").append(group.getRegistryList());
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobinfo_field_executorRouteStrategy")).append(
":").append(executorRouteStrategyEnum.getTitle());
-
if (shardingParam !=
null) {
-
triggerMsgSb.append(
"("+shardingParam+
")");
-
}
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobinfo_field_executorBlockStrategy")).append(
":").append(blockStrategy.getTitle());
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobinfo_field_timeout")).append(
":").append(jobInfo.getExecutorTimeout());
-
triggerMsgSb.append(
"<br>").append(I18nUtil.getString(
"jobinfo_field_executorFailRetryCount")).append(
":").append(finalFailRetryCount);
-
-
triggerMsgSb.append(
"<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString(
"jobconf_trigger_run") +
"<<<<<<<<<<< </span><br>")
-
.append((routeAddressResult!=
null&&routeAddressResult.getMsg()!=
null)?routeAddressResult.getMsg()+
"<br><br>":
"").append(triggerResult.getMsg()!=
null?triggerResult.getMsg():
"");
-
-
// 6、save log trigger-info
-
jobLog.setExecutorAddress(address);
-
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
-
jobLog.setExecutorParam(jobInfo.getExecutorParam());
-
jobLog.setExecutorShardingParam(shardingParam);
-
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
-
//jobLog.setTriggerTime();
-
jobLog.setTriggerCode(triggerResult.getCode());
-
jobLog.setTriggerMsg(triggerMsgSb.toString());
-
-
// 将执行信息更新到xxl_job_log日志表中
-
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
-
-
logger.debug(
">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
-
}
这个方法代码比较多,但是逻辑都比较简单,核心逻辑:广播或路由策略选取执行器地址 -> 作业执行 -> 收集执行信息更新到xxl_job_log日志表中。
路由策略下节单独分析,接下里继续跟踪作业执行流程XxlJobTrigger#runExecutor
:
-
public
static ReturnT<
String> runExecutor(TriggerParam triggerParam,
String address){
-
ReturnT<
String> runResult =
null;
-
try {
-
// 根据address获取ExecutorBiz
-
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
-
runResult = executorBiz.run(triggerParam);
-
}
catch (
Exception e) {
-
logger.
error(
">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
-
runResult =
new ReturnT<
String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
-
}
-
-
// 结果解析
-
StringBuffer runResultSB =
new StringBuffer(I18nUtil.getString(
"jobconf_trigger_run") +
":");
-
runResultSB.append(
"<br>address:").append(address);
-
runResultSB.append(
"<br>code:").append(runResult.getCode());
-
runResultSB.append(
"<br>msg:").append(runResult.getMsg());
-
-
runResult.setMsg(runResultSB.toString());
-
return runResult;
-
}
根据address
获取对应的执行器代理ExecutorBiz
,然后调用其run
方法将作业下发到执行器端运行。上节分析过执行器启动时使用netty
初始化一个http server
的web容器
,所以,这里的下发逻辑比较简单,就是调用http
接口XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
。
执行端流程
上节执行器启动流程分析过其在启动时会利用netty
初始化一个http server
的web容器
,用于接收admin
下发指令,然后将接收到的指令转给EmbedHttpServerHandler#process
处理:
-
private
Object process(HttpMethod httpMethod,
String uri,
String requestData,
String accessTokenReq) {
-
-
// valid
-
if (HttpMethod.POST != httpMethod) {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"invalid request, HttpMethod not support.");
-
}
-
if (uri==
null || uri.trim().length()==
0) {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"invalid request, uri-mapping empty.");
-
}
-
if (accessToken!=
null
-
&& accessToken.trim().length()>
0
-
&& !accessToken.equals(accessTokenReq)) {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"The access token is wrong.");
-
}
-
-
// services mapping
-
try {
-
if (
"/beat".equals(uri)) {
//执行器是否正常(在线),对应路由策略:故障转移
-
return executorBiz.beat();
-
}
else
if (
"/idleBeat".equals(uri)) {
// 执行器是否空闲,对应路由策略:忙碌转移
-
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.
class);
-
return executorBiz.idleBeat(idleBeatParam);
-
}
else
if (
"/run".equals(uri)) {
-
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.
class);
-
return executorBiz.run(triggerParam);
-
}
else
if (
"/kill".equals(uri)) {
// kill作业指令监听
-
logger.info(
"receive kill, data:{}", requestData);
-
KillParam killParam = GsonTool.fromJson(requestData, KillParam.
class);
-
return executorBiz.kill(killParam);
-
}
else
if (
"/log".equals(uri)) {
// 查看执行器调度日志监听
-
LogParam logParam = GsonTool.fromJson(requestData, LogParam.
class);
-
return executorBiz.log(logParam);
-
}
else {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"invalid request, uri-mapping("+ uri +
") not found.");
-
}
-
}
catch (
Exception e) {
-
logger.
error(e.getMessage(), e);
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"request error:" + ThrowableUtil.toString(e));
-
}
-
}
继续跟踪ExecutorBizImpl#run
:
-
@Override
-
public ReturnT<
String> run(TriggerParam triggerParam) {
-
// load old:jobHandler + jobThread
-
// 根据jobId从缓存中加载JobThread和IJobHandler
-
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
-
IJobHandler jobHandler = jobThread!=
null?jobThread.getHandler():
null;
-
String removeOldReason =
null;
-
-
// 作业类型匹配 并进行IJobHandler校验
-
// 比如作业IJobHandler发送变更、Glue类作业源码出现编辑等,则之前缓存的JobThread不能再继续使用,并使用最新IJobHandler创建JobThread
-
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
-
if (GlueTypeEnum.BEAN == glueTypeEnum) {
//Bean类型作业
-
......
-
}
else
if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
//Java类型作业
-
......
-
}
else
if (glueTypeEnum!=
null && glueTypeEnum.isScript()) {
//脚本类作业
-
......
-
}
else {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"glueType[" + triggerParam.getGlueType() +
"] is not valid.");
-
}
-
-
if (jobThread !=
null) {
-
// 如果JobThread != null,则该JobThread可能存在正在运行作业,则根据阻塞策略处理
-
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(),
null);
-
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
-
// 丢弃后续调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则当前作业丢弃
-
if (jobThread.isRunningOrHasQueue()) {
-
return
new ReturnT<
String>(ReturnT.FAIL_CODE,
"block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
-
}
-
}
else
if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
-
// 覆盖之前调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则destroy之前的JobThread,并重新创建JobThread运行当前作业
-
if (jobThread.isRunningOrHasQueue()) {
-
removeOldReason =
"block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
-
-
jobThread =
null;
-
}
-
}
else {
-
// 单机串行则直接将作业发送到JobThread的triggerQueue中即可
-
}
-
}
-
-
if (jobThread ==
null) {
-
// 创建JobThread,并放入缓存,如果jobId缓存中已存在,则destroy
-
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
-
}
-
-
logger.debug(
"jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
-
// 将下发的作业放入到JobThread的triggerQueue中,JobThread处理线程从triggerQueue提取执行
-
ReturnT<
String> pushResult = jobThread.pushTriggerQueue(triggerParam);
-
return pushResult;
-
}
下发的作业被投递到JobThread
的triggerQueue
队列中,JobThread#run
:
-
@Override
-
public
void run() {
-
-
try {
-
// 调用IJobHandler.init方法,如@XxlJob(init=xxx)即在这里调用
-
handler.init();
-
}
catch (
Throwable e) {
-
logger.
error(e.getMessage(), e);
-
}
-
-
while(!toStop){
-
// running=false表示当前JobThread没有在处理作业
-
// isRunningOrHasQueue()中判断JobThread是否运行用到该值以及triggerQueue
-
running =
false;
-
// 空闲次数累加+1
-
idleTimes++;
-
-
TriggerParam triggerParam =
null;
-
ReturnT<
String> executeResult =
null;
-
try {
-
triggerParam = triggerQueue.poll(
3L, TimeUnit.SECONDS);
-
if (triggerParam!=
null) {
-
// running=true表示当前JobThread正在处理作业
-
running =
true;
-
// 重置空闲统计次数
-
idleTimes =
0;
-
triggerLogIdSet.remove(triggerParam.getLogId());
-
-
// log filename, like "logPath/yyyy-MM-dd/9999.log"
-
// 初始化日志文件
-
String logFileName = XxlJobFileAppender.makeLogFileName(
new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
-
XxlJobFileAppender.contextHolder.set(logFileName);
-
// 将分片信息注入到线程上下文中:InheritableThreadLocal
-
ShardingUtil.setShardingVo(
new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
-
-
// execute
-
XxlJobLogger.log(
"<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
-
-
// executorTimeout:作业执行超时控制
-
// 正常执行作业是handler.execute(triggerParam.getExecutorParams()),
-
// 如果带有超时控制,则封装FutureTask放入到线程中异步执行,超时则触发中断并返回超时异常
-
if (triggerParam.getExecutorTimeout() >
0) {
-
// limit timeout
-
Thread futureThread =
null;
-
try {
-
final TriggerParam triggerParamTmp = triggerParam;
-
FutureTask<ReturnT<
String>> futureTask =
new FutureTask<ReturnT<
String>>(
new
Callable<ReturnT<
String>>() {
-
@Override
-
public ReturnT<
String> call() throws
Exception {
-
return handler.execute(triggerParamTmp.getExecutorParams());
-
}
-
});
-
futureThread =
new Thread(futureTask);
-
futureThread.start();
-
-
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
-
}
catch (TimeoutException e) {
-
-
XxlJobLogger.log(
"<br>----------- xxl-job job execute timeout");
-
XxlJobLogger.log(e);
-
-
executeResult =
new ReturnT<
String>(IJobHandler.FAIL_TIMEOUT.getCode(),
"job execute timeout ");
-
}
finally {
-
futureThread.interrupt();
-
}
-
}
else {
-
// 调用对应的IJobHandler处理作业
-
executeResult = handler.execute(triggerParam.getExecutorParams());
-
}
-
-
if (executeResult ==
null) {
-
executeResult = IJobHandler.FAIL;
-
}
else {
-
executeResult.setMsg(
-
(executeResult!=
null&&executeResult.getMsg()!=
null&&executeResult.getMsg().length()>
50000)
-
?executeResult.getMsg().substring(
0,
50000).concat(
"...")
-
:executeResult.getMsg());
-
executeResult.setContent(
null);
// limit obj size
-
}
-
XxlJobLogger.log(
"<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
-
-
}
else {
-
// 连续超时30次(每次3秒),即90秒内JobThread一直空闲,则销毁JobThread
-
if (idleTimes >
30) {
-
if(triggerQueue.size() ==
0) {
// avoid concurrent trigger causes jobId-lost
-
XxlJobExecutor.removeJobThread(jobId,
"excutor idel times over limit.");
-
}
-
}
-
}
-
}
catch (
Throwable e) {
-
if (toStop) {
-
XxlJobLogger.log(
"<br>----------- JobThread toStop, stopReason:" + stopReason);
-
}
-
-
StringWriter stringWriter =
new StringWriter();
-
e.printStackTrace(
new PrintWriter(stringWriter));
-
String errorMsg = stringWriter.toString();
-
executeResult =
new ReturnT<
String>(ReturnT.FAIL_CODE, errorMsg);
-
-
// 作业执行异常,则将异常信息写入到日志中
-
XxlJobLogger.log(
"<br>----------- JobThread Exception:" + errorMsg +
"<br>----------- xxl-job job execute end(error) -----------");
-
}
finally {
-
if(triggerParam !=
null) {
-
if (!toStop) {
-
// JobThread未停止场景下,异步回调机制将执行结果推送到admin
-
TriggerCallbackThread.pushCallBack(
new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
-
}
else {
-
// JobThread停止场景下,异步回调机制将kill异常推送到admin
-
ReturnT<
String> stopResult =
new ReturnT<
String>(ReturnT.FAIL_CODE, stopReason +
" [job running, killed]");
-
TriggerCallbackThread.pushCallBack(
new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
-
}
-
}
-
}
-
}
-
-
// JobThread被kill,检查下triggerQueue是否还有等待触发作业,如果有则向admin推送异常信息
-
while(triggerQueue !=
null && triggerQueue.size()>
0){
-
TriggerParam triggerParam = triggerQueue.poll();
-
if (triggerParam!=
null) {
-
// is killed
-
ReturnT<
String> stopResult =
new ReturnT<
String>(ReturnT.FAIL_CODE, stopReason +
" [job not executed, in the job queue, killed.]");
-
TriggerCallbackThread.pushCallBack(
new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
-
}
-
}
-
-
// destroy
-
try {
-
// 销毁IJobHandler,调用IJobHandler.destroy方法,如@XxlJob(destroy=xxx)即在这里调用
-
handler.destroy();
-
}
catch (
Throwable e) {
-
logger.
error(e.getMessage(), e);
-
}
-
-
logger.info(
">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
-
}
上面代码很多,但是逻辑不太复杂,看注释很容易理解到,接下来再来看下执行流程中最后一个核心组件IJobHandler
,调用作业执行逻辑被封装到该组件中,xxl-job
内置提供了三种实现方式,分别对应调用Bean
、Java
和脚本类型
作业,其实现不太复杂,这里就不再继续深入分析。
核心抽象组件
ExecutorRouter
:路由组件,选取执行器地址;
ExecutorBizClient
:路由组件选取任务执行器地址后,将其包装成ExecutorBizClient
,ExecutorBizClient
可以看成执行器在引擎端代理,屏蔽远程RPC网络通信底层细节;
EmbedHttpServerHandler
:执行器通过netty
实现http server
容器,EmbedHttpServerHandler
扩展组件用于处理接收指令;
ExecutorBizImpl
:ExecutorBizClient
作为执行器在引擎端代理,主要将指令通过RPC
转发给执行器,起到透传作用,ExecutorBizImpl
则是执行器上真正实现逻辑封装,所以,ExecutorBizClient
和ExecutorBizImpl
都实现同一接口ExecutorBiz
。
JobThread
:每个任务在执行器上执行都会对应一个JobThread
,任务和任务间是互相独立的,JobThread
控制任务在执行器上并发模型。
IJobHandler
:IJobHandler
则是封装怎么调用任务逻辑,xxl-job
内置三种实现类分别用来调用不同类型任务。
总结
上面对xxl-job
作业执行的核心关键代码进行了整体分析梳理,整体还是比较简单,可能比较枯燥,下面简要整理了作业执行的大概流程(见下图),可对xxl-job
调度机制有个大致理解:
大致描述:
xxl-job
整体架构采用中心化设计,分为调度中心Admin
和执行器两部分;调度中心
Admin
模块提供trigger
触发接口进行作业调度,然后根据作业历史统计下发耗时将作业分配到两个线程池中的一个进行执行;执行前将作业启动日志记录到
xxl_job_log
表中,然后利用路由组件选取执行器地址,并利用执行器代理ExecutorBiz
将执行下发到路由的执行器上,执行器代理ExecutorBiz
实现很简单:就是发送http
请求;执行器在启动时会利用
netty
初始化一个内嵌http server
容器,当接收到调度中心发送过来的指令后,将其转交给EmbedHttpServerHandler
处理器进行处理;EmbedHttpServerHandler
处理器在处理作业运行指令时,会根据jobId
从缓存中查找对应的JobThread
,然后将作业执行指令投递到JobThread
实例中triggerQueue
队列中排队;JobThread
线程不停循环从triggerQueue
队列中提取等待执行的作业信息,然后将其交由IJobHandler
真正处理作业调用,JobThread
将IJobHandler
处理结果解析后投递给TriggerCallbackThread
线程中callBackQueue
队列中排队;TriggerCallbackThread
内部也是线程不停循环从callBackQueue
提取回调任务,然后转交给doCallback
方法,这个方法内部通过Admin
代理类AdminBizClient
叫结果回调发送给调用中心的回调接口,即完成作业完成通知。
上面就是xxl-job
作业执行的整体大致流程,将其抽象出来的几个核心组件串联起来看清其脉络,则整个逻辑就比较清晰了。这里理解关键点是JobThread
组件,每个作业在每个执行器中会对应一个JobThread
实例,当作业下发到执行器上时,找到对应的JobThread
进行处理。JobThread
采用懒加载和缓存模式设计,只有作业下发执行器未找到对应的JobThread
才会创建并返回起来,待下次同一个作业过来执行时直接使用该JobThread
即可。
什么场景下执行器找不到JobThread
:
作业第一次下发到该执行器;
JobThread
内部线程循环不停从triggerQueue
提取作业进行处理,且每个作业在执行器上对应一个JobThread
,如果某个作业在执行器上执行一次后面不再执行、或者执行频率很低,可能会导致大量线程浪费,所以JobThread
设计上有空闲超时自动销毁机制。当30 * 3 = 90秒
没有执行作业,则判断JobThread
空闲超时,进入销毁流程,后面又接收到该作业下发来的指令,则会重新创建JobThread
。
长按二维码识别关注
转载:https://blog.csdn.net/god_86/article/details/114650439