飞道的博客

03 xxl-job任务执行流程

964人阅读  评论(0)

作业类型

xxl-job支持七种作业类型:BeanGLUE(Java)GLUE(Shell)GLUE(Python)GLUE(PHP)GLUE(Nodejs)GLUE(PowerShell)。其中,GLUE类型作业都是在admin管理端编辑业务代码,而Bean类型作业是将用户业务代码逻辑集成到xxl-job进行调度,源码位于用户项目中,而非xxl-jobadmin模块

xxl-job抽象IJobHandler组件,用于执行作业,其实现有三种(见下图):

MethodJobHandler:Bean类型作业处理器,Bean类型作业逻辑实际上封装在带有@XxlJob注解的Method中;

ScriptJobHandler:脚本类型作业处理器,如ShellPythonPHPNodejsPowerShell等都可以看出脚本类型作业,使用该处理器;

GlueJobHandler:该种作业处理器专门用于处理Glue(Java)类型作业,上节分析过Java类型作业会被GlueFactory编译、初始化成实例,然后封装到GlueJobHandler中进行执行;

执行流程

服务端流程

服务端作业执行触发入口见JobTriggerPoolHelper#addTrigger


   
  1. public void addTrigger( final int jobId,
  2.                       final TriggerTypeEnum triggerType,
  3.                       final int failRetryCount,
  4.                       final String executorShardingParam,
  5.                       final String executorParam,
  6.                       final String addressList) {
  7.     // 这里根据一定规则将触发任务从两个线程池中选取一个进行投递
  8.     // fastTriggerPool:默认投递线程池
  9.     // slowTriggerPool:慢作业投递到该线程池
  10.     // 慢作业定义:投递超过500ms,且累计一分钟超过10次(每分钟重置缓存重新计算),则该作业就是慢作业,后续执行时使用slowTriggerPool
  11.    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
  12.    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
  13.     if (jobTimeoutCount!= null && jobTimeoutCount.get() > 10) {       // job-timeout 10 times in 1 min
  14.        triggerPool_ = slowTriggerPool;
  15.    }
  16.     // trigger
  17.    triggerPool_.execute( new Runnable() {
  18.        @Override
  19.         public void run() {
  20.            long start = System.currentTimeMillis();
  21.             try {
  22.                 // 触发作业
  23.                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
  24.            } catch ( Exception e) {
  25.                logger. error(e.getMessage(), e);
  26.            } finally {
  27.                 // 每分钟清空慢作业累计缓存
  28.                long minTim_now = System.currentTimeMillis()/ 60000;
  29.                 if (minTim != minTim_now) {
  30.                    minTim = minTim_now;
  31.                    jobTimeoutCountMap.clear();
  32.                }
  33.                 // 超过500ms则慢作业执行次数累计+1,
  34.                 // 执行端采用异步模式:作业下发到执行端放入到队列中即返回,所以,这个时间是不包括作业本身执行时间
  35.                long cost = System.currentTimeMillis()-start;
  36.                 if (cost > 500) {       // ob-timeout threshold 500ms
  37.                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger( 1));
  38.                     if (timeoutCount != null) {
  39.                        timeoutCount.incrementAndGet();
  40.                    }
  41.                }
  42.            }
  43.        }
  44.    });
  45. }

