`
mengqingyu
  • 浏览: 328635 次
  • 性别: Icon_minigender_1
  • 来自: 天津
社区版块
存档分类
最新评论

Quartz(三)原理及源码分析

 
阅读更多
quartz配置文件中可以通过以下两种配置读取方式
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore(从内存中读取定时任务)
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX(从数据库中读取定时任务)

以JobStoreTX为例,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在服务器启动时初始化Scheduler时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger。
Scheduler:接口定义了对定时任务主要操作方法。
Trigger:定时器定义了任务执行时间。
JobDetail:定时任务一个JobDetail可以配置多个Trigger,主要定义了定时执行的类,方法,参数等信息。


任务定时执行:在服务器启动以后,每隔几秒至几十秒就会轮询StdJDBCDelegate类中selectTriggerToAcquire方法,执行sql查询qrtz_triggers表中NEXT_FIRE_TIME(下次触发时间)字段符合当前时间段并且TRIGGER_STATE状态为WAITING(等待中),符合此条件的Trigger查询出来,关联jobDetail来得到配置参数,调用配置好的业务方法。
	ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
	
	// Try to give jdbc driver a hint to hopefully not pull over 
	// more than the few rows we actually need.
	ps.setFetchSize(5);
	ps.setMaxRows(5);
	
	ps.setString(1, STATE_WAITING);
	ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
	ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
	rs = ps.executeQuery();
	
	while (rs.next() && nextTriggers.size() < 5) {
		nextTriggers.add(new Key(
				rs.getString(COL_TRIGGER_NAME),
				rs.getString(COL_TRIGGER_GROUP)));
	}
	
	return nextTriggers;


TRIGGER_STATE的更新:每次查找到匹配的Trigger,执行调度任务的同时,通过StdJDBCDelegate类的updateTriggerStateFromOtherState方法来更新TRIGGER_STATE状态为ACQUIRED(运行中),在执行完定时调度的业务方法时,将TRIGGER_STATE状态为WAITING(等待中)。
	ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATE_FROM_STATES));
	ps.setString(1, newState);
	ps.setString(2, triggerName);
	ps.setString(3, groupName);
	ps.setString(4, oldState1);
	ps.setString(5, oldState2);
	ps.setString(6, oldState3);

	return ps.executeUpdate();

NEXT_FIRE_TIME的更新:在暂停、恢复任务状态或是每次时间轮询时都会判断NEXT_FIRE_TIME值是否小于当前时间,如果小于当前时间将永远不会执行,所以quartz会自动通过表达式来计算,参照当前时间来计算下次执行时间,执行updateTrigger方法更新NEXT_FIRE_TIME,或是每次查找到匹配的Trigger,执行调度任务的同时也会更新NEXT_FIRE_TIME下次运行时间。
    if(updateJobData) {
		ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
	} else {
		ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
	}
		
	ps.setString(1, trigger.getJobName());
	ps.setString(2, trigger.getJobGroup());
	setBoolean(ps, 3, trigger.isVolatile());
	ps.setString(4, trigger.getDescription());
	long nextFireTime = -1;
	if (trigger.getNextFireTime() != null) {
		nextFireTime = trigger.getNextFireTime().getTime();
	}
	ps.setBigDecimal(5, new BigDecimal(String.valueOf(nextFireTime)));
	long prevFireTime = -1;
	if (trigger.getPreviousFireTime() != null) {
		prevFireTime = trigger.getPreviousFireTime().getTime();
	}
	ps.setBigDecimal(6, new BigDecimal(String.valueOf(prevFireTime)));
	ps.setString(7, state);
	if (trigger instanceof SimpleTrigger && ((SimpleTrigger)trigger).hasAdditionalProperties() == false ) {
		//                updateSimpleTrigger(conn, (SimpleTrigger)trigger);
		ps.setString(8, TTYPE_SIMPLE);
	} else if (trigger instanceof CronTrigger && ((CronTrigger)trigger).hasAdditionalProperties() == false ) {
		//                updateCronTrigger(conn, (CronTrigger)trigger);
		ps.setString(8, TTYPE_CRON);
	} else {
		//                updateBlobTrigger(conn, trigger);
		ps.setString(8, TTYPE_BLOB);
	}
	ps.setBigDecimal(9, new BigDecimal(String.valueOf(trigger
			.getStartTime().getTime())));	
	//代码略......

注意:在调试和测试quartz时,在局域网下如果没有做服务器集群,多个人启动了多台服务器,连接到同一台数据库服务器时会出现问题,因为quartz是多线程的,通过数据表记录来获得执行的任务,当多台服务器异步更新和读取的时候就会造成数据读取不同步问题,而出现bug。解决办法就是,测试时自己单独使用一个数据库,或是修改配置文件让其他人通过RAMJobStore读取,这样就不会造成多台机器同时读取一个库的问题发生了。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics