Quartz Scheduler调度流程分析

date: 2019-08-31

Demo

QuartzSample.java

public class QuartzSample {
    public static void main(String[] args) throws SchedulerException {
        StdSchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.start();

        JobDetail job = JobBuilder
                .newJob(SampleJob.class)
                .withIdentity("job01", "group01")
                .build();

        String cron = "0 00 10 * * ?";
        CronTrigger cronTrigger = TriggerBuilder
                .newTrigger()
                .withIdentity("cronTrigger")
                .forJob("job01", "group01")
                .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                .startNow()
                .build();

        scheduler.scheduleJob(job, cronTrigger);
    }
}

SampleJob.java

public class SampleJob implements Job {

    public void execute(JobExecutionContext jobExecutionContext) {
        System.out.println("hello quartz!");
    }
}

其中SampleJob是用户自定义的Job类,用于处理业务逻辑。
整个调度过程都是围绕Scheduler类进行的,关键的三个语句分别如下:

  • 1.Scheduler scheduler = sf.getScheduler();
  • 2.scheduler.scheduleJob(job, cronTrigger);
  • 3.scheduler.start();

时序图如下:
时序图

创建调度器

StdSchedulerFactory.getScheduler()源码:

public Scheduler getScheduler() throws SchedulerException {
    // 读取quartz配置文件,未指定则顺序遍历各个path下的quartz.properties文件
    // 解析出quartz配置内容和环境变量,存入PropertiesParser对象
    // PropertiesParser组合了Properties(继承Hashtable),定义了一系列对Properties的操作方法,比如getPropertyGroup()批量获取相同前缀的配置。配置内容和环境变量存放在Properties成员变量中
    if (cfg == null) {
        initialize();
    }

     // 获取调度器池,采用了单例模式
    // 其实,调度器池的核心变量就是一个hashmap,每个元素key是scheduler名,value是scheduler实例
    // getInstance()用synchronized防止并发创建
    SchedulerRepository schedRep = SchedulerRepository.getInstance();

    // 从调度器池中取出当前配置所用的调度器
    Scheduler sched = schedRep.lookup(getSchedulerName());

    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }

    // 如果调度器池中没有当前配置的调度器,则实例化一个调度器,主要动作包括:
    // 1)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。先class load线程池类,接着动态生成线程池实例bean,然后通过反射,使用setXXX()方法将以org.quartz.threadPool开头的配置内容赋值给bean成员变量;
    // 2)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。先class load任务存储类,接着动态生成实例bean,然后通过反射,使用setXXX()方法将以org.quartz.jobStore开头的配置内容赋值给bean成员变量;
    // 3)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。jobStore要指定为JDBCJobStore,dataSource才会有效;
    // 4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
    // 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
    // 6)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
    // 7)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;
    // 8)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
    sched = instantiate();

    return sched;
}

下面从instantiate方法中抽出一些关键代码进行分析:

ThreadPool tp = null;
QuartzScheduler qs = null;
tp.initialize();
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
Scheduler scheduler = instantiate(rsrcs, qs);
return scheduler;
  • 1.创建工作线程
    SimpleThreadPool.initialize()关键源码:
...
// create the worker threads and start them
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
	WorkerThread wt = workerThreads.next();
	wt.start();
	availWorkers.add(wt);
}

创建若干个Worker线程,并且调用线程的start方法启动各个线程。

  • 2.创建调度器线程
    QuartzScheduler.QuartzScheduler(...):
this.schedThread = new QuartzSchedulerThread(this, resources);

创建QuartzSchedulerThread实例。

QuartzSchedulerThread.QuartzSchedulerThread(...):

...
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);

初始化paused和halted变量,用于控制该线程运行(run方法)。

  • 3.创建调度器
StdSchedulerFactory.instantiate():
Scheduler scheduler = new StdScheduler(qs);
return scheduler;

创建StdScheduler实例并返回。

调度器绑定任务和触发器

QuartzScheduler.scheduleJob(JobDetail jobDetail,Trigger trigger):

public Date scheduleJob(JobDetail jobDetail,
		Trigger trigger) throws SchedulerException {
	// 检查调度器是否开启,如果关闭则throw异常到上层
	validateState();
	...
	// 获取trigger首次触发job的时间,以此时间为起点,每隔一段指定的时间触发job
	Date ft = trig.computeFirstFireTime(cal);
	...
	// 把job和trigger注册进调度器的jobStore
	resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
	// 通知job监听者
	notifySchedulerListenersJobAdded(jobDetail);
	// 通知调度器线程
	notifySchedulerThread(trigger.getNextFireTime().getTime());
	// 通知trigger监听者
	notifySchedulerListenersSchduled(trigger);

	return ft;
}

调度器开始调度任务

QuartzScheduler.start():

public void start() throws SchedulerException {
	...
	// 这句最关键,通过变量使调度器线程跳出一个无限循环,开始轮询所有trigger触发job
	schedThread.togglePause(false);
	...
}

QuartzSchedulerThread.togglePause(boolean pause):

void togglePause(boolean pause) {
    synchronized (sigLock) {
        paused = pause;

        if (paused) {
            signalSchedulingChange(0);
        } else {
            sigLock.notifyAll();
        }
    }
}

QuartzSchedulerThread.run():

// 调度器线程一旦启动,将一直运行此方法
@Override
public void run() {
	...
    // while()无限循环,每次循环取出时间将到的trigger,触发对应的job,直到调度器线程被关闭
    // halted是一个AtomicBoolean类变量,有个volatile int变量value,其get()方法仅仅简单的一句return value != 0,get()返回结果表示调度器线程是否开关
    // volatile修饰的变量,存取必须走内存,不能通过cpu缓存,这样一来get总能获得set的最新真实值,因此volatile变量适合用来存放简单的状态信息
    // 顾名思义,AtomicBoolean要解决原子性问题,但volatile并不能保证原子性,详见http://blog.csdn.net/wxwzy738/article/details/43238089
    while (!halted.get()) {
		// check if we're supposed to pause...
		// sigLock是个Object对象,被用于加锁同步
		// 需要用到wait(),必须加到synchronized块内
		synchronized (sigLock) {
			while (paused && !halted.get()) {
				try {
					// wait until togglePause(false) is called...
					// 这里会不断循环等待,直到QuartzScheduler.start()调用了togglePause(false)
					// 调用wait(),调度器线程进入休眠状态,同时sigLock锁被释放
					// togglePause(false)获得sigLock锁,将paused置为false,使调度器线程能够退出此循环,同时执行sigLock.notifyAll()唤醒调度器线程
					sigLock.wait(1000L);
				} catch (InterruptedException ignore) {
				}

				// reset failure counter when paused, so that we don't
				// wait again after unpausing
				acquiresFailed = 0;
			}

			if (halted.get()) {
				break;
			}
		}

		...
		int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
		if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

			...
			// 获取马上到时间的trigger
			// 允许取出的trigger个数不能超过一个阀值,这个阀值是线程池个数与org.quartz.scheduler.batchTriggerAcquisitionMaxCount配置值间的最小者

			// 调度器在trigger队列中寻找30秒内一定数目的trigger(需要保证集群节点的系统时间一致)
			triggers = qsRsrcs.getJobStore().acquireNextTriggers(
					now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
			...

			if (triggers != null && !triggers.isEmpty()) {
				...

				// 触发trigger
				List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
				if(res != null)
					bndles = res;

				for (int i = 0; i < bndles.size(); i++) {
					...
					JobRunShell shell = null;
					shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
					shell.initialize(qs);
					...

					// 执行与trigger绑定的job
					// shell是JobRunShell对象,实现了Runnable接口
					// SimpleThreadPool.runInThread(Runnable)从线程池空闲列表中取出一个工作线程
					// 工作线程执行WorkerThread.run(Runnable),详见下方WorkerThread的讲解
					if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
						...
					}
				}

				continue; // while (!halted)
			}
		} else { continue;}
		...

    } // while (!halted)
	...
}

SimpleThreadPool.runInThread(Runnable runnable):

if (!isShutdown) {
    WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
    busyWorkers.add(wt);
    wt.run(runnable);
} else {
    ...
}

从availWorkers中取出一个工作线程执行其run(Runnable newRunnable)方法。

WorkerThread.run(Runnable newRunnable):

public void run(Runnable newRunnable) {
    synchronized(lock) {
        if(runnable != null) {
            throw new IllegalStateException("Already running a Runnable!");
        }

        runnable = newRunnable;
        lock.notifyAll();
    }
}

WorkerThread.run():

@Override
public void run() {
	boolean ran = false;
	
	while (run.get()) {
		try {
			synchronized(lock) {
				while (runnable == null && run.get()) {
					lock.wait(500);
				}

				if (runnable != null) {
					ran = true;
					runnable.run();
				}
			}
		} 
		...
	}
}

此时执行的是JobRunShell中的run方法。

JobRunShell.run():

public void run() {
	Job job = jec.getJobInstance();
	...
	job.execute(jec);
	...
}

至此,已经形成了一个完整的执行过程。

总的来说,核心代码就是在QuartzSchedulerThread.run()方法while循环中调用sigLock.wait(),等待可以跳出while循环的条件成立,当条件成立时,立马调度sigLock.notifyAll()使线程跳出while。通过这样的代码,可以实现调度器线程等待启动、工作线程等待job等功能。


参考:

原文地址:https://www.cnblogs.com/cloudflow/p/13894300.html