继续向下跟踪XxlJobTrigger#trigger:


   
  1. private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
  2.     // 阻塞处理策略
  3.    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
  4.     // 路由策略
  5.    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);     // route strategy
  6.     // 分片参数
  7.     String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)? String.valueOf(index).concat( "/").concat( String.valueOf(total)): null;
  8.     // 1、save log-id
  9.    XxlJobLog jobLog = new XxlJobLog();
  10.    jobLog.setJobGroup(jobInfo.getJobGroup());
  11.    jobLog.setJobId(jobInfo.getId());
  12.    jobLog.setTriggerTime( new Date());
  13.     // xxl_job_log插入运行日志
  14.    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
  15.    logger.debug( ">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
  16.     // 2、init trigger-param
  17.    TriggerParam triggerParam = new TriggerParam();
  18.    triggerParam.setJobId(jobInfo.getId());
  19.    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
  20.    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
  21.    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
  22.    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
  23.    triggerParam.setLogId(jobLog.getId());
  24.    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
  25.    triggerParam.setGlueType(jobInfo.getGlueType());
  26.    triggerParam.setGlueSource(jobInfo.getGlueSource());
  27.    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
  28.    triggerParam.setBroadcastIndex(index);
  29.    triggerParam.setBroadcastTotal(total);
  30.     // 初始化执行器地址
  31.     String address = null;
  32.    ReturnT< String> routeAddressResult = null;
  33.     if (group.getRegistryList()!= null && !group.getRegistryList().isEmpty()) {
  34.         if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
  35.             // 分片广播模式
  36.             if (index < group.getRegistryList().size()) {
  37.                address = group.getRegistryList().get(index);
  38.            } else {
  39.                address = group.getRegistryList().get( 0);
  40.            }
  41.        } else {
  42.             //路由策略选取执行器地址
  43.            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
  44.             if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
  45.                address = routeAddressResult.getContent();
  46.            }
  47.        }
  48.    } else {
  49.        routeAddressResult = new ReturnT< String>(ReturnT.FAIL_CODE, I18nUtil.getString( "jobconf_trigger_address_empty"));
  50.    }
  51.     // 4、trigger remote executor
  52.    ReturnT< String> triggerResult = null;
  53.     if (address != null) {
  54.         // 作业执行
  55.        triggerResult = runExecutor(triggerParam, address);
  56.    } else {
  57.        triggerResult = new ReturnT< String>(ReturnT.FAIL_CODE, null);
  58.    }
  59.     // 收集执行信息
  60.    StringBuffer triggerMsgSb = new StringBuffer();
  61.    triggerMsgSb.append(I18nUtil.getString( "jobconf_trigger_type")).append( ":").append(triggerType.getTitle());
  62.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobconf_trigger_admin_adress")).append( ":").append(IpUtil.getIp());
  63.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobconf_trigger_exe_regtype")).append( ":")
  64.                .append( (group.getAddressType() == 0)?I18nUtil.getString( "jobgroup_field_addressType_0"):I18nUtil.getString( "jobgroup_field_addressType_1") );
  65.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobconf_trigger_exe_regaddress")).append( ":").append(group.getRegistryList());
  66.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobinfo_field_executorRouteStrategy")).append( ":").append(executorRouteStrategyEnum.getTitle());
  67.     if (shardingParam != null) {
  68.        triggerMsgSb.append( "("+shardingParam+ ")");
  69.    }
  70.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobinfo_field_executorBlockStrategy")).append( ":").append(blockStrategy.getTitle());
  71.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobinfo_field_timeout")).append( ":").append(jobInfo.getExecutorTimeout());
  72.    triggerMsgSb.append( "<br>").append(I18nUtil.getString( "jobinfo_field_executorFailRetryCount")).append( ":").append(finalFailRetryCount);
  73.    triggerMsgSb.append( "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString( "jobconf_trigger_run") + "<<<<<<<<<<< </span><br>")
  74.                .append((routeAddressResult!= null&&routeAddressResult.getMsg()!= null)?routeAddressResult.getMsg()+ "<br><br>": "").append(triggerResult.getMsg()!= null?triggerResult.getMsg(): "");
  75.     // 6、save log trigger-info
  76.    jobLog.setExecutorAddress(address);
  77.    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
  78.    jobLog.setExecutorParam(jobInfo.getExecutorParam());
  79.    jobLog.setExecutorShardingParam(shardingParam);
  80.    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
  81.     //jobLog.setTriggerTime();
  82.    jobLog.setTriggerCode(triggerResult.getCode());
  83.    jobLog.setTriggerMsg(triggerMsgSb.toString());
  84.     // 将执行信息更新到xxl_job_log日志表中
  85.    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
  86.    logger.debug( ">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
  87. }

这个方法代码比较多,但是逻辑都比较简单,核心逻辑:广播或路由策略选取执行器地址 -> 作业执行 -> 收集执行信息更新到xxl_job_log日志表中。

路由策略下节单独分析,接下里继续跟踪作业执行流程XxlJobTrigger#runExecutor:


   
  1. public static ReturnT< String> runExecutor(TriggerParam triggerParam, String address){
  2.    ReturnT< String> runResult = null;
  3.     try {
  4.         // 根据address获取ExecutorBiz
  5.        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
  6.        runResult = executorBiz.run(triggerParam);
  7.    } catch ( Exception e) {
  8.        logger. error( ">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
  9.        runResult = new ReturnT< String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
  10.    }
  11.     // 结果解析
  12.    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString( "jobconf_trigger_run") + ":");
  13.    runResultSB.append( "<br>address:").append(address);
  14.    runResultSB.append( "<br>code:").append(runResult.getCode());
  15.    runResultSB.append( "<br>msg:").append(runResult.getMsg());
  16.    runResult.setMsg(runResultSB.toString());
  17.     return runResult;
  18. }

根据address获取对应的执行器代理ExecutorBiz,然后调用其run方法将作业下发到执行器端运行。上节分析过执行器启动时使用netty初始化一个http serverweb容器,所以,这里的下发逻辑比较简单,就是调用http接口XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);

