小言_互联网的博客

Quartz 2.4.0 源码解析

283人阅读  评论(0)

 

执行逻辑图

代码流程图

源代码[注释版本]

核心代码

org.quartz.core.QuartzScheduler

org.quartz.core.QuartzSchedulerThread



 

执行逻辑图

 

代码流程图

 

 

源代码[注释版本]

GitHub 地址: https://github.com/BoYiZhang/quartz.git

代码入口:    org.quartz.core.QuartzTest


  
  1. public class QuartzTest {
  2. public static void main(String[] args) throws Exception {
  3. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd hh:mm:ss");
  4. SchedulerFactory sf = new StdSchedulerFactory();
  5. Scheduler sched = sf.getScheduler();
  6. JobDetail job = JobBuilder.newJob(HelloJob.class)
  7. .withIdentity( "job1", "group1").build();
  8. org.quartz.Trigger trigger = TriggerBuilder.newTrigger()
  9. .withIdentity( "trigger1", "group1")
  10. .startNow()
  11. .withSchedule(
  12. CronScheduleBuilder.cronSchedule( "0/2 * * * * ?")
  13. .withMisfireHandlingInstructionFireAndProceed()
  14. // .withMisfireHandlingInstructionIgnoreMisfires() # MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY 忽略所有的超时状态,按照触发器的策略执行。
  15. // .withMisfireHandlingInstructionFireAndProceed() # [默认] MISFIRE_INSTRUCTION_FIRE_ONCE_NOW 立刻执行一次,然后就按照正常的计划执行。
  16. // .withMisfireHandlingInstructionDoNothing() # MISFIRE_INSTRUCTION_DO_NOTHING 目前不执行,然后就按照正常的计划执行。
  17. )
  18. .build();
  19. HolidayCalendar holidayCalendar = new HolidayCalendar();
  20. GregorianCalendar calendar = new GregorianCalendar( 2020, 2, 23); // 2017年11月1日
  21. holidayCalendar.addExcludedDate(calendar.getTime());
  22. calendar = new GregorianCalendar( 2020, 2, 21); // 2018年11月2日
  23. holidayCalendar.addExcludedDate(calendar.getTime());
  24. sched.addCalendar( "holidays", holidayCalendar, false, false); // 节假日加入schedule调度器
  25. sched.scheduleJob(job, trigger);
  26. sched.start();
  27. }
  28. }

核心代码

org.quartz.core.QuartzScheduler


  
  1. /*
  2. * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. * License for the specific language governing permissions and limitations
  14. * under the License.
  15. *
  16. */
  17. package org.quartz.core;
  18. import java.io.InputStream;
  19. import java.lang.management.ManagementFactory;
  20. import java.rmi.RemoteException;
  21. import java.rmi.registry.LocateRegistry;
  22. import java.rmi.registry.Registry;
  23. import java.rmi.server.UnicastRemoteObject;
  24. import java.text.SimpleDateFormat;
  25. import java.util.ArrayList;
  26. import java.util.Collection;
  27. import java.util.Date;
  28. import java.util.HashMap;
  29. import java.util.LinkedList;
  30. import java.util.List;
  31. import java.util.Map;
  32. import java.util.Properties;
  33. import java.util.Random;
  34. import java.util.Set;
  35. import java.util.Map.Entry;
  36. import java.util.concurrent.atomic.AtomicInteger;
  37. import javax.management.MBeanServer;
  38. import javax.management.ObjectName;
  39. import org.quartz.Calendar;
  40. import org.quartz.InterruptableJob;
  41. import org.quartz.Job;
  42. import org.quartz.JobDataMap;
  43. import org.quartz.JobDetail;
  44. import org.quartz.JobExecutionContext;
  45. import org.quartz.JobExecutionException;
  46. import org.quartz.JobKey;
  47. import org.quartz.JobListener;
  48. import org.quartz.ListenerManager;
  49. import org.quartz.Matcher;
  50. import org.quartz.ObjectAlreadyExistsException;
  51. import org.quartz.Scheduler;
  52. import org.quartz.SchedulerContext;
  53. import org.quartz.SchedulerException;
  54. import org.quartz.SchedulerListener;
  55. import org.quartz.SchedulerMetaData;
  56. import org.quartz.Trigger;
  57. import static org.quartz.TriggerBuilder.*;
  58. import org.quartz.TriggerKey;
  59. import org.quartz.TriggerListener;
  60. import org.quartz.UnableToInterruptJobException;
  61. import org.quartz.Trigger.CompletedExecutionInstruction;
  62. import org.quartz.Trigger.TriggerState;
  63. import org.quartz.core.jmx.QuartzSchedulerMBean;
  64. import org.quartz.impl.SchedulerRepository;
  65. import org.quartz.impl.matchers.GroupMatcher;
  66. import org.quartz.listeners.SchedulerListenerSupport;
  67. import org.quartz.simpl.PropertySettingJobFactory;
  68. import org.quartz.spi.JobFactory;
  69. import org.quartz.spi.OperableTrigger;
  70. import org.quartz.spi.SchedulerPlugin;
  71. import org.quartz.spi.SchedulerSignaler;
  72. import org.quartz.spi.ThreadExecutor;
  73. import org.slf4j.Logger;
  74. import org.slf4j.LoggerFactory;
  75. /**
  76. * <p>
  77. * This is the heart of Quartz, an indirect implementation of the <code>{@link org.quartz.Scheduler}</code>
  78. * interface, containing methods to schedule <code>{@link org.quartz.Job}</code>s,
  79. * register <code>{@link org.quartz.JobListener}</code> instances, etc.
  80. * </p>
  81. *
  82. * @see org.quartz.Scheduler
  83. * @see org.quartz.core.QuartzSchedulerThread
  84. * @see org.quartz.spi.JobStore
  85. * @see org.quartz.spi.ThreadPool
  86. *
  87. * @author James House
  88. */
  89. public class QuartzScheduler implements RemotableQuartzScheduler {
  90. /*
  91. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  92. *
  93. * Constants.
  94. *
  95. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  96. */
  97. private static String VERSION_MAJOR = "UNKNOWN";
  98. private static String VERSION_MINOR = "UNKNOWN";
  99. private static String VERSION_ITERATION = "UNKNOWN";
  100. static {
  101. Properties props = new Properties();
  102. InputStream is = null;
  103. try {
  104. is = QuartzScheduler.class.getResourceAsStream( "quartz-build.properties");
  105. if(is != null) {
  106. props.load(is);
  107. String version = props.getProperty( "version");
  108. if (version != null) {
  109. String[] versionComponents = version.split( "\\.");
  110. VERSION_MAJOR = versionComponents[ 0];
  111. VERSION_MINOR = versionComponents[ 1];
  112. if(versionComponents.length > 2)
  113. VERSION_ITERATION = versionComponents[ 2];
  114. else
  115. VERSION_ITERATION = "0";
  116. } else {
  117. (LoggerFactory.getLogger(QuartzScheduler.class)).error(
  118. "Can't parse Quartz version from quartz-build.properties");
  119. }
  120. }
  121. } catch (Exception e) {
  122. (LoggerFactory.getLogger(QuartzScheduler.class)).error(
  123. "Error loading version info from quartz-build.properties.", e);
  124. } finally {
  125. if(is != null) {
  126. try { is.close(); } catch(Exception ignore) {}
  127. }
  128. }
  129. }
  130. /*
  131. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  132. *
  133. * Data members.
  134. *
  135. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  136. */
  137. private QuartzSchedulerResources resources;
  138. private QuartzSchedulerThread schedThread;
  139. private ThreadGroup threadGroup;
  140. private SchedulerContext context = new SchedulerContext();
  141. private ListenerManager listenerManager = new ListenerManagerImpl();
  142. private HashMap<String, JobListener> internalJobListeners = new HashMap<String, JobListener>( 10);
  143. private HashMap<String, TriggerListener> internalTriggerListeners = new HashMap<String, TriggerListener>( 10);
  144. private ArrayList<SchedulerListener> internalSchedulerListeners = new ArrayList<SchedulerListener>( 10);
  145. private JobFactory jobFactory = new PropertySettingJobFactory();
  146. ExecutingJobsManager jobMgr = null;
  147. ErrorLogger errLogger = null;
  148. private SchedulerSignaler signaler;
  149. private Random random = new Random();
  150. private ArrayList<Object> holdToPreventGC = new ArrayList<Object>( 5);
  151. private boolean signalOnSchedulingChange = true;
  152. private volatile boolean closed = false;
  153. private volatile boolean shuttingDown = false;
  154. private boolean boundRemotely = false;
  155. private QuartzSchedulerMBean jmxBean = null;
  156. private Date initialStart = null;
  157. private final Logger log = LoggerFactory.getLogger(getClass());
  158. // private static final Map<String, ManagementServer> MGMT_SVR_BY_BIND = new
  159. // HashMap<String, ManagementServer>();
  160. // private String registeredManagementServerBind;
  161. /*
  162. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  163. *
  164. * Constructors.
  165. *
  166. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  167. */
  /**
   * Builds a scheduler around the supplied resources and hands its scheduling thread to the
   * thread executor those resources provide.
   *
   * @param resources       the components (job store, thread executor, ...) this scheduler uses
   * @param idleWaitTime    idle wait time forwarded to the scheduler thread; ignored unless positive
   * @param dbRetryInterval deprecated; not read by this constructor
   * @throws SchedulerException declared by the signature
   * @see QuartzSchedulerResources
   */
  public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
          throws SchedulerException {
      this.resources = resources;

      // A job store that is itself a JobListener gets notified like any internal listener.
      if (resources.getJobStore() instanceof JobListener) {
          addInternalJobListener((JobListener) resources.getJobStore());
      }

      // Create the scheduling thread and pass it to the configured executor.
      QuartzSchedulerThread thread = new QuartzSchedulerThread(this, resources);
      this.schedThread = thread;
      resources.getThreadExecutor().execute(thread);
      if (idleWaitTime > 0) {
          thread.setIdleWaitTime(idleWaitTime);
      }

      // Internal listeners: executing-job bookkeeping and error logging.
      this.jobMgr = new ExecutingJobsManager();
      addInternalJobListener(this.jobMgr);

      this.errLogger = new ErrorLogger();
      addInternalSchedulerListener(this.errLogger);

      this.signaler = new SchedulerSignalerImpl(this, thread);

      getLog().info("Quartz Scheduler v." + getVersion() + " created.");
  }
  195. public void initialize() throws SchedulerException {
  196. try {
  197. bind();
  198. } catch (Exception re) {
  199. throw new SchedulerException(
  200. "Unable to bind scheduler to RMI Registry.", re);
  201. }
  202. if (resources.getJMXExport()) {
  203. try {
  204. registerJMX();
  205. } catch (Exception e) {
  206. throw new SchedulerException(
  207. "Unable to register scheduler with MBeanServer.", e);
  208. }
  209. }
  210. // ManagementRESTServiceConfiguration managementRESTServiceConfiguration
  211. // = resources.getManagementRESTServiceConfiguration();
  212. //
  213. // if (managementRESTServiceConfiguration != null &&
  214. // managementRESTServiceConfiguration.isEnabled()) {
  215. // try {
  216. // /**
  217. // * ManagementServer will only be instantiated and started if one
  218. // * isn't already running on the configured port for this class
  219. // * loader space.
  220. // */
  221. // synchronized (QuartzScheduler.class) {
  222. // if
  223. // (!MGMT_SVR_BY_BIND.containsKey(managementRESTServiceConfiguration.getBind()))
  224. // {
  225. // Class<?> managementServerImplClass =
  226. // Class.forName("org.quartz.management.ManagementServerImpl");
  227. // Class<?> managementRESTServiceConfigurationClass[] = new Class[] {
  228. // managementRESTServiceConfiguration.getClass() };
  229. // Constructor<?> managementRESTServiceConfigurationConstructor =
  230. // managementServerImplClass
  231. // .getConstructor(managementRESTServiceConfigurationClass);
  232. // Object arglist[] = new Object[] { managementRESTServiceConfiguration
  233. // };
  234. // ManagementServer embeddedRESTServer = ((ManagementServer)
  235. // managementRESTServiceConfigurationConstructor.newInstance(arglist));
  236. // embeddedRESTServer.start();
  237. // MGMT_SVR_BY_BIND.put(managementRESTServiceConfiguration.getBind(),
  238. // embeddedRESTServer);
  239. // }
  240. // registeredManagementServerBind =
  241. // managementRESTServiceConfiguration.getBind();
  242. // ManagementServer embeddedRESTServer =
  243. // MGMT_SVR_BY_BIND.get(registeredManagementServerBind);
  244. // embeddedRESTServer.register(this);
  245. // }
  246. // } catch (Exception e) {
  247. // throw new
  248. // SchedulerException("Unable to start the scheduler management REST service",
  249. // e);
  250. // }
  251. // }
  252. getLog().info( "Scheduler meta-data: " +
  253. ( new SchedulerMetaData(getSchedulerName(),
  254. getSchedulerInstanceId(), getClass(), boundRemotely, runningSince() != null,
  255. isInStandbyMode(), isShutdown(), runningSince(),
  256. numJobsExecuted(), getJobStoreClass(),
  257. supportsPersistence(), isClustered(), getThreadPoolClass(),
  258. getThreadPoolSize(), getVersion())).toString());
  259. }
  260. /*
  261. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  262. *
  263. * Interface.
  264. *
  265. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  266. */
  267. public String getVersion() {
  268. return getVersionMajor() + "." + getVersionMinor() + "."
  269. + getVersionIteration();
  270. }
  271. public static String getVersionMajor() {
  272. return VERSION_MAJOR;
  273. }
  274. public static String getVersionMinor() {
  275. return VERSION_MINOR;
  276. }
  277. public static String getVersionIteration() {
  278. return VERSION_ITERATION;
  279. }
  280. public SchedulerSignaler getSchedulerSignaler() {
  281. return signaler;
  282. }
  283. public Logger getLog() {
  284. return log;
  285. }
  286. /**
  287. * Register the scheduler in the local MBeanServer.
  288. */
  289. private void registerJMX() throws Exception {
  290. String jmxObjectName = resources.getJMXObjectName();
  291. MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
  292. jmxBean = new QuartzSchedulerMBeanImpl( this);
  293. mbs.registerMBean(jmxBean, new ObjectName(jmxObjectName));
  294. }
  295. /**
  296. * Unregister the scheduler from the local MBeanServer.
  297. */
  298. private void unregisterJMX() throws Exception {
  299. String jmxObjectName = resources.getJMXObjectName();
  300. MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
  301. mbs.unregisterMBean( new ObjectName(jmxObjectName));
  302. jmxBean.setSampledStatisticsEnabled( false);
  303. getLog().info( "Scheduler unregistered from name '" + jmxObjectName + "' in the local MBeanServer.");
  304. }
  305. /**
  306. * <p>
  307. * Bind the scheduler to an RMI registry.
  308. * </p>
  309. */
  310. private void bind() throws RemoteException {
  311. String host = resources.getRMIRegistryHost();
  312. // don't export if we're not configured to do so...
  313. if (host == null || host.length() == 0) {
  314. return;
  315. }
  316. RemotableQuartzScheduler exportable = null;
  317. if(resources.getRMIServerPort() > 0) {
  318. exportable = (RemotableQuartzScheduler) UnicastRemoteObject
  319. .exportObject( this, resources.getRMIServerPort());
  320. } else {
  321. exportable = (RemotableQuartzScheduler) UnicastRemoteObject
  322. .exportObject( this);
  323. }
  324. Registry registry = null;
  325. if (resources.getRMICreateRegistryStrategy().equals(
  326. QuartzSchedulerResources.CREATE_REGISTRY_AS_NEEDED)) {
  327. try {
  328. // First try to get an existing one, instead of creating it,
  329. // since if
  330. // we're in a web-app being 'hot' re-depoloyed, then the JVM
  331. // still
  332. // has the registry that we created above the first time...
  333. registry = LocateRegistry.getRegistry(resources
  334. .getRMIRegistryPort());
  335. registry.list();
  336. } catch (Exception e) {
  337. registry = LocateRegistry.createRegistry(resources
  338. .getRMIRegistryPort());
  339. }
  340. } else if (resources.getRMICreateRegistryStrategy().equals(
  341. QuartzSchedulerResources.CREATE_REGISTRY_ALWAYS)) {
  342. try {
  343. registry = LocateRegistry.createRegistry(resources
  344. .getRMIRegistryPort());
  345. } catch (Exception e) {
  346. // Fall back to an existing one, instead of creating it, since
  347. // if
  348. // we're in a web-app being 'hot' re-depoloyed, then the JVM
  349. // still
  350. // has the registry that we created above the first time...
  351. registry = LocateRegistry.getRegistry(resources
  352. .getRMIRegistryPort());
  353. }
  354. } else {
  355. registry = LocateRegistry.getRegistry(resources
  356. .getRMIRegistryHost(), resources.getRMIRegistryPort());
  357. }
  358. String bindName = resources.getRMIBindName();
  359. registry.rebind(bindName, exportable);
  360. boundRemotely = true;
  361. getLog().info( "Scheduler bound to RMI registry under name '" + bindName + "'");
  362. }
  363. /**
  364. * <p>
  365. * Un-bind the scheduler from an RMI registry.
  366. * </p>
  367. */
  368. private void unBind() throws RemoteException {
  369. String host = resources.getRMIRegistryHost();
  370. // don't un-export if we're not configured to do so...
  371. if (host == null || host.length() == 0) {
  372. return;
  373. }
  374. Registry registry = LocateRegistry.getRegistry(resources
  375. .getRMIRegistryHost(), resources.getRMIRegistryPort());
  376. String bindName = resources.getRMIBindName();
  377. try {
  378. registry.unbind(bindName);
  379. UnicastRemoteObject.unexportObject( this, true);
  380. } catch (java.rmi.NotBoundException nbe) {
  381. }
  382. getLog().info( "Scheduler un-bound from name '" + bindName + "' in RMI registry");
  383. }
  384. /**
  385. * <p>
  386. * Returns the name of the <code>QuartzScheduler</code>.
  387. * </p>
  388. */
  389. public String getSchedulerName() {
  390. return resources.getName();
  391. }
  392. /**
  393. * <p>
  394. * Returns the instance Id of the <code>QuartzScheduler</code>.
  395. * </p>
  396. */
  397. public String getSchedulerInstanceId() {
  398. return resources.getInstanceId();
  399. }
  400. /**
  401. * <p>
  402. * Returns the name of the thread group for Quartz's main threads.
  403. * </p>
  404. */
  405. public ThreadGroup getSchedulerThreadGroup() {
  406. if (threadGroup == null) {
  407. threadGroup = new ThreadGroup( "QuartzScheduler:"
  408. + getSchedulerName());
  409. if (resources.getMakeSchedulerThreadDaemon()) {
  410. threadGroup.setDaemon( true);
  411. }
  412. }
  413. return threadGroup;
  414. }
  415. public void addNoGCObject(Object obj) {
  416. holdToPreventGC.add(obj);
  417. }
  418. public boolean removeNoGCObject(Object obj) {
  419. return holdToPreventGC.remove(obj);
  420. }
  421. /**
  422. * <p>
  423. * Returns the <code>SchedulerContext</code> of the <code>Scheduler</code>.
  424. * </p>
  425. */
  426. public SchedulerContext getSchedulerContext() throws SchedulerException {
  427. return context;
  428. }
  429. public boolean isSignalOnSchedulingChange() {
  430. return signalOnSchedulingChange;
  431. }
  432. public void setSignalOnSchedulingChange(boolean signalOnSchedulingChange) {
  433. this.signalOnSchedulingChange = signalOnSchedulingChange;
  434. }
  435. ///////////////////////////////////////////////////////////////////////////
  436. ///
  437. /// Scheduler State Management Methods
  438. ///
  439. ///////////////////////////////////////////////////////////////////////////
  440. /**
  441. * <p>
  442. * Starts the <code>QuartzScheduler</code>'s threads that fire <code>{@link org.quartz.Trigger}s</code>.
  443. * </p>
  444. *
  445. * <p>
  446. * All <code>{@link org.quartz.Trigger}s</code> that have misfired will
  447. * be passed to the appropriate TriggerListener(s).
  448. * </p>
  449. * todo
  450. * 启动的初始化
  451. * 判断是否集群,对应不同的操作
  452. * 若是非集群,首先有恢复机制,恢复任何失败或misfire的作业,并根据需要清理数据存储。
  453. * 初始化线程管理,唤醒所有等待的线程!
  454. * 线程中启动线程是调用start()方法,但是真正执行线程任务的操作在run()中!
  455. *
  456. */
  457. public void start() throws SchedulerException {
  458. if (shuttingDown|| closed) {
  459. throw new SchedulerException(
  460. "The Scheduler cannot be restarted after shutdown() has been called.");
  461. }
  462. // QTZ-212 : calling new schedulerStarting() method on the listeners
  463. // right after entering start()
  464. notifySchedulerListenersStarting();
  465. //todo 初始化标识为null,进行初始化操作
  466. if (initialStart == null) {
  467. initialStart = new Date();
  468. //todo 1.主要分析的地方
  469. this.resources.getJobStore().schedulerStarted();
  470. startPlugins();
  471. } else {
  472. //todo 2.如果已经初始化过,则恢复jobStore
  473. resources.getJobStore().schedulerResumed();
  474. }
  475. //todo 3.唤醒所有等待的线程
  476. schedThread.togglePause( false);
  477. getLog().info(
  478. "Scheduler " + resources.getUniqueIdentifier() + " started.");
  479. notifySchedulerListenersStarted();
  480. }
  481. public void startDelayed(final int seconds) throws SchedulerException
  482. {
  483. if (shuttingDown || closed) {
  484. throw new SchedulerException(
  485. "The Scheduler cannot be restarted after shutdown() has been called.");
  486. }
  487. Thread t = new Thread( new Runnable() {
  488. public void run() {
  489. try { Thread.sleep(seconds * 1000L); }
  490. catch(InterruptedException ignore) {}
  491. try { start(); }
  492. catch(SchedulerException se) {
  493. getLog().error( "Unable to start scheduler after startup delay.", se);
  494. }
  495. }
  496. });
  497. t.start();
  498. }
  499. /**
  500. * <p>
  501. * Temporarily halts the <code>QuartzScheduler</code>'s firing of <code>{@link org.quartz.Trigger}s</code>.
  502. * </p>
  503. *
  504. * <p>
  505. * The scheduler is not destroyed, and can be re-started at any time.
  506. * </p>
  507. */
  508. public void standby() {
  509. resources.getJobStore().schedulerPaused();
  510. schedThread.togglePause( true);
  511. getLog().info(
  512. "Scheduler " + resources.getUniqueIdentifier() + " paused.");
  513. notifySchedulerListenersInStandbyMode();
  514. }
  515. /**
  516. * <p>
  517. * Reports whether the <code>Scheduler</code> is paused.
  518. * </p>
  519. */
  520. public boolean isInStandbyMode() {
  521. return schedThread.isPaused();
  522. }
  523. public Date runningSince() {
  524. if(initialStart == null)
  525. return null;
  526. return new Date(initialStart.getTime());
  527. }
  528. public int numJobsExecuted() {
  529. return jobMgr.getNumJobsFired();
  530. }
  531. public Class<?> getJobStoreClass() {
  532. return resources.getJobStore().getClass();
  533. }
  534. public boolean supportsPersistence() {
  535. return resources.getJobStore().supportsPersistence();
  536. }
  537. public boolean isClustered() {
  538. return resources.getJobStore().isClustered();
  539. }
  540. public Class<?> getThreadPoolClass() {
  541. return resources.getThreadPool().getClass();
  542. }
  543. public int getThreadPoolSize() {
  544. return resources.getThreadPool().getPoolSize();
  545. }
  546. /**
  547. * <p>
  548. * Halts the <code>QuartzScheduler</code>'s firing of <code>{@link org.quartz.Trigger}s</code>,
  549. * and cleans up all resources associated with the QuartzScheduler.
  550. * Equivalent to <code>shutdown(false)</code>.
  551. * </p>
  552. *
  553. * <p>
  554. * The scheduler cannot be re-started.
  555. * </p>
  556. */
  557. public void shutdown() {
  558. shutdown( false);
  559. }
  560. /**
  561. * <p>
  562. * Halts the <code>QuartzScheduler</code>'s firing of <code>{@link org.quartz.Trigger}s</code>,
  563. * and cleans up all resources associated with the QuartzScheduler.
  564. * </p>
  565. *
  566. * <p>
  567. * The scheduler cannot be re-started.
  568. * </p>
  569. *
  570. * @param waitForJobsToComplete
  571. * if <code>true</code> the scheduler will not allow this method
  572. * to return until all currently executing jobs have completed.
  573. */
  574. public void shutdown(boolean waitForJobsToComplete) {
  575. if(shuttingDown || closed) {
  576. return;
  577. }
  578. shuttingDown = true;
  579. getLog().info(
  580. "Scheduler " + resources.getUniqueIdentifier()
  581. + " shutting down.");
  582. // boolean removeMgmtSvr = false;
  583. // if (registeredManagementServerBind != null) {
  584. // ManagementServer standaloneRestServer =
  585. // MGMT_SVR_BY_BIND.get(registeredManagementServerBind);
  586. //
  587. // try {
  588. // standaloneRestServer.unregister(this);
  589. //
  590. // if (!standaloneRestServer.hasRegistered()) {
  591. // removeMgmtSvr = true;
  592. // standaloneRestServer.stop();
  593. // }
  594. // } catch (Exception e) {
  595. // getLog().warn("Failed to shutdown the ManagementRESTService", e);
  596. // } finally {
  597. // if (removeMgmtSvr) {
  598. // MGMT_SVR_BY_BIND.remove(registeredManagementServerBind);
  599. // }
  600. //
  601. // registeredManagementServerBind = null;
  602. // }
  603. // }
  604. standby();
  605. schedThread.halt(waitForJobsToComplete);
  606. notifySchedulerListenersShuttingdown();
  607. if( (resources.isInterruptJobsOnShutdown() && !waitForJobsToComplete) ||
  608. (resources.isInterruptJobsOnShutdownWithWait() && waitForJobsToComplete)) {
  609. List<JobExecutionContext> jobs = getCurrentlyExecutingJobs();
  610. for(JobExecutionContext job: jobs) {
  611. if(job.getJobInstance() instanceof InterruptableJob)
  612. try {
  613. ((InterruptableJob)job.getJobInstance()).interrupt();
  614. } catch (Throwable e) {
  615. // do nothing, this was just a courtesy effort
  616. getLog().warn( "Encountered error when interrupting job {} during shutdown: {}", job.getJobDetail().getKey(), e);
  617. }
  618. }
  619. }
  620. resources.getThreadPool().shutdown(waitForJobsToComplete);
  621. closed = true;
  622. if (resources.getJMXExport()) {
  623. try {
  624. unregisterJMX();
  625. } catch (Exception e) {
  626. }
  627. }
  628. if(boundRemotely) {
  629. try {
  630. unBind();
  631. } catch (RemoteException re) {
  632. }
  633. }
  634. shutdownPlugins();
  635. resources.getJobStore().shutdown();
  636. notifySchedulerListenersShutdown();
  637. SchedulerRepository.getInstance().remove(resources.getName());
  638. holdToPreventGC.clear();
  639. getLog().info(
  640. "Scheduler " + resources.getUniqueIdentifier()
  641. + " shutdown complete.");
  642. }
  643. /**
  644. * <p>
  645. * Reports whether the <code>Scheduler</code> has been shutdown.
  646. * </p>
  647. */
  648. public boolean isShutdown() {
  649. return closed;
  650. }
  651. public boolean isShuttingDown() {
  652. return shuttingDown;
  653. }
  654. public boolean isStarted() {
  655. return !shuttingDown && !closed && !isInStandbyMode() && initialStart != null;
  656. }
  657. public void validateState() throws SchedulerException {
  658. if (isShutdown()) {
  659. throw new SchedulerException( "The Scheduler has been shutdown.");
  660. }
  661. // other conditions to check (?)
  662. }
  663. /**
  664. * <p>
  665. * Return a list of <code>JobExecutionContext</code> objects that
  666. * represent all currently executing Jobs in this Scheduler instance.
  667. * </p>
  668. *
  669. * <p>
  670. * This method is not cluster aware. That is, it will only return Jobs
  671. * currently executing in this Scheduler instance, not across the entire
  672. * cluster.
  673. * </p>
  674. *
  675. * <p>
  676. * Note that the list returned is an 'instantaneous' snap-shot, and that as
  677. * soon as it's returned, the true list of executing jobs may be different.
  678. * </p>
  679. */
  680. public List<JobExecutionContext> getCurrentlyExecutingJobs() {
  681. return jobMgr.getExecutingJobs();
  682. }
  683. ///////////////////////////////////////////////////////////////////////////
  684. ///
  685. /// Scheduling-related Methods
  686. ///
  687. ///////////////////////////////////////////////////////////////////////////
  688. /**
  689. * <p>
  690. * Add the <code>{@link org.quartz.Job}</code> identified by the given
  691. * <code>{@link org.quartz.JobDetail}</code> to the Scheduler, and
  692. * associate the given <code>{@link org.quartz.Trigger}</code> with it.
  693. * </p>
  694. *
  695. * <p>
  696. * If the given Trigger does not reference any <code>Job</code>, then it
  697. * will be set to reference the Job passed with it into this method.
  698. * </p>
  699. *
  700. * @throws SchedulerException
  701. * if the Job or Trigger cannot be added to the Scheduler, or
  702. * there is an internal Scheduler error.
  703. *
  704. *
  705. *
  706. * 将给定org.quartz.JobDetail标识的org.quartz.Job添加到Scheduler,
  707. * 并将给定的org.quartz.Trigger与其关联。
  708. * 如果给定的触发器不引用任何作业,则它将被设置为引用与其一起传递的作业到此方法中。
  709. *
  710. * 实现在 QuartzScheduler.scheduleJob(JobDetail jobDetail,
  711. * Trigger trigger)
  712. *
  713. */
  714. public Date scheduleJob(JobDetail jobDetail,
  715. Trigger trigger) throws SchedulerException {
  716. //todo 验证调度器是否关闭,关闭抛出异常
  717. validateState();
  718. //todo 检查 jobDetail和trigger
  719. if (jobDetail == null) {
  720. throw new SchedulerException( "JobDetail cannot be null");
  721. }
  722. if (trigger == null) {
  723. throw new SchedulerException( "Trigger cannot be null");
  724. }
  725. if (jobDetail.getKey() == null) {
  726. throw new SchedulerException( "Job's key cannot be null");
  727. }
  728. if (jobDetail.getJobClass() == null) {
  729. throw new SchedulerException( "Job's class cannot be null");
  730. }
  731. OperableTrigger trig = (OperableTrigger)trigger;
  732. //todo getJobKey 获取 getJobName(), getJobGroup()
  733. if (trigger.getJobKey() == null) {
  734. trig.setJobKey(jobDetail.getKey());
  735. } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
  736. throw new SchedulerException(
  737. "Trigger does not reference given job!");
  738. }
  739. //todo 验证trigger
  740. trig.validate();
  741. Calendar cal = null;
  742. if (trigger.getCalendarName() != null) {
  743. cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
  744. }
  745. //todo 在触发器首次添加到调度程序时由调度程序调用,以便让触发器基于任何关联的日历计算
  746. //todo 其第一次触发时间。调用此方法后,getNextFireTime()应返回有效的答案。
  747. Date ft = trig.computeFirstFireTime(cal);
  748. if (ft == null) {
  749. throw new SchedulerException(
  750. "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
  751. } else{
  752. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd hh:mm:ss");
  753. System.out.println( "computeFirstFireTime : "+ sdf.format(ft));
  754. }
  755. //todo 存储给定的org.quartz.JobDetail和org.quartz.Trigger。
  756. //todo 主要看这一行
  757. //todo org.quartz.impl.jdbcjobstore#JobStoreSupport
  758. resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
  759. notifySchedulerListenersJobAdded(jobDetail);
  760. notifySchedulerThread(trigger.getNextFireTime().getTime());
  761. notifySchedulerListenersSchduled(trigger);
  762. return ft;
  763. }
  764. /**
  765. * <p>
  766. * Schedule the given <code>{@link org.quartz.Trigger}</code> with the
  767. * <code>Job</code> identified by the <code>Trigger</code>'s settings.
  768. * </p>
  769. *
  770. * @throws SchedulerException
  771. * if the indicated Job does not exist, or the Trigger cannot be
  772. * added to the Scheduler, or there is an internal Scheduler
  773. * error.
  774. */
  775. public Date scheduleJob(Trigger trigger)
  776. throws SchedulerException {
  777. validateState();
  778. if (trigger == null) {
  779. throw new SchedulerException( "Trigger cannot be null");
  780. }
  781. OperableTrigger trig = (OperableTrigger)trigger;
  782. trig.validate();
  783. Calendar cal = null;
  784. if (trigger.getCalendarName() != null) {
  785. cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
  786. if(cal == null) {
  787. throw new SchedulerException(
  788. "Calendar not found: " + trigger.getCalendarName());
  789. }
  790. }
  791. Date ft = trig.computeFirstFireTime(cal);
  792. if (ft == null) {
  793. throw new SchedulerException(
  794. "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
  795. }
  796. resources.getJobStore().storeTrigger(trig, false);
  797. notifySchedulerThread(trigger.getNextFireTime().getTime());
  798. notifySchedulerListenersSchduled(trigger);
  799. return ft;
  800. }
  801. /**
  802. * <p>
  803. * Add the given <code>Job</code> to the Scheduler - with no associated
  804. * <code>Trigger</code>. The <code>Job</code> will be 'dormant' until
  805. * it is scheduled with a <code>Trigger</code>, or <code>Scheduler.triggerJob()</code>
  806. * is called for it.
  807. * </p>
  808. *
  809. * <p>
  810. * The <code>Job</code> must by definition be 'durable', if it is not,
  811. * SchedulerException will be thrown.
  812. * </p>
  813. *
  814. * @throws SchedulerException
  815. * if there is an internal Scheduler error, or if the Job is not
  816. * durable, or a Job with the same name already exists, and
  817. * <code>replace</code> is <code>false</code>.
  818. */
  819. public void addJob(JobDetail jobDetail, boolean replace) throws SchedulerException {
  820. addJob(jobDetail, replace, false);
  821. }
  822. public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
  823. validateState();
  824. if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
  825. throw new SchedulerException(
  826. "Jobs added with no trigger must be durable.");
  827. }
  828. resources.getJobStore().storeJob(jobDetail, replace);
  829. notifySchedulerThread( 0L);
  830. notifySchedulerListenersJobAdded(jobDetail);
  831. }
  832. /**
  833. * <p>
  834. * Delete the identified <code>Job</code> from the Scheduler - and any
  835. * associated <code>Trigger</code>s.
  836. * </p>
  837. *
  838. * @return true if the Job was found and deleted.
  839. * @throws SchedulerException
  840. * if there is an internal Scheduler error.
  841. */
  842. public boolean deleteJob(JobKey jobKey) throws SchedulerException {
  843. validateState();
  844. boolean result = false;
  845. List<? extends Trigger> triggers = getTriggersOfJob(jobKey);
  846. for (Trigger trigger : triggers) {
  847. if (!unscheduleJob(trigger.getKey())) {
  848. StringBuilder sb = new StringBuilder().append(
  849. "Unable to unschedule trigger [").append(
  850. trigger.getKey()).append( "] while deleting job [")
  851. .append(jobKey).append(
  852. "]");
  853. throw new SchedulerException(sb.toString());
  854. }
  855. result = true;
  856. }
  857. result = resources.getJobStore().removeJob(jobKey) || result;
  858. if (result) {
  859. notifySchedulerThread( 0L);
  860. notifySchedulerListenersJobDeleted(jobKey);
  861. }
  862. return result;
  863. }
  864. public boolean deleteJobs(List<JobKey> jobKeys) throws SchedulerException {
  865. validateState();
  866. boolean result = false;
  867. result = resources.getJobStore().removeJobs(jobKeys);
  868. notifySchedulerThread( 0L);
  869. for(JobKey key: jobKeys)
  870. notifySchedulerListenersJobDeleted(key);
  871. return result;
  872. }
  873. public void scheduleJobs(Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, boolean replace) throws SchedulerException {
  874. validateState();
  875. // make sure all triggers refer to their associated job
  876. for(Entry<JobDetail, Set<? extends Trigger>> e: triggersAndJobs.entrySet()) {
  877. JobDetail job = e.getKey();
  878. if(job == null) // there can be one of these (for adding a bulk set of triggers for pre-existing jobs)
  879. continue;
  880. Set<? extends Trigger> triggers = e.getValue();
  881. if(triggers == null) // this is possible because the job may be durable, and not yet be having triggers
  882. continue;
  883. for(Trigger trigger: triggers) {
  884. OperableTrigger opt = (OperableTrigger)trigger;
  885. opt.setJobKey(job.getKey());
  886. opt.validate();
  887. Calendar cal = null;
  888. if (trigger.getCalendarName() != null) {
  889. cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
  890. if(cal == null) {
  891. throw new SchedulerException(
  892. "Calendar '" + trigger.getCalendarName() + "' not found for trigger: " + trigger.getKey());
  893. }
  894. }
  895. Date ft = opt.computeFirstFireTime(cal);
  896. if (ft == null) {
  897. throw new SchedulerException(
  898. "Based on configured schedule, the given trigger will never fire.");
  899. }
  900. }
  901. }
  902. resources.getJobStore().storeJobsAndTriggers(triggersAndJobs, replace);
  903. notifySchedulerThread( 0L);
  904. for (JobDetail job : triggersAndJobs.keySet()) {
  905. notifySchedulerListenersJobAdded(job);
  906. Set<? extends Trigger> triggers = triggersAndJobs.get(job);
  907. for (Trigger trigger : triggers) {
  908. notifySchedulerListenersSchduled(trigger);
  909. }
  910. }
  911. }
  912. public void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob,
  913. boolean replace) throws SchedulerException {
  914. Map<JobDetail, Set<? extends Trigger>> triggersAndJobs = new HashMap<JobDetail, Set<? extends Trigger>>();
  915. triggersAndJobs.put(jobDetail, triggersForJob);
  916. scheduleJobs(triggersAndJobs, replace);
  917. }
  918. public boolean unscheduleJobs(List<TriggerKey> triggerKeys) throws SchedulerException {
  919. validateState();
  920. boolean result = false;
  921. result = resources.getJobStore().removeTriggers(triggerKeys);
  922. notifySchedulerThread( 0L);
  923. for(TriggerKey key: triggerKeys)
  924. notifySchedulerListenersUnscheduled(key);
  925. return result;
  926. }
  927. /**
  928. * <p>
  929. * Remove the indicated <code>{@link org.quartz.Trigger}</code> from the
  930. * scheduler.
  931. * </p>
  932. */
  933. public boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException {
  934. validateState();
  935. if (resources.getJobStore().removeTrigger(triggerKey)) {
  936. notifySchedulerThread( 0L);
  937. notifySchedulerListenersUnscheduled(triggerKey);
  938. } else {
  939. return false;
  940. }
  941. return true;
  942. }
  943. /**
  944. * <p>
  945. * Remove (delete) the <code>{@link org.quartz.Trigger}</code> with the
  946. * given name, and store the new given one - which must be associated
  947. * with the same job.
  948. * </p>
  949. * @param newTrigger
  950. * The new <code>Trigger</code> to be stored.
  951. *
  952. * @return <code>null</code> if a <code>Trigger</code> with the given
  953. * name & group was not found and removed from the store, otherwise
  954. * the first fire time of the newly scheduled trigger.
  955. */
  956. public Date rescheduleJob(TriggerKey triggerKey,
  957. Trigger newTrigger) throws SchedulerException {
  958. validateState();
  959. if (triggerKey == null) {
  960. throw new IllegalArgumentException( "triggerKey cannot be null");
  961. }
  962. if (newTrigger == null) {
  963. throw new IllegalArgumentException( "newTrigger cannot be null");
  964. }
  965. OperableTrigger trig = (OperableTrigger)newTrigger;
  966. Trigger oldTrigger = getTrigger(triggerKey);
  967. if (oldTrigger == null) {
  968. return null;
  969. } else {
  970. trig.setJobKey(oldTrigger.getJobKey());
  971. }
  972. trig.validate();
  973. Calendar cal = null;
  974. if (newTrigger.getCalendarName() != null) {
  975. cal = resources.getJobStore().retrieveCalendar(
  976. newTrigger.getCalendarName());
  977. }
  978. Date ft = trig.computeFirstFireTime(cal);
  979. if (ft == null) {
  980. throw new SchedulerException(
  981. "Based on configured schedule, the given trigger will never fire.");
  982. }
  983. if (resources.getJobStore().replaceTrigger(triggerKey, trig)) {
  984. notifySchedulerThread(newTrigger.getNextFireTime().getTime());
  985. notifySchedulerListenersUnscheduled(triggerKey);
  986. notifySchedulerListenersSchduled(newTrigger);
  987. } else {
  988. return null;
  989. }
  990. return ft;
  991. }
  992. private String newTriggerId() {
  993. long r = random.nextLong();
  994. if (r < 0) {
  995. r = -r;
  996. }
  997. return "MT_"
  998. + Long.toString(r, 30 + ( int) (System.currentTimeMillis() % 7));
  999. }
  1000. /**
  1001. * <p>
  1002. * Trigger the identified <code>{@link org.quartz.Job}</code> (execute it
  1003. * now) - with a non-volatile trigger.
  1004. * </p>
  1005. */
  1006. @SuppressWarnings( "deprecation")
  1007. public void triggerJob(JobKey jobKey, JobDataMap data) throws SchedulerException {
  1008. validateState();
  1009. OperableTrigger trig = (OperableTrigger) newTrigger().withIdentity(newTriggerId(), Scheduler.DEFAULT_GROUP).forJob(jobKey).build();
  1010. trig.computeFirstFireTime( null);
  1011. if(data != null) {
  1012. trig.setJobDataMap(data);
  1013. }
  1014. boolean collision = true;
  1015. while (collision) {
  1016. try {
  1017. resources.getJobStore().storeTrigger(trig, false);
  1018. collision = false;
  1019. } catch (ObjectAlreadyExistsException oaee) {
  1020. trig.setKey( new TriggerKey(newTriggerId(), Scheduler.DEFAULT_GROUP));
  1021. }
  1022. }
  1023. notifySchedulerThread(trig.getNextFireTime().getTime());
  1024. notifySchedulerListenersSchduled(trig);
  1025. }
  1026. /**
  1027. * <p>
  1028. * Store and schedule the identified <code>{@link org.quartz.spi.OperableTrigger}</code>
  1029. * </p>
  1030. */
  1031. public void triggerJob(OperableTrigger trig) throws SchedulerException {
  1032. validateState();
  1033. trig.computeFirstFireTime( null);
  1034. boolean collision = true;
  1035. while (collision) {
  1036. try {
  1037. resources.getJobStore().storeTrigger(trig, false);
  1038. collision = false;
  1039. } catch (ObjectAlreadyExistsException oaee) {
  1040. trig.setKey( new TriggerKey(newTriggerId(), Scheduler.DEFAULT_GROUP));
  1041. }
  1042. }
  1043. notifySchedulerThread(trig.getNextFireTime().getTime());
  1044. notifySchedulerListenersSchduled(trig);
  1045. }
  1046. /**
  1047. * <p>
  1048. * Pause the <code>{@link Trigger}</code> with the given name.
  1049. * </p>
  1050. *
  1051. */
  1052. public void pauseTrigger(TriggerKey triggerKey) throws SchedulerException {
  1053. validateState();
  1054. resources.getJobStore().pauseTrigger(triggerKey);
  1055. notifySchedulerThread( 0L);
  1056. notifySchedulerListenersPausedTrigger(triggerKey);
  1057. }
  1058. /**
  1059. * <p>
  1060. * Pause all of the <code>{@link Trigger}s</code> in the matching groups.
  1061. * </p>
  1062. *
  1063. */
  1064. public void pauseTriggers(GroupMatcher<TriggerKey> matcher)
  1065. throws SchedulerException {
  1066. validateState();
  1067. if(matcher == null) {
  1068. matcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1069. }
  1070. Collection<String> pausedGroups = resources.getJobStore().pauseTriggers(matcher);
  1071. notifySchedulerThread( 0L);
  1072. for (String pausedGroup : pausedGroups) {
  1073. notifySchedulerListenersPausedTriggers(pausedGroup);
  1074. }
  1075. }
  1076. /**
  1077. * <p>
  1078. * Pause the <code>{@link org.quartz.JobDetail}</code> with the given
  1079. * name - by pausing all of its current <code>Trigger</code>s.
  1080. * </p>
  1081. *
  1082. */
  1083. public void pauseJob(JobKey jobKey) throws SchedulerException {
  1084. validateState();
  1085. resources.getJobStore().pauseJob(jobKey);
  1086. notifySchedulerThread( 0L);
  1087. notifySchedulerListenersPausedJob(jobKey);
  1088. }
  1089. /**
  1090. * <p>
  1091. * Pause all of the <code>{@link org.quartz.JobDetail}s</code> in the
  1092. * matching groups - by pausing all of their <code>Trigger</code>s.
  1093. * </p>
  1094. *
  1095. */
  1096. public void pauseJobs(GroupMatcher<JobKey> groupMatcher)
  1097. throws SchedulerException {
  1098. validateState();
  1099. if(groupMatcher == null) {
  1100. groupMatcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1101. }
  1102. Collection<String> pausedGroups = resources.getJobStore().pauseJobs(groupMatcher);
  1103. notifySchedulerThread( 0L);
  1104. for (String pausedGroup : pausedGroups) {
  1105. notifySchedulerListenersPausedJobs(pausedGroup);
  1106. }
  1107. }
  1108. /**
  1109. * <p>
  1110. * Resume (un-pause) the <code>{@link Trigger}</code> with the given
  1111. * name.
  1112. * </p>
  1113. *
  1114. * <p>
  1115. * If the <code>Trigger</code> missed one or more fire-times, then the
  1116. * <code>Trigger</code>'s misfire instruction will be applied.
  1117. * </p>
  1118. *
  1119. */
  1120. public void resumeTrigger(TriggerKey triggerKey) throws SchedulerException {
  1121. validateState();
  1122. resources.getJobStore().resumeTrigger(triggerKey);
  1123. notifySchedulerThread( 0L);
  1124. notifySchedulerListenersResumedTrigger(triggerKey);
  1125. }
  1126. /**
  1127. * <p>
  1128. * Resume (un-pause) all of the <code>{@link Trigger}s</code> in the
  1129. * matching groups.
  1130. * </p>
  1131. *
  1132. * <p>
  1133. * If any <code>Trigger</code> missed one or more fire-times, then the
  1134. * <code>Trigger</code>'s misfire instruction will be applied.
  1135. * </p>
  1136. *
  1137. */
  1138. public void resumeTriggers(GroupMatcher<TriggerKey> matcher)
  1139. throws SchedulerException {
  1140. validateState();
  1141. if(matcher == null) {
  1142. matcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1143. }
  1144. Collection<String> pausedGroups = resources.getJobStore().resumeTriggers(matcher);
  1145. notifySchedulerThread( 0L);
  1146. for (String pausedGroup : pausedGroups) {
  1147. notifySchedulerListenersResumedTriggers(pausedGroup);
  1148. }
  1149. }
  1150. public Set<String> getPausedTriggerGroups() throws SchedulerException {
  1151. return resources.getJobStore().getPausedTriggerGroups();
  1152. }
  1153. /**
  1154. * <p>
  1155. * Resume (un-pause) the <code>{@link org.quartz.JobDetail}</code> with
  1156. * the given name.
  1157. * </p>
  1158. *
  1159. * <p>
  1160. * If any of the <code>Job</code>'s<code>Trigger</code> s missed one
  1161. * or more fire-times, then the <code>Trigger</code>'s misfire
  1162. * instruction will be applied.
  1163. * </p>
  1164. *
  1165. */
  1166. public void resumeJob(JobKey jobKey) throws SchedulerException {
  1167. validateState();
  1168. resources.getJobStore().resumeJob(jobKey);
  1169. notifySchedulerThread( 0L);
  1170. notifySchedulerListenersResumedJob(jobKey);
  1171. }
  1172. /**
  1173. * <p>
  1174. * Resume (un-pause) all of the <code>{@link org.quartz.JobDetail}s</code>
  1175. * in the matching groups.
  1176. * </p>
  1177. *
  1178. * <p>
  1179. * If any of the <code>Job</code> s had <code>Trigger</code> s that
  1180. * missed one or more fire-times, then the <code>Trigger</code>'s
  1181. * misfire instruction will be applied.
  1182. * </p>
  1183. *
  1184. */
  1185. public void resumeJobs(GroupMatcher<JobKey> matcher)
  1186. throws SchedulerException {
  1187. validateState();
  1188. if(matcher == null) {
  1189. matcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1190. }
  1191. Collection<String> resumedGroups = resources.getJobStore().resumeJobs(matcher);
  1192. notifySchedulerThread( 0L);
  1193. for (String pausedGroup : resumedGroups) {
  1194. notifySchedulerListenersResumedJobs(pausedGroup);
  1195. }
  1196. }
  1197. /**
  1198. * <p>
  1199. * Pause all triggers - equivalent of calling <code>pauseTriggers(GroupMatcher<TriggerKey>)</code>
  1200. * with a matcher matching all known groups.
  1201. * </p>
  1202. *
  1203. * <p>
  1204. * When <code>resumeAll()</code> is called (to un-pause), trigger misfire
  1205. * instructions WILL be applied.
  1206. * </p>
  1207. *
  1208. * @see #resumeAll()
  1209. * @see #pauseTriggers(org.quartz.impl.matchers.GroupMatcher)
  1210. * @see #standby()
  1211. */
  1212. public void pauseAll() throws SchedulerException {
  1213. validateState();
  1214. resources.getJobStore().pauseAll();
  1215. notifySchedulerThread( 0L);
  1216. notifySchedulerListenersPausedTriggers( null);
  1217. }
  1218. /**
  1219. * <p>
  1220. * Resume (un-pause) all triggers - equivalent of calling <code>resumeTriggerGroup(group)</code>
  1221. * on every group.
  1222. * </p>
  1223. *
  1224. * <p>
  1225. * If any <code>Trigger</code> missed one or more fire-times, then the
  1226. * <code>Trigger</code>'s misfire instruction will be applied.
  1227. * </p>
  1228. *
  1229. * @see #pauseAll()
  1230. */
  1231. public void resumeAll() throws SchedulerException {
  1232. validateState();
  1233. resources.getJobStore().resumeAll();
  1234. notifySchedulerThread( 0L);
  1235. notifySchedulerListenersResumedTrigger( null);
  1236. }
  1237. /**
  1238. * <p>
  1239. * Get the names of all known <code>{@link org.quartz.Job}</code> groups.
  1240. * </p>
  1241. */
  1242. public List<String> getJobGroupNames()
  1243. throws SchedulerException {
  1244. validateState();
  1245. return resources.getJobStore().getJobGroupNames();
  1246. }
  1247. /**
  1248. * <p>
  1249. * Get the names of all the <code>{@link org.quartz.Job}s</code> in the
  1250. * matching groups.
  1251. * </p>
  1252. */
  1253. public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher)
  1254. throws SchedulerException {
  1255. validateState();
  1256. if(matcher == null) {
  1257. matcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1258. }
  1259. return resources.getJobStore().getJobKeys(matcher);
  1260. }
  1261. /**
  1262. * <p>
  1263. * Get all <code>{@link Trigger}</code> s that are associated with the
  1264. * identified <code>{@link org.quartz.JobDetail}</code>.
  1265. * </p>
  1266. */
  1267. public List<? extends Trigger> getTriggersOfJob(JobKey jobKey) throws SchedulerException {
  1268. validateState();
  1269. return resources.getJobStore().getTriggersForJob(jobKey);
  1270. }
  1271. /**
  1272. * <p>
  1273. * Get the names of all known <code>{@link org.quartz.Trigger}</code>
  1274. * groups.
  1275. * </p>
  1276. */
  1277. public List<String> getTriggerGroupNames()
  1278. throws SchedulerException {
  1279. validateState();
  1280. return resources.getJobStore().getTriggerGroupNames();
  1281. }
  1282. /**
  1283. * <p>
  1284. * Get the names of all the <code>{@link org.quartz.Trigger}s</code> in
  1285. * the matching groups.
  1286. * </p>
  1287. */
  1288. public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher)
  1289. throws SchedulerException {
  1290. validateState();
  1291. if(matcher == null) {
  1292. matcher = GroupMatcher.groupEquals(Scheduler.DEFAULT_GROUP);
  1293. }
  1294. return resources.getJobStore().getTriggerKeys(matcher);
  1295. }
  1296. /**
  1297. * <p>
  1298. * Get the <code>{@link JobDetail}</code> for the <code>Job</code>
  1299. * instance with the given name and group.
  1300. * </p>
  1301. */
  1302. public JobDetail getJobDetail(JobKey jobKey) throws SchedulerException {
  1303. validateState();
  1304. return resources.getJobStore().retrieveJob(jobKey);
  1305. }
  1306. /**
  1307. * <p>
  1308. * Get the <code>{@link Trigger}</code> instance with the given name and
  1309. * group.
  1310. * </p>
  1311. */
  1312. public Trigger getTrigger(TriggerKey triggerKey) throws SchedulerException {
  1313. validateState();
  1314. return resources.getJobStore().retrieveTrigger(triggerKey);
  1315. }
  1316. /**
  1317. * Determine whether a {@link Job} with the given identifier already
  1318. * exists within the scheduler.
  1319. *
  1320. * @param jobKey the identifier to check for
  1321. * @return true if a Job exists with the given identifier
  1322. * @throws SchedulerException
  1323. */
  1324. public boolean checkExists(JobKey jobKey) throws SchedulerException {
  1325. validateState();
  1326. return resources.getJobStore().checkExists(jobKey);
  1327. }
  1328. /**
  1329. * Determine whether a {@link Trigger} with the given identifier already
  1330. * exists within the scheduler.
  1331. *
  1332. * @param triggerKey the identifier to check for
  1333. * @return true if a Trigger exists with the given identifier
  1334. * @throws SchedulerException
  1335. */
  1336. public boolean checkExists(TriggerKey triggerKey) throws SchedulerException {
  1337. validateState();
  1338. return resources.getJobStore().checkExists(triggerKey);
  1339. }
  1340. /**
  1341. * Clears (deletes!) all scheduling data - all {@link Job}s, {@link Trigger}s
  1342. * {@link Calendar}s.
  1343. *
  1344. * @throws SchedulerException
  1345. */
  1346. public void clear() throws SchedulerException {
  1347. validateState();
  1348. resources.getJobStore().clearAllSchedulingData();
  1349. notifySchedulerListenersUnscheduled( null);
  1350. }
  1351. /**
  1352. * <p>
  1353. * Get the current state of the identified <code>{@link Trigger}</code>.
  1354. * </p>
  1355. J *
  1356. * @see TriggerState
  1357. */
  1358. public TriggerState getTriggerState(TriggerKey triggerKey) throws SchedulerException {
  1359. validateState();
  1360. return resources.getJobStore().getTriggerState(triggerKey);
  1361. }
  1362. public void resetTriggerFromErrorState(TriggerKey triggerKey) throws SchedulerException {
  1363. validateState();
  1364. resources.getJobStore().resetTriggerFromErrorState(triggerKey);
  1365. }
  1366. /**
  1367. * <p>
  1368. * Add (register) the given <code>Calendar</code> to the Scheduler.
  1369. * </p>
  1370. *
  1371. * @throws SchedulerException
  1372. * if there is an internal Scheduler error, or a Calendar with
  1373. * the same name already exists, and <code>replace</code> is
  1374. * <code>false</code>.
  1375. */
  1376. public void addCalendar(String calName, Calendar calendar, boolean replace, boolean updateTriggers) throws SchedulerException {
  1377. validateState();
  1378. resources.getJobStore().storeCalendar(calName, calendar, replace, updateTriggers);
  1379. }
  1380. /**
  1381. * <p>
  1382. * Delete the identified <code>Calendar</code> from the Scheduler.
  1383. * </p>
  1384. *
  1385. * @return true if the Calendar was found and deleted.
  1386. * @throws SchedulerException
  1387. * if there is an internal Scheduler error.
  1388. */
  1389. public boolean deleteCalendar(String calName)
  1390. throws SchedulerException {
  1391. validateState();
  1392. return resources.getJobStore().removeCalendar(calName);
  1393. }
  1394. /**
  1395. * <p>
  1396. * Get the <code>{@link Calendar}</code> instance with the given name.
  1397. * </p>
  1398. */
  1399. public Calendar getCalendar(String calName)
  1400. throws SchedulerException {
  1401. validateState();
  1402. return resources.getJobStore().retrieveCalendar(calName);
  1403. }
  1404. /**
  1405. * <p>
  1406. * Get the names of all registered <code>{@link Calendar}s</code>.
  1407. * </p>
  1408. */
  1409. public List<String> getCalendarNames()
  1410. throws SchedulerException {
  1411. validateState();
  1412. return resources.getJobStore().getCalendarNames();
  1413. }
  1414. public ListenerManager getListenerManager() {
  1415. return listenerManager;
  1416. }
  1417. /**
  1418. * <p>
  1419. * Add the given <code>{@link org.quartz.JobListener}</code> to the
  1420. * <code>Scheduler</code>'s <i>internal</i> list.
  1421. * </p>
  1422. */
  1423. public void addInternalJobListener(JobListener jobListener) {
  1424. if (jobListener.getName() == null
  1425. || jobListener.getName().length() == 0) {
  1426. throw new IllegalArgumentException(
  1427. "JobListener name cannot be empty.");
  1428. }
  1429. synchronized (internalJobListeners) {
  1430. internalJobListeners.put(jobListener.getName(), jobListener);
  1431. }
  1432. }
  1433. /**
  1434. * <p>
  1435. * Remove the identified <code>{@link JobListener}</code> from the <code>Scheduler</code>'s
  1436. * list of <i>internal</i> listeners.
  1437. * </p>
  1438. *
  1439. * @return true if the identified listener was found in the list, and
  1440. * removed.
  1441. */
  1442. public boolean removeInternalJobListener(String name) {
  1443. synchronized (internalJobListeners) {
  1444. return (internalJobListeners.remove(name) != null);
  1445. }
  1446. }
  1447. /**
  1448. * <p>
  1449. * Get a List containing all of the <code>{@link org.quartz.JobListener}</code>s
  1450. * in the <code>Scheduler</code>'s <i>internal</i> list.
  1451. * </p>
  1452. */
  1453. public List<JobListener> getInternalJobListeners() {
  1454. synchronized (internalJobListeners) {
  1455. return java.util.Collections.unmodifiableList( new LinkedList<JobListener>(internalJobListeners.values()));
  1456. }
  1457. }
  1458. /**
  1459. * <p>
  1460. * Get the <i>internal</i> <code>{@link org.quartz.JobListener}</code>
  1461. * that has the given name.
  1462. * </p>
  1463. */
  1464. public JobListener getInternalJobListener(String name) {
  1465. synchronized (internalJobListeners) {
  1466. return internalJobListeners.get(name);
  1467. }
  1468. }
  1469. /**
  1470. * <p>
  1471. * Add the given <code>{@link org.quartz.TriggerListener}</code> to the
  1472. * <code>Scheduler</code>'s <i>internal</i> list.
  1473. * </p>
  1474. */
  1475. public void addInternalTriggerListener(TriggerListener triggerListener) {
  1476. if (triggerListener.getName() == null
  1477. || triggerListener.getName().length() == 0) {
  1478. throw new IllegalArgumentException(
  1479. "TriggerListener name cannot be empty.");
  1480. }
  1481. synchronized (internalTriggerListeners) {
  1482. internalTriggerListeners.put(triggerListener.getName(), triggerListener);
  1483. }
  1484. }
  1485. /**
  1486. * <p>
  1487. * Remove the identified <code>{@link TriggerListener}</code> from the <code>Scheduler</code>'s
  1488. * list of <i>internal</i> listeners.
  1489. * </p>
  1490. *
  1491. * @return true if the identified listener was found in the list, and
  1492. * removed.
  1493. */
  1494. public boolean removeinternalTriggerListener(String name) {
  1495. synchronized (internalTriggerListeners) {
  1496. return (internalTriggerListeners.remove(name) != null);
  1497. }
  1498. }
  1499. /**
  1500. * <p>
  1501. * Get a list containing all of the <code>{@link org.quartz.TriggerListener}</code>s
  1502. * in the <code>Scheduler</code>'s <i>internal</i> list.
  1503. * </p>
  1504. */
  1505. public List<TriggerListener> getInternalTriggerListeners() {
  1506. synchronized (internalTriggerListeners) {
  1507. return java.util.Collections.unmodifiableList( new LinkedList<TriggerListener>(internalTriggerListeners.values()));
  1508. }
  1509. }
  1510. /**
  1511. * <p>
  1512. * Get the <i>internal</i> <code>{@link TriggerListener}</code> that
  1513. * has the given name.
  1514. * </p>
  1515. */
  1516. public TriggerListener getInternalTriggerListener(String name) {
  1517. synchronized (internalTriggerListeners) {
  1518. return internalTriggerListeners.get(name);
  1519. }
  1520. }
  1521. /**
  1522. * <p>
  1523. * Register the given <code>{@link SchedulerListener}</code> with the
  1524. * <code>Scheduler</code>'s list of internal listeners.
  1525. * </p>
  1526. */
  1527. public void addInternalSchedulerListener(SchedulerListener schedulerListener) {
  1528. synchronized (internalSchedulerListeners) {
  1529. internalSchedulerListeners.add(schedulerListener);
  1530. }
  1531. }
  1532. /**
  1533. * <p>
  1534. * Remove the given <code>{@link SchedulerListener}</code> from the
  1535. * <code>Scheduler</code>'s list of internal listeners.
  1536. * </p>
  1537. *
  1538. * @return true if the identified listener was found in the list, and
  1539. * removed.
  1540. */
  1541. public boolean removeInternalSchedulerListener(SchedulerListener schedulerListener) {
  1542. synchronized (internalSchedulerListeners) {
  1543. return internalSchedulerListeners.remove(schedulerListener);
  1544. }
  1545. }
  1546. /**
  1547. * <p>
  1548. * Get a List containing all of the <i>internal</i> <code>{@link SchedulerListener}</code>s
  1549. * registered with the <code>Scheduler</code>.
  1550. * </p>
  1551. */
  1552. public List<SchedulerListener> getInternalSchedulerListeners() {
  1553. synchronized (internalSchedulerListeners) {
  1554. return java.util.Collections.unmodifiableList( new ArrayList<SchedulerListener>(internalSchedulerListeners));
  1555. }
  1556. }
  1557. protected void notifyJobStoreJobComplete(OperableTrigger trigger, JobDetail detail, CompletedExecutionInstruction instCode) {
  1558. resources.getJobStore().triggeredJobComplete(trigger, detail, instCode);
  1559. }
  1560. protected void notifyJobStoreJobVetoed(OperableTrigger trigger, JobDetail detail, CompletedExecutionInstruction instCode) {
  1561. resources.getJobStore().triggeredJobComplete(trigger, detail, instCode);
  1562. }
  1563. protected void notifySchedulerThread(long candidateNewNextFireTime) {
  1564. if (isSignalOnSchedulingChange()) {
  1565. signaler.signalSchedulingChange(candidateNewNextFireTime);
  1566. }
  1567. }
  1568. private List<TriggerListener> buildTriggerListenerList()
  1569. throws SchedulerException {
  1570. List<TriggerListener> allListeners = new LinkedList<TriggerListener>();
  1571. allListeners.addAll(getListenerManager().getTriggerListeners());
  1572. allListeners.addAll(getInternalTriggerListeners());
  1573. return allListeners;
  1574. }
  1575. private List<JobListener> buildJobListenerList()
  1576. throws SchedulerException {
  1577. List<JobListener> allListeners = new LinkedList<JobListener>();
  1578. allListeners.addAll(getListenerManager().getJobListeners());
  1579. allListeners.addAll(getInternalJobListeners());
  1580. return allListeners;
  1581. }
  1582. private List<SchedulerListener> buildSchedulerListenerList() {
  1583. List<SchedulerListener> allListeners = new LinkedList<SchedulerListener>();
  1584. allListeners.addAll(getListenerManager().getSchedulerListeners());
  1585. allListeners.addAll(getInternalSchedulerListeners());
  1586. return allListeners;
  1587. }
  1588. private boolean matchJobListener(JobListener listener, JobKey key) {
  1589. List<Matcher<JobKey>> matchers = getListenerManager().getJobListenerMatchers(listener.getName());
  1590. if(matchers == null)
  1591. return true;
  1592. for(Matcher<JobKey> matcher: matchers) {
  1593. if(matcher.isMatch(key))
  1594. return true;
  1595. }
  1596. return false;
  1597. }
  1598. private boolean matchTriggerListener(TriggerListener listener, TriggerKey key) {
  1599. List<Matcher<TriggerKey>> matchers = getListenerManager().getTriggerListenerMatchers(listener.getName());
  1600. if(matchers == null)
  1601. return true;
  1602. for(Matcher<TriggerKey> matcher: matchers) {
  1603. if(matcher.isMatch(key))
  1604. return true;
  1605. }
  1606. return false;
  1607. }
  1608. public boolean notifyTriggerListenersFired(JobExecutionContext jec)
  1609. throws SchedulerException {
  1610. boolean vetoedExecution = false;
  1611. // build a list of all trigger listeners that are to be notified...
  1612. List<TriggerListener> triggerListeners = buildTriggerListenerList();
  1613. // notify all trigger listeners in the list
  1614. for(TriggerListener tl: triggerListeners) {
  1615. try {
  1616. if(!matchTriggerListener(tl, jec.getTrigger().getKey()))
  1617. continue;
  1618. tl.triggerFired(jec.getTrigger(), jec);
  1619. if(tl.vetoJobExecution(jec.getTrigger(), jec)) {
  1620. vetoedExecution = true;
  1621. }
  1622. } catch (Exception e) {
  1623. SchedulerException se = new SchedulerException(
  1624. "TriggerListener '" + tl.getName()
  1625. + "' threw exception: " + e.getMessage(), e);
  1626. throw se;
  1627. }
  1628. }
  1629. return vetoedExecution;
  1630. }
  1631. public void notifyTriggerListenersMisfired(Trigger trigger)
  1632. throws SchedulerException {
  1633. // build a list of all trigger listeners that are to be notified...
  1634. List<TriggerListener> triggerListeners = buildTriggerListenerList();
  1635. // notify all trigger listeners in the list
  1636. for(TriggerListener tl: triggerListeners) {
  1637. try {
  1638. if(!matchTriggerListener(tl, trigger.getKey()))
  1639. continue;
  1640. tl.triggerMisfired(trigger);
  1641. } catch (Exception e) {
  1642. SchedulerException se = new SchedulerException(
  1643. "TriggerListener '" + tl.getName()
  1644. + "' threw exception: " + e.getMessage(), e);
  1645. throw se;
  1646. }
  1647. }
  1648. }
  1649. public void notifyTriggerListenersComplete(JobExecutionContext jec,
  1650. CompletedExecutionInstruction instCode) throws SchedulerException {
  1651. // build a list of all trigger listeners that are to be notified...
  1652. List<TriggerListener> triggerListeners = buildTriggerListenerList();
  1653. // notify all trigger listeners in the list
  1654. for(TriggerListener tl: triggerListeners) {
  1655. try {
  1656. if(!matchTriggerListener(tl, jec.getTrigger().getKey()))
  1657. continue;
  1658. tl.triggerComplete(jec.getTrigger(), jec, instCode);
  1659. } catch (Exception e) {
  1660. SchedulerException se = new SchedulerException(
  1661. "TriggerListener '" + tl.getName()
  1662. + "' threw exception: " + e.getMessage(), e);
  1663. throw se;
  1664. }
  1665. }
  1666. }
  1667. public void notifyJobListenersToBeExecuted(JobExecutionContext jec)
  1668. throws SchedulerException {
  1669. // build a list of all job listeners that are to be notified...
  1670. List<JobListener> jobListeners = buildJobListenerList();
  1671. // notify all job listeners
  1672. for(JobListener jl: jobListeners) {
  1673. try {
  1674. if(!matchJobListener(jl, jec.getJobDetail().getKey()))
  1675. continue;
  1676. jl.jobToBeExecuted(jec);
  1677. } catch (Exception e) {
  1678. SchedulerException se = new SchedulerException(
  1679. "JobListener '" + jl.getName() + "' threw exception: "
  1680. + e.getMessage(), e);
  1681. throw se;
  1682. }
  1683. }
  1684. }
  1685. public void notifyJobListenersWasVetoed(JobExecutionContext jec)
  1686. throws SchedulerException {
  1687. // build a list of all job listeners that are to be notified...
  1688. List<JobListener> jobListeners = buildJobListenerList();
  1689. // notify all job listeners
  1690. for(JobListener jl: jobListeners) {
  1691. try {
  1692. if(!matchJobListener(jl, jec.getJobDetail().getKey()))
  1693. continue;
  1694. jl.jobExecutionVetoed(jec);
  1695. } catch (Exception e) {
  1696. SchedulerException se = new SchedulerException(
  1697. "JobListener '" + jl.getName() + "' threw exception: "
  1698. + e.getMessage(), e);
  1699. throw se;
  1700. }
  1701. }
  1702. }
  1703. public void notifyJobListenersWasExecuted(JobExecutionContext jec,
  1704. JobExecutionException je) throws SchedulerException {
  1705. // build a list of all job listeners that are to be notified...
  1706. List<JobListener> jobListeners = buildJobListenerList();
  1707. // notify all job listeners
  1708. for(JobListener jl: jobListeners) {
  1709. try {
  1710. if(!matchJobListener(jl, jec.getJobDetail().getKey()))
  1711. continue;
  1712. jl.jobWasExecuted(jec, je);
  1713. } catch (Exception e) {
  1714. SchedulerException se = new SchedulerException(
  1715. "JobListener '" + jl.getName() + "' threw exception: "
  1716. + e.getMessage(), e);
  1717. throw se;
  1718. }
  1719. }
  1720. }
  1721. public void notifySchedulerListenersError(String msg, SchedulerException se) {
  1722. // build a list of all scheduler listeners that are to be notified...
  1723. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1724. // notify all scheduler listeners
  1725. for(SchedulerListener sl: schedListeners) {
  1726. try {
  1727. sl.schedulerError(msg, se);
  1728. } catch (Exception e) {
  1729. getLog()
  1730. .error(
  1731. "Error while notifying SchedulerListener of error: ",
  1732. e);
  1733. getLog().error(
  1734. " Original error (for notification) was: " + msg, se);
  1735. }
  1736. }
  1737. }
  1738. public void notifySchedulerListenersSchduled(Trigger trigger) {
  1739. // build a list of all scheduler listeners that are to be notified...
  1740. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1741. // notify all scheduler listeners
  1742. for(SchedulerListener sl: schedListeners) {
  1743. try {
  1744. sl.jobScheduled(trigger);
  1745. } catch (Exception e) {
  1746. getLog().error(
  1747. "Error while notifying SchedulerListener of scheduled job."
  1748. + " Triger=" + trigger.getKey(), e);
  1749. }
  1750. }
  1751. }
  1752. public void notifySchedulerListenersUnscheduled(TriggerKey triggerKey) {
  1753. // build a list of all scheduler listeners that are to be notified...
  1754. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1755. // notify all scheduler listeners
  1756. for(SchedulerListener sl: schedListeners) {
  1757. try {
  1758. if(triggerKey == null)
  1759. sl.schedulingDataCleared();
  1760. else
  1761. sl.jobUnscheduled(triggerKey);
  1762. } catch (Exception e) {
  1763. getLog().error(
  1764. "Error while notifying SchedulerListener of unscheduled job."
  1765. + " Triger=" + (triggerKey == null ? "ALL DATA" : triggerKey), e);
  1766. }
  1767. }
  1768. }
  1769. public void notifySchedulerListenersFinalized(Trigger trigger) {
  1770. // build a list of all scheduler listeners that are to be notified...
  1771. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1772. // notify all scheduler listeners
  1773. for(SchedulerListener sl: schedListeners) {
  1774. try {
  1775. sl.triggerFinalized(trigger);
  1776. } catch (Exception e) {
  1777. getLog().error(
  1778. "Error while notifying SchedulerListener of finalized trigger."
  1779. + " Triger=" + trigger.getKey(), e);
  1780. }
  1781. }
  1782. }
  1783. public void notifySchedulerListenersPausedTrigger(TriggerKey triggerKey) {
  1784. // build a list of all scheduler listeners that are to be notified...
  1785. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1786. // notify all scheduler listeners
  1787. for(SchedulerListener sl: schedListeners) {
  1788. try {
  1789. sl.triggerPaused(triggerKey);
  1790. } catch (Exception e) {
  1791. getLog().error(
  1792. "Error while notifying SchedulerListener of paused trigger: "
  1793. + triggerKey, e);
  1794. }
  1795. }
  1796. }
  1797. public void notifySchedulerListenersPausedTriggers(String group) {
  1798. // build a list of all scheduler listeners that are to be notified...
  1799. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1800. // notify all scheduler listeners
  1801. for(SchedulerListener sl: schedListeners) {
  1802. try {
  1803. sl.triggersPaused(group);
  1804. } catch (Exception e) {
  1805. getLog().error(
  1806. "Error while notifying SchedulerListener of paused trigger group."
  1807. + group, e);
  1808. }
  1809. }
  1810. }
  1811. public void notifySchedulerListenersResumedTrigger(TriggerKey key) {
  1812. // build a list of all scheduler listeners that are to be notified...
  1813. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1814. // notify all scheduler listeners
  1815. for(SchedulerListener sl: schedListeners) {
  1816. try {
  1817. sl.triggerResumed(key);
  1818. } catch (Exception e) {
  1819. getLog().error(
  1820. "Error while notifying SchedulerListener of resumed trigger: "
  1821. + key, e);
  1822. }
  1823. }
  1824. }
  1825. public void notifySchedulerListenersResumedTriggers(String group) {
  1826. // build a list of all scheduler listeners that are to be notified...
  1827. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1828. // notify all scheduler listeners
  1829. for(SchedulerListener sl: schedListeners) {
  1830. try {
  1831. sl.triggersResumed(group);
  1832. } catch (Exception e) {
  1833. getLog().error(
  1834. "Error while notifying SchedulerListener of resumed group: "
  1835. + group, e);
  1836. }
  1837. }
  1838. }
  1839. public void notifySchedulerListenersPausedJob(JobKey key) {
  1840. // build a list of all scheduler listeners that are to be notified...
  1841. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1842. // notify all scheduler listeners
  1843. for(SchedulerListener sl: schedListeners) {
  1844. try {
  1845. sl.jobPaused(key);
  1846. } catch (Exception e) {
  1847. getLog().error(
  1848. "Error while notifying SchedulerListener of paused job: "
  1849. + key, e);
  1850. }
  1851. }
  1852. }
  1853. public void notifySchedulerListenersPausedJobs(String group) {
  1854. // build a list of all scheduler listeners that are to be notified...
  1855. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1856. // notify all scheduler listeners
  1857. for(SchedulerListener sl: schedListeners) {
  1858. try {
  1859. sl.jobsPaused(group);
  1860. } catch (Exception e) {
  1861. getLog().error(
  1862. "Error while notifying SchedulerListener of paused job group: "
  1863. + group, e);
  1864. }
  1865. }
  1866. }
  1867. public void notifySchedulerListenersResumedJob(JobKey key) {
  1868. // build a list of all scheduler listeners that are to be notified...
  1869. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1870. // notify all scheduler listeners
  1871. for(SchedulerListener sl: schedListeners) {
  1872. try {
  1873. sl.jobResumed(key);
  1874. } catch (Exception e) {
  1875. getLog().error(
  1876. "Error while notifying SchedulerListener of resumed job: "
  1877. + key, e);
  1878. }
  1879. }
  1880. }
  1881. public void notifySchedulerListenersResumedJobs(String group) {
  1882. // build a list of all scheduler listeners that are to be notified...
  1883. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1884. // notify all scheduler listeners
  1885. for(SchedulerListener sl: schedListeners) {
  1886. try {
  1887. sl.jobsResumed(group);
  1888. } catch (Exception e) {
  1889. getLog().error(
  1890. "Error while notifying SchedulerListener of resumed job group: "
  1891. + group, e);
  1892. }
  1893. }
  1894. }
  1895. public void notifySchedulerListenersInStandbyMode() {
  1896. // build a list of all scheduler listeners that are to be notified...
  1897. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1898. // notify all scheduler listeners
  1899. for(SchedulerListener sl: schedListeners) {
  1900. try {
  1901. sl.schedulerInStandbyMode();
  1902. } catch (Exception e) {
  1903. getLog().error(
  1904. "Error while notifying SchedulerListener of inStandByMode.",
  1905. e);
  1906. }
  1907. }
  1908. }
  1909. public void notifySchedulerListenersStarted() {
  1910. // build a list of all scheduler listeners that are to be notified...
  1911. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1912. // notify all scheduler listeners
  1913. for(SchedulerListener sl: schedListeners) {
  1914. try {
  1915. sl.schedulerStarted();
  1916. } catch (Exception e) {
  1917. getLog().error(
  1918. "Error while notifying SchedulerListener of startup.",
  1919. e);
  1920. }
  1921. }
  1922. }
  1923. public void notifySchedulerListenersStarting() {
  1924. // build a list of all scheduler listeners that are to be notified...
  1925. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1926. // notify all scheduler listeners
  1927. for (SchedulerListener sl : schedListeners) {
  1928. try {
  1929. sl.schedulerStarting();
  1930. } catch (Exception e) {
  1931. getLog().error(
  1932. "Error while notifying SchedulerListener of startup.",
  1933. e);
  1934. }
  1935. }
  1936. }
  1937. public void notifySchedulerListenersShutdown() {
  1938. // build a list of all scheduler listeners that are to be notified...
  1939. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1940. // notify all scheduler listeners
  1941. for(SchedulerListener sl: schedListeners) {
  1942. try {
  1943. sl.schedulerShutdown();
  1944. } catch (Exception e) {
  1945. getLog().error(
  1946. "Error while notifying SchedulerListener of shutdown.",
  1947. e);
  1948. }
  1949. }
  1950. }
  1951. public void notifySchedulerListenersShuttingdown() {
  1952. // build a list of all scheduler listeners that are to be notified...
  1953. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1954. // notify all scheduler listeners
  1955. for(SchedulerListener sl: schedListeners) {
  1956. try {
  1957. sl.schedulerShuttingdown();
  1958. } catch (Exception e) {
  1959. getLog().error(
  1960. "Error while notifying SchedulerListener of shutdown.",
  1961. e);
  1962. }
  1963. }
  1964. }
  1965. public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
  1966. // build a list of all scheduler listeners that are to be notified...
  1967. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1968. // notify all scheduler listeners
  1969. for(SchedulerListener sl: schedListeners) {
  1970. try {
  1971. sl.jobAdded(jobDetail);
  1972. } catch (Exception e) {
  1973. getLog().error(
  1974. "Error while notifying SchedulerListener of JobAdded.",
  1975. e);
  1976. }
  1977. }
  1978. }
  1979. public void notifySchedulerListenersJobDeleted(JobKey jobKey) {
  1980. // build a list of all scheduler listeners that are to be notified...
  1981. List<SchedulerListener> schedListeners = buildSchedulerListenerList();
  1982. // notify all scheduler listeners
  1983. for(SchedulerListener sl: schedListeners) {
  1984. try {
  1985. sl.jobDeleted(jobKey);
  1986. } catch (Exception e) {
  1987. getLog().error(
  1988. "Error while notifying SchedulerListener of JobAdded.",
  1989. e);
  1990. }
  1991. }
  1992. }
  1993. public void setJobFactory(JobFactory factory) throws SchedulerException {
  1994. if(factory == null) {
  1995. throw new IllegalArgumentException( "JobFactory cannot be set to null!");
  1996. }
  1997. getLog().info( "JobFactory set to: " + factory);
  1998. this.jobFactory = factory;
  1999. }
  2000. public JobFactory getJobFactory() {
  2001. return jobFactory;
  2002. }
  2003. /**
  2004. * Interrupt all instances of the identified InterruptableJob executing in
  2005. * this Scheduler instance.
  2006. *
  2007. * <p>
  2008. * This method is not cluster aware. That is, it will only interrupt
  2009. * instances of the identified InterruptableJob currently executing in this
  2010. * Scheduler instance, not across the entire cluster.
  2011. * </p>
  2012. *
  2013. * @see org.quartz.core.RemotableQuartzScheduler#interrupt(JobKey)
  2014. */
  2015. public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
  2016. List<JobExecutionContext> jobs = getCurrentlyExecutingJobs();
  2017. JobDetail jobDetail = null;
  2018. Job job = null;
  2019. boolean interrupted = false;
  2020. for(JobExecutionContext jec : jobs) {
  2021. jobDetail = jec.getJobDetail();
  2022. if (jobKey.equals(jobDetail.getKey())) {
  2023. job = jec.getJobInstance();
  2024. if (job instanceof InterruptableJob) {
  2025. ((InterruptableJob)job).interrupt();
  2026. interrupted = true;
  2027. } else {
  2028. throw new UnableToInterruptJobException(
  2029. "Job " + jobDetail.getKey() +
  2030. " can not be interrupted, since it does not implement " +
  2031. InterruptableJob.class.getName());
  2032. }
  2033. }
  2034. }
  2035. return interrupted;
  2036. }
  2037. /**
  2038. * Interrupt the identified InterruptableJob executing in this Scheduler instance.
  2039. *
  2040. * <p>
  2041. * This method is not cluster aware. That is, it will only interrupt
  2042. * instances of the identified InterruptableJob currently executing in this
  2043. * Scheduler instance, not across the entire cluster.
  2044. * </p>
  2045. *
  2046. * @see org.quartz.core.RemotableQuartzScheduler#interrupt(JobKey)
  2047. */
  2048. public boolean interrupt(String fireInstanceId) throws UnableToInterruptJobException {
  2049. List<JobExecutionContext> jobs = getCurrentlyExecutingJobs();
  2050. Job job = null;
  2051. for(JobExecutionContext jec : jobs) {
  2052. if (jec.getFireInstanceId().equals(fireInstanceId)) {
  2053. job = jec.getJobInstance();
  2054. if (job instanceof InterruptableJob) {
  2055. ((InterruptableJob)job).interrupt();
  2056. return true;
  2057. } else {
  2058. throw new UnableToInterruptJobException(
  2059. "Job " + jec.getJobDetail().getKey() +
  2060. " can not be interrupted, since it does not implement " +
  2061. InterruptableJob.class.getName());
  2062. }
  2063. }
  2064. }
  2065. return false;
  2066. }
  2067. private void shutdownPlugins() {
  2068. java.util.Iterator<SchedulerPlugin> itr = resources.getSchedulerPlugins().iterator();
  2069. while (itr.hasNext()) {
  2070. SchedulerPlugin plugin = itr.next();
  2071. plugin.shutdown();
  2072. }
  2073. }
  2074. private void startPlugins() {
  2075. java.util.Iterator<SchedulerPlugin> itr = resources.getSchedulerPlugins().iterator();
  2076. while (itr.hasNext()) {
  2077. SchedulerPlugin plugin = itr.next();
  2078. plugin.start();
  2079. }
  2080. }
  2081. }
  2082. /////////////////////////////////////////////////////////////////////////////
  2083. //
  2084. // ErrorLogger - Scheduler Listener Class
  2085. //
  2086. /////////////////////////////////////////////////////////////////////////////
  2087. class ErrorLogger extends SchedulerListenerSupport {
  2088. ErrorLogger() {
  2089. }
  2090. @Override
  2091. public void schedulerError(String msg, SchedulerException cause) {
  2092. getLog().error(msg, cause);
  2093. }
  2094. }
  2095. /////////////////////////////////////////////////////////////////////////////
  2096. //
  2097. // ExecutingJobsManager - Job Listener Class
  2098. //
  2099. /////////////////////////////////////////////////////////////////////////////
  2100. class ExecutingJobsManager implements JobListener {
  2101. HashMap<String, JobExecutionContext> executingJobs = new HashMap<String, JobExecutionContext>();
  2102. AtomicInteger numJobsFired = new AtomicInteger( 0);
  2103. ExecutingJobsManager() {
  2104. }
  2105. public String getName() {
  2106. return getClass().getName();
  2107. }
  2108. public int getNumJobsCurrentlyExecuting() {
  2109. synchronized (executingJobs) {
  2110. return executingJobs.size();
  2111. }
  2112. }
  2113. public void jobToBeExecuted(JobExecutionContext context) {
  2114. numJobsFired.incrementAndGet();
  2115. synchronized (executingJobs) {
  2116. executingJobs
  2117. .put(((OperableTrigger)context.getTrigger()).getFireInstanceId(), context);
  2118. }
  2119. }
  2120. public void jobWasExecuted(JobExecutionContext context,
  2121. JobExecutionException jobException) {
  2122. synchronized (executingJobs) {
  2123. executingJobs.remove(((OperableTrigger)context.getTrigger()).getFireInstanceId());
  2124. }
  2125. }
  2126. public int getNumJobsFired() {
  2127. return numJobsFired.get();
  2128. }
  2129. public List<JobExecutionContext> getExecutingJobs() {
  2130. synchronized (executingJobs) {
  2131. return java.util.Collections.unmodifiableList( new ArrayList<JobExecutionContext>(
  2132. executingJobs.values()));
  2133. }
  2134. }
  2135. public void jobExecutionVetoed(JobExecutionContext context) {
  2136. }
  2137. }