执行端流程

上节执行器启动流程分析过其在启动时会利用netty初始化一个http serverweb容器,用于接收admin下发指令,然后将接收到的指令转给EmbedHttpServerHandler#process处理:


   
  1. private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
  2.        
  3.     // valid
  4.     if (HttpMethod.POST != httpMethod) {
  5.         return new ReturnT< String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
  6.    }
  7.     if (uri== null || uri.trim().length()== 0) {
  8.         return new ReturnT< String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
  9.    }
  10.     if (accessToken!= null
  11.            && accessToken.trim().length()> 0
  12.            && !accessToken.equals(accessTokenReq)) {
  13.         return new ReturnT< String>(ReturnT.FAIL_CODE, "The access token is wrong.");
  14.    }
  15.     // services mapping
  16.     try {
  17.         if ( "/beat".equals(uri)) { //执行器是否正常(在线),对应路由策略:故障转移
  18.             return executorBiz.beat();
  19.        } else if ( "/idleBeat".equals(uri)) { // 执行器是否空闲,对应路由策略:忙碌转移
  20.            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam. class);
  21.             return executorBiz.idleBeat(idleBeatParam);
  22.        } else if ( "/run".equals(uri)) {
  23.            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam. class);
  24.             return executorBiz.run(triggerParam);
  25.        } else if ( "/kill".equals(uri)) { // kill作业指令监听
  26.            logger.info( "receive kill, data:{}", requestData);
  27.            KillParam killParam = GsonTool.fromJson(requestData, KillParam. class);
  28.             return executorBiz.kill(killParam);
  29.        } else if ( "/log".equals(uri)) { // 查看执行器调度日志监听
  30.            LogParam logParam = GsonTool.fromJson(requestData, LogParam. class);
  31.             return executorBiz.log(logParam);
  32.        } else {
  33.             return new ReturnT< String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri + ") not found.");
  34.        }
  35.    } catch ( Exception e) {
  36.        logger. error(e.getMessage(), e);
  37.         return new ReturnT< String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
  38.    }
  39. }

继续跟踪ExecutorBizImpl#run:


   
  1. @Override
  2. public ReturnT< String> run(TriggerParam triggerParam) {
  3.     // load old:jobHandler + jobThread
  4.     // 根据jobId从缓存中加载JobThread和IJobHandler
  5.    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
  6.    IJobHandler jobHandler = jobThread!= null?jobThread.getHandler(): null;
  7.     String removeOldReason = null;
  8.     // 作业类型匹配 并进行IJobHandler校验
  9.     // 比如作业IJobHandler发送变更、Glue类作业源码出现编辑等,则之前缓存的JobThread不能再继续使用,并使用最新IJobHandler创建JobThread
  10.    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
  11.     if (GlueTypeEnum.BEAN == glueTypeEnum) { //Bean类型作业
  12.  ......
  13.    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { //Java类型作业
  14.  ......
  15.    } else if (glueTypeEnum!= null && glueTypeEnum.isScript()) { //脚本类作业
  16.  ......
  17.    } else {
  18.         return new ReturnT< String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
  19.    }
  20.     if (jobThread != null) {
  21.         // 如果JobThread != null,则该JobThread可能存在正在运行作业,则根据阻塞策略处理
  22.        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
  23.         if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
  24.             // 丢弃后续调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则当前作业丢弃
  25.             if (jobThread.isRunningOrHasQueue()) {
  26.                 return new ReturnT< String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
  27.            }
  28.        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
  29.             // 覆盖之前调度:如果JobThread还正在执行作业或其triggerQueue中有排队作业,则destroy之前的JobThread,并重新创建JobThread运行当前作业
  30.             if (jobThread.isRunningOrHasQueue()) {
  31.                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
  32.                jobThread = null;
  33.            }
  34.        } else {
  35.             // 单机串行则直接将作业发送到JobThread的triggerQueue中即可
  36.        }
  37.    }
  38.     if (jobThread == null) {
  39.         // 创建JobThread,并放入缓存,如果jobId缓存中已存在,则destroy
  40.        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
  41.    }
  42.    logger.debug( "jobThread.pushTriggerQueue hash:{}, data:{}", System.identityHashCode(jobThread), GsonTool.toJson(triggerParam));
  43.     // 将下发的作业放入到JobThread的triggerQueue中,JobThread处理线程从triggerQueue提取执行
  44.    ReturnT< String> pushResult = jobThread.pushTriggerQueue(triggerParam);
  45.     return pushResult;
  46. }