org.quartz.core.QuartzSchedulerThread


  
  1. /*
  2. * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. * License for the specific language governing permissions and limitations
  14. * under the License.
  15. *
  16. */
  17. package org.quartz.core;
  18. import java.util.ArrayList;
  19. import java.util.List;
  20. import java.util.Random;
  21. import java.util.concurrent.atomic.AtomicBoolean;
  22. import org.quartz.JobPersistenceException;
  23. import org.quartz.SchedulerException;
  24. import org.quartz.Trigger;
  25. import org.quartz.Trigger.CompletedExecutionInstruction;
  26. import org.quartz.spi.JobStore;
  27. import org.quartz.spi.OperableTrigger;
  28. import org.quartz.spi.TriggerFiredBundle;
  29. import org.quartz.spi.TriggerFiredResult;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. /**
  33. * <p>
  34. * The thread responsible for performing the work of firing <code>{@link Trigger}</code>
  35. * s that are registered with the <code>{@link QuartzScheduler}</code>.
  36. * </p>
  37. *
  38. * @see QuartzScheduler
  39. * @see org.quartz.Job
  40. * @see Trigger
  41. *
  42. * @author James House
  43. */
  44. public class QuartzSchedulerThread extends Thread {
  45. /*
  46. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  47. *
  48. * Data members.
  49. *
  50. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  51. */
  52. private QuartzScheduler qs;
  53. private QuartzSchedulerResources qsRsrcs;
  54. private final Object sigLock = new Object();
  55. private boolean signaled;
  56. private long signaledNextFireTime;
  57. private boolean paused;
  58. private AtomicBoolean halted;
  59. private Random random = new Random(System.currentTimeMillis());
  60. // When the scheduler finds there is no current trigger to fire, how long
  61. // it should wait until checking again...
  62. private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
  63. private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
  64. private int idleWaitVariablness = 7 * 1000;
  65. private final Logger log = LoggerFactory.getLogger(getClass());
  66. /*
  67. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  68. *
  69. * Constructors.
  70. *
  71. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  72. */
  /**
   * <p>
   * Construct a new <code>QuartzSchedulerThread</code> for the given
   * <code>QuartzScheduler</code> with normal priority. Whether the thread is a
   * daemon is taken from <code>qsRsrcs.getMakeSchedulerThreadDaemon()</code>.
   * </p>
   */
  QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
      this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
  }
  /**
   * <p>
   * Construct a new <code>QuartzSchedulerThread</code> for the given
   * <code>QuartzScheduler</code> with the given daemon flag and priority. The thread
   * group comes from the scheduler and the thread name from the resources.
   * </p>
   *
   * <p>
   * The new object begins in the 'paused', non-halted state, so the processing
   * loop idles until <code>togglePause(false)</code> is called.
   * </p>
   */
  QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
      super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
      this.qs = qs;
      this.qsRsrcs = qsRsrcs;
      this.setDaemon(setDaemon);

      if (qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
          // adopt the context class loader of the thread that is constructing us
          log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
          this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
      }

      this.setPriority(threadPrio);

      // begin life paused and not halted, so processing doesn't start yet...
      halted = new AtomicBoolean(false);
      paused = true;
  }
  106. /*
  107. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  108. *
  109. * Interface.
  110. *
  111. * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  112. */
  113. void setIdleWaitTime(long waitTime) {
  114. idleWaitTime = waitTime;
  115. idleWaitVariablness = ( int) (waitTime * 0.2);
  116. }
  117. private long getRandomizedIdleWaitTime() {
  118. return idleWaitTime - random.nextInt(idleWaitVariablness);
  119. }
  120. /**
  121. * <p>
  122. * Signals the main processing loop to pause at the next possible point.
  123. * </p>
  124. */
  125. void togglePause(boolean pause) {
  126. synchronized (sigLock) {
  127. paused = pause;
  128. if (paused) {
  129. signalSchedulingChange( 0);
  130. } else {
  131. sigLock.notifyAll();
  132. }
  133. }
  134. }
  135. /**
  136. * <p>
  137. * Signals the main processing loop to pause at the next possible point.
  138. * </p>
  139. */
  140. void halt(boolean wait) {
  141. synchronized (sigLock) {
  142. halted.set( true);
  143. if (paused) {
  144. sigLock.notifyAll();
  145. } else {
  146. signalSchedulingChange( 0);
  147. }
  148. }
  149. if (wait) {
  150. boolean interrupted = false;
  151. try {
  152. while ( true) {
  153. try {
  154. join();
  155. break;
  156. } catch (InterruptedException _) {
  157. interrupted = true;
  158. }
  159. }
  160. } finally {
  161. if (interrupted) {
  162. Thread.currentThread().interrupt();
  163. }
  164. }
  165. }
  166. }
  167. boolean isPaused() {
  168. return paused;
  169. }
  170. /**
  171. * <p>
  172. * Signals the main processing loop that a change in scheduling has been
  173. * made - in order to interrupt any sleeping that may be occuring while
  174. * waiting for the fire time to arrive.
  175. * </p>
  176. *
  177. * @param candidateNewNextFireTime the time (in millis) when the newly scheduled trigger
  178. * will fire. If this method is being called do to some other even (rather
  179. * than scheduling a trigger), the caller should pass zero (0).
  180. */
  181. public void signalSchedulingChange(long candidateNewNextFireTime) {
  182. synchronized(sigLock) {
  183. signaled = true;
  184. signaledNextFireTime = candidateNewNextFireTime;
  185. sigLock.notifyAll();
  186. }
  187. }
  188. public void clearSignaledSchedulingChange() {
  189. synchronized(sigLock) {
  190. signaled = false;
  191. signaledNextFireTime = 0;
  192. }
  193. }
  194. public boolean isScheduleChanged() {
  195. synchronized(sigLock) {
  196. return signaled;
  197. }
  198. }
  199. public long getSignaledNextFireTime() {
  200. synchronized(sigLock) {
  201. return signaledNextFireTime;
  202. }
  203. }
  204. /**
  205. * <p>
  206. * The main processing loop of the <code>QuartzSchedulerThread</code>.
  207. * </p>
  208. */
  209. @Override
  210. public void run() {
  211. int acquiresFailed = 0;
  212. while (!halted.get()) {
  213. try {
  214. // check if we're supposed to pause...
  215. synchronized (sigLock) {
  216. while (paused && !halted.get()) {
  217. try {
  218. // wait until togglePause(false) is called...
  219. sigLock.wait( 1000L);
  220. } catch (InterruptedException ignore) {
  221. }
  222. // reset failure counter when paused, so that we don't
  223. // wait again after unpausing
  224. acquiresFailed = 0;
  225. }
  226. if (halted.get()) {
  227. break;
  228. }
  229. }
  230. // wait a bit, if reading from job store is consistently
  231. // failing (e.g. DB is down or restarting)..
  232. if (acquiresFailed > 1) {
  233. try {
  234. long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
  235. Thread.sleep(delay);
  236. } catch (Exception ignore) {
  237. }
  238. }
  239. //todo 获取可用线程的数量
  240. int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  241. //todo 将永远是true,由于blockForAvailableThreads的语义...
  242. if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
  243. //todo 定义触发器集合
  244. List<OperableTrigger> triggers;
  245. long now = System.currentTimeMillis();
  246. clearSignaledSchedulingChange();
  247. try {
  248. //todo 从jobStore中获取下次要触发的触发器集合
  249. // idleWaitTime == 30L * 1000L; 当调度程序发现没有当前触发器要触发,它应该等待多长时间再检查...
  250. triggers = qsRsrcs.getJobStore().acquireNextTriggers(
  251. now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
  252. acquiresFailed = 0;
  253. if (log.isDebugEnabled())
  254. log.debug( "batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
  255. } catch (JobPersistenceException jpe) {
  256. if (acquiresFailed == 0) {
  257. qs.notifySchedulerListenersError(
  258. "An error occurred while scanning for the next triggers to fire.",
  259. jpe);
  260. }
  261. if (acquiresFailed < Integer.MAX_VALUE)
  262. acquiresFailed++;
  263. continue;
  264. } catch (RuntimeException e) {
  265. if (acquiresFailed == 0) {
  266. getLog().error( "quartzSchedulerThreadLoop: RuntimeException "
  267. +e.getMessage(), e);
  268. }
  269. if (acquiresFailed < Integer.MAX_VALUE)
  270. acquiresFailed++;
  271. continue;
  272. }
  273. //todo 判断返回的触发器存在
  274. if (triggers != null && !triggers.isEmpty()) {
  275. now = System.currentTimeMillis();
  276. //todo 若有没有触发的Trigger,下次触发时间 next_fire_time 这个会在启动的时候有个默认的misfire机制,
  277. // setNextFireTime();
  278. // 即start()启动时候的当前时间。
  279. long triggerTime = triggers.get( 0).getNextFireTime().getTime();
  280. long timeUntilTrigger = triggerTime - now;
  281. while(timeUntilTrigger > 2) {
  282. synchronized (sigLock) {
  283. if (halted.get()) {
  284. break;
  285. }
  286. if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
  287. try {
  288. // we could have blocked a long while
  289. // on 'synchronize', so we must recompute
  290. now = System.currentTimeMillis();
  291. timeUntilTrigger = triggerTime - now;
  292. if(timeUntilTrigger >= 1)
  293. sigLock.wait(timeUntilTrigger);
  294. } catch (InterruptedException ignore) {
  295. }
  296. }
  297. }
  298. if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
  299. break;
  300. }
  301. now = System.currentTimeMillis();
  302. timeUntilTrigger = triggerTime - now;
  303. }
  304. // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
  305. //todo 这种情况发生,如果releaseIfScheduleChangedSignificantly 决定 释放Trigger
  306. if(triggers.isEmpty())
  307. continue;
  308. // set triggers to 'executing'
  309. //todo 将触发器设置为“正在执行”
  310. List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
  311. boolean goAhead = true;
  312. synchronized(sigLock) {
  313. goAhead = !halted.get();
  314. }
  315. if(goAhead) {
  316. try {
  317. //todo 通知JobStore调度程序现在正在触发其先前已获取(保留)的给定触发器(执行其关联的作业)。
  318. List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
  319. if(res != null)
  320. bndles = res;
  321. } catch (SchedulerException se) {
  322. qs.notifySchedulerListenersError(
  323. "An error occurred while firing triggers '"
  324. + triggers + "'", se);
  325. //QTZ-179 : a problem occurred interacting with the triggers from the db
  326. //we release them and loop again
  327. for ( int i = 0; i < triggers.size(); i++) {
  328. qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
  329. }
  330. continue;
  331. }
  332. }
  333. //todo 循环List<TriggerFiredResult> bndles 集合,获取TriggerFiredResult和TriggerFiredBundle等
  334. for ( int i = 0; i < bndles.size(); i++) {
  335. TriggerFiredResult result = bndles.get(i);
  336. TriggerFiredBundle bndle = result.getTriggerFiredBundle();
  337. Exception exception = result.getException();
  338. if (exception instanceof RuntimeException) {
  339. getLog().error( "RuntimeException while firing trigger " + triggers.get(i), exception);
  340. qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
  341. continue;
  342. }
  343. // it's possible to get 'null' if the triggers was paused,
  344. // blocked, or other similar occurrences that prevent it being
  345. // fired at this time... or if the scheduler was shutdown (halted)
  346. //todo 如果触发器被暂停,阻塞或其他类似的事件阻止它在这时被触发,
  347. // 或者如果调度器被关闭(暂停),则可以获得'null'
  348. if (bndle == null) {
  349. qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
  350. continue;
  351. }
  352. JobRunShell shell = null;
  353. try {
  354. //todo 创建 JobRunShell ,并初始化
  355. shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
  356. shell.initialize(qs);
  357. } catch (SchedulerException se) {
  358. qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
  359. continue;
  360. }
  361. //todo 提交任务到 worker 线程.......
  362. if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
  363. // this case should never happen, as it is indicative of the
  364. // scheduler being shutdown or a bug in the thread pool or
  365. // a thread pool being used concurrently - which the docs
  366. // say not to do...
  367. //todo 这种情况不应该发生,因为它表示调度程序正在关闭或线程池或线程池中并发使用的错误 - 文档说不要这样做...
  368. getLog().error( "ThreadPool.runInThread() return false!");
  369. qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
  370. }
  371. }
  372. continue; // while (!halted)
  373. }
  374. } else { // if(availThreadCount > 0)
  375. // should never happen, if threadPool.blockForAvailableThreads() follows contract
  376. //todo 应该永远不会发生,如果threadPool.blockForAvailableThreads()遵循约定
  377. continue; // while (!halted)
  378. }
  379. long now = System.currentTimeMillis();
  380. long waitTime = now + getRandomizedIdleWaitTime();
  381. long timeUntilContinue = waitTime - now;
  382. //todo idleWaitTime == 30L * 1000L; idleWaitVariablness == 7 * 1000;
  383. // 计算getRandomizedIdleWaitTime()的值 : idleWaitTime - random.nextInt(idleWaitVariablness);
  384. synchronized(sigLock) {
  385. try {
  386. if(!halted.get()) {
  387. // QTZ-336 A job might have been completed in the mean time and we might have
  388. // missed the scheduled changed signal by not waiting for the notify() yet
  389. // Check that before waiting for too long in case this very job needs to be
  390. // scheduled very soon
  391. if (!isScheduleChanged()) {
  392. sigLock.wait(timeUntilContinue);
  393. }
  394. }
  395. } catch (InterruptedException ignore) {
  396. }
  397. }
  398. } catch(RuntimeException re) {
  399. getLog().error( "Runtime error occurred in main trigger firing loop.", re);
  400. }
  401. } // while (!halted)
  402. // drop references to scheduler stuff to aid garbage collection...
  403. //todo 删除对调度程序内容的引用以帮助垃圾回收...
  404. qs = null;
  405. qsRsrcs = null;
  406. }
  407. private static final long MIN_DELAY = 20;
  408. private static final long MAX_DELAY = 600000;
  409. private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) {
  410. long delay;
  411. try {
  412. delay = jobStore.getAcquireRetryDelay(acquiresFailed);
  413. } catch (Exception ignored) {
  414. // we're trying to be useful in case of error states, not cause
  415. // additional errors..
  416. delay = 100;
  417. }
  418. // sanity check per getAcquireRetryDelay specification
  419. if (delay < MIN_DELAY)
  420. delay = MIN_DELAY;
  421. if (delay > MAX_DELAY)
  422. delay = MAX_DELAY;
  423. return delay;
  424. }
  425. private boolean releaseIfScheduleChangedSignificantly(
  426. List<OperableTrigger> triggers, long triggerTime) {
  427. if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) {
  428. // above call does a clearSignaledSchedulingChange()
  429. for (OperableTrigger trigger : triggers) {
  430. qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger);
  431. }
  432. triggers.clear();
  433. return true;
  434. }
  435. return false;
  436. }
  437. private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) {
  438. // So here's the deal: We know due to being signaled that 'the schedule'
  439. // has changed. We may know (if getSignaledNextFireTime() != 0) the
  440. // new earliest fire time. We may not (in which case we will assume
  441. // that the new time is earlier than the trigger we have acquired).
  442. // In either case, we only want to abandon our acquired trigger and
  443. // go looking for a new one if "it's worth it". It's only worth it if
  444. // the time cost incurred to abandon the trigger and acquire a new one
  445. // is less than the time until the currently acquired trigger will fire,
  446. // otherwise we're just "thrashing" the job store (e.g. database).
  447. //
  448. // So the question becomes when is it "worth it"? This will depend on
  449. // the job store implementation (and of course the particular database
  450. // or whatever behind it). Ideally we would depend on the job store
  451. // implementation to tell us the amount of time in which it "thinks"
  452. // it can abandon the acquired trigger and acquire a new one. However
  453. // we have no current facility for having it tell us that, so we make
  454. // a somewhat educated but arbitrary guess ;-).
  455. synchronized(sigLock) {
  456. if (!isScheduleChanged())
  457. return false;
  458. boolean earlier = false;
  459. if(getSignaledNextFireTime() == 0)
  460. earlier = true;
  461. else if(getSignaledNextFireTime() < oldTime )
  462. earlier = true;
  463. if(earlier) {
  464. // so the new time is considered earlier, but is it enough earlier?
  465. long diff = oldTime - System.currentTimeMillis();
  466. if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L))
  467. earlier = false;
  468. }
  469. if(clearSignal) {
  470. clearSignaledSchedulingChange();
  471. }
  472. return earlier;
  473. }
  474. }
  475. public Logger getLog() {
  476. return log;
  477. }
  478. } // end of QuartzSchedulerThread

 

 

 

 

 


转载:https://blog.csdn.net/zhanglong_4444/article/details/104322964
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场