下发的作业被投递到JobThreadtriggerQueue队列中,JobThread#run:


   
  1. @Override
  2. public void run() {
  3.     try {
  4.     // 调用IJobHandler.init方法,如@XxlJob(init=xxx)即在这里调用
  5.  handler.init();
  6. } catch ( Throwable e) {
  7.     logger. error(e.getMessage(), e);
  8. }
  9. while(!toStop){
  10.         // running=false表示当前JobThread没有在处理作业
  11.   // isRunningOrHasQueue()中判断JobThread是否运行用到该值以及triggerQueue
  12.  running = false;
  13.   // 空闲次数累加+1
  14.  idleTimes++;
  15.        TriggerParam triggerParam = null;
  16.            ReturnT< String> executeResult = null;
  17.             try {
  18.    triggerParam = triggerQueue.poll( 3L, TimeUnit.SECONDS);
  19.     if (triggerParam!= null) {
  20.                     // running=true表示当前JobThread正在处理作业
  21.     running = true;
  22.     // 重置空闲统计次数
  23.     idleTimes = 0;
  24.     triggerLogIdSet.remove(triggerParam.getLogId());
  25.     // log filename, like "logPath/yyyy-MM-dd/9999.log"
  26.     // 初始化日志文件
  27.     String logFileName = XxlJobFileAppender.makeLogFileName( new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
  28.     XxlJobFileAppender.contextHolder.set(logFileName);
  29.     // 将分片信息注入到线程上下文中:InheritableThreadLocal
  30.     ShardingUtil.setShardingVo( new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
  31.     // execute
  32.     XxlJobLogger.log( "<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
  33.     // executorTimeout:作业执行超时控制
  34.     // 正常执行作业是handler.execute(triggerParam.getExecutorParams()),
  35.     // 如果带有超时控制,则封装FutureTask放入到线程中异步执行,超时则触发中断并返回超时异常
  36.     if (triggerParam.getExecutorTimeout() > 0) {
  37.       // limit timeout
  38.      Thread futureThread = null;
  39.       try {
  40.       final TriggerParam triggerParamTmp = triggerParam;
  41.       FutureTask<ReturnT< String>> futureTask = new FutureTask<ReturnT< String>>( new Callable<ReturnT< String>>() {
  42.        @Override
  43.         public ReturnT< String> call() throws Exception {
  44.         return handler.execute(triggerParamTmp.getExecutorParams());
  45.        }
  46.       });
  47.       futureThread = new Thread(futureTask);
  48.       futureThread.start();
  49.       executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
  50.      } catch (TimeoutException e) {
  51.       XxlJobLogger.log( "<br>----------- xxl-job job execute timeout");
  52.       XxlJobLogger.log(e);
  53.       executeResult = new ReturnT< String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
  54.      } finally {
  55.       futureThread.interrupt();
  56.      }
  57.     } else {
  58.       // 调用对应的IJobHandler处理作业
  59.      executeResult = handler.execute(triggerParam.getExecutorParams());
  60.     }
  61.     if (executeResult == null) {
  62.      executeResult = IJobHandler.FAIL;
  63.     } else {
  64.      executeResult.setMsg(
  65.        (executeResult!= null&&executeResult.getMsg()!= null&&executeResult.getMsg().length()> 50000)
  66.          ?executeResult.getMsg().substring( 0, 50000).concat( "...")
  67.          :executeResult.getMsg());
  68.      executeResult.setContent( null); // limit obj size
  69.     }
  70.     XxlJobLogger.log( "<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
  71.    } else {
  72.     // 连续超时30次(每次3秒),即90秒内JobThread一直空闲,则销毁JobThread
  73.     if (idleTimes > 30) {
  74.       if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
  75.       XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
  76.      }
  77.     }
  78.    }
  79.   } catch ( Throwable e) {
  80.     if (toStop) {
  81.     XxlJobLogger.log( "<br>----------- JobThread toStop, stopReason:" + stopReason);
  82.    }
  83.    StringWriter stringWriter = new StringWriter();
  84.    e.printStackTrace( new PrintWriter(stringWriter));
  85.     String errorMsg = stringWriter.toString();
  86.    executeResult = new ReturnT< String>(ReturnT.FAIL_CODE, errorMsg);
  87.     // 作业执行异常,则将异常信息写入到日志中
  88.    XxlJobLogger.log( "<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
  89.   } finally {
  90.                 if(triggerParam != null) {
  91.                     if (!toStop) {
  92.       // JobThread未停止场景下,异步回调机制将执行结果推送到admin
  93.                        TriggerCallbackThread.pushCallBack( new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
  94.                    } else {
  95.       // JobThread停止场景下,异步回调机制将kill异常推送到admin
  96.                        ReturnT< String> stopResult = new ReturnT< String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
  97.                        TriggerCallbackThread.pushCallBack( new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
  98.                    }
  99.                }
  100.            }
  101.        }
  102.   // JobThread被kill,检查下triggerQueue是否还有等待触发作业,如果有则向admin推送异常信息
  103.   while(triggerQueue != null && triggerQueue.size()> 0){
  104.   TriggerParam triggerParam = triggerQueue.poll();
  105.   if (triggerParam!= null) {
  106.     // is killed
  107.    ReturnT< String> stopResult = new ReturnT< String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
  108.    TriggerCallbackThread.pushCallBack( new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
  109.   }
  110.  }
  111.   // destroy
  112.   try {
  113.   // 销毁IJobHandler,调用IJobHandler.destroy方法,如@XxlJob(destroy=xxx)即在这里调用
  114.   handler.destroy();
  115.  } catch ( Throwable e) {
  116.   logger. error(e.getMessage(), e);
  117.  }
  118.  logger.info( ">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
  119. }

上面代码很多,但是逻辑不太复杂,看注释很容易理解到,接下来再来看下执行流程中最后一个核心组件IJobHandler,调用作业执行逻辑被封装到该组件中,xxl-job内置提供了三种实现方式,分别对应调用BeanJava脚本类型作业,其实现不太复杂,这里就不再继续深入分析。

核心抽象组件

ExecutorRouter:路由组件,选取执行器地址;

ExecutorBizClient:路由组件选取任务执行器地址后,将其包装成ExecutorBizClientExecutorBizClient可以看成执行器在引擎端代理,屏蔽远程RPC网络通信底层细节;

EmbedHttpServerHandler:执行器通过netty实现http server容器,EmbedHttpServerHandler扩展组件用于处理接收指令;

ExecutorBizImplExecutorBizClient作为执行器在引擎端代理,主要将指令通过RPC转发给执行器,起到透传作用,ExecutorBizImpl则是执行器上真正实现逻辑封装,所以,ExecutorBizClientExecutorBizImpl都实现同一接口ExecutorBiz

JobThread:每个任务在执行器上执行都会对应一个JobThread,任务和任务间是互相独立的,JobThread控制任务在执行器上并发模型。

IJobHandlerIJobHandler则是封装怎么调用任务逻辑,xxl-job内置三种实现类分别用来调用不同类型任务。

总结

上面对xxl-job作业执行的核心关键代码进行了整体分析梳理,整体还是比较简单,可能比较枯燥,下面简要整理了作业执行的大概流程(见下图),可对xxl-job调度机制有个大致理解:

大致描述:

  • xxl-job整体架构采用中心化设计,分为调度中心Admin和执行器两部分;

  • 调度中心Admin模块提供trigger触发接口进行作业调度,然后根据作业历史统计下发耗时将作业分配到两个线程池中的一个进行执行;

  • 执行前将作业启动日志记录到xxl_job_log表中,然后利用路由组件选取执行器地址,并利用执行器代理ExecutorBiz将执行下发到路由的执行器上,执行器代理ExecutorBiz实现很简单:就是发送http请求;

  • 执行器在启动时会利用netty初始化一个内嵌http server容器,当接收到调度中心发送过来的指令后,将其转交给EmbedHttpServerHandler处理器进行处理;

  • EmbedHttpServerHandler处理器在处理作业运行指令时,会根据jobId从缓存中查找对应的JobThread,然后将作业执行指令投递到JobThread实例中triggerQueue队列中排队;

  • JobThread线程不停循环从triggerQueue队列中提取等待执行的作业信息,然后将其交由IJobHandler真正处理作业调用,JobThreadIJobHandler处理结果解析后投递给TriggerCallbackThread线程中callBackQueue队列中排队;

  • TriggerCallbackThread内部也是线程不停循环从callBackQueue提取回调任务,然后转交给doCallback方法,这个方法内部通过Admin代理类AdminBizClient叫结果回调发送给调用中心的回调接口,即完成作业完成通知。

上面就是xxl-job作业执行的整体大致流程,将其抽象出来的几个核心组件串联起来看清其脉络,则整个逻辑就比较清晰了。这里理解关键点是JobThread组件,每个作业在每个执行器中会对应一个JobThread实例,当作业下发到执行器上时,找到对应的JobThread进行处理。JobThread采用懒加载和缓存模式设计,只有作业下发执行器未找到对应的JobThread才会创建并返回起来,待下次同一个作业过来执行时直接使用该JobThread即可。

什么场景下执行器找不到JobThread

  • 作业第一次下发到该执行器;

  • JobThread内部线程循环不停从triggerQueue提取作业进行处理,且每个作业在执行器上对应一个JobThread,如果某个作业在执行器上执行一次后面不再执行、或者执行频率很低,可能会导致大量线程浪费,所以JobThread设计上有空闲超时自动销毁机制。当30 * 3 = 90秒没有执行作业,则判断JobThread空闲超时,进入销毁流程,后面又接收到该作业下发来的指令,则会重新创建JobThread

长按二维码识别关注


转载:https://blog.csdn.net/god_86/article/details/114650439
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场