quartz在集群环境下的配置方案

转载:quartz在集群环境下的最终解决方案

最近项目中使用了spring+Quartz定时任务、但是项目最近要集群部署、多个APP下如何利用Quartz 协调处理任务。
      大家可以思考一下、现在有 A、B、C三个应用同时作为集群服务器对外统一提供服务、每个应用下各有一个Quartz、它们会按照既定的时间自动执行各自的任务。我们先不说实现什么功能,就说这样的架构其实有点像多线程。那多线程里就会存在“资源竞争”的问题,即可能产生脏读,脏写,由于三台 应用 里都有 Quartz,因此会存在重复处理 任务 的现象。
      解决方案一:只在一台 应用 上装 Quartz,其它两台不装,这样集群就没有意义了。
      解决方案二:使用其实Quartz自身可以实例化数据库的特性就可以解决问题
本方案优点:
1.     每台作为集群点的 应用上都可以布署 Quartz ;
2.     Quartz 的 TASK ( 12 张表)实例化如数据库,基于数据库引擎及 High-Available 的策略(集群的一种策略)自动协调每个节点的 QUARTZ ,当任一一节点的 QUARTZ 非正常关闭或出错时,另几个节点的 QUARTZ 会自动启动;
3.    无需开发人员更改原已经实现的 QUARTZ ,使用 SPRING+ 类反射的机制对原有程序作切面重构;
解决方案:
   1:去官网下载最新的 quartz 解压 在目录 docsdbTables 下就会找到 tables_mysql.sql  文件、建立数据库Quartz 并导入数据库。
     
  2:生成 quartz.properties 文件,把它放在工程的 src 目录下 修改配置文件如下:

  1. #==============================================================    
  2. #Configure Main Scheduler Properties    
  3. #==============================================================     
  4. org.quartz.scheduler.instanceName = quartzScheduler  
  5. org.quartz.scheduler.instanceId = AUTO  

  6. #==============================================================    
  7. #Configure ThreadPool    
  8. #==============================================================   
  9. org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool  
  10. org.quartz.threadPool.threadCount = 10  
  11. org.quartz.threadPool.threadPriority = 5  
  12. org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true 

  13. #==============================================================    
  14. #Configure JobStore    
  15. #==============================================================   
  16. org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX  
  17. org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate  
  18. org.quartz.jobStore.tablePrefix = QRTZ_  
  19. org.quartz.jobStore.isClustered = true  
  20. org.quartz.jobStore.clusterCheckinInterval = 20000    
  21. org.quartz.jobStore.dataSource = myDS  
  22.    
  23. #==============================================================    
  24. #Configure DataSource     (此处填你自己的数据库连接信息)
  25. #==============================================================   
  26. org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver  
  27. org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=UTF-8  
  28. org.quartz.dataSource.myDS.user = root  
  29. org.quartz.dataSource.myDS.password = 123  
  30. org.quartz.dataSource.myDS.maxConnections =30  
复制代码

3:重写 quartz 的 QuartzJobBean 类 
原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误,原因在于:
这个 MethodInvokingJobDetailFactoryBean 类中的 methodInvoking 方法,是不支持序列化的,因此在把 QUARTZ 的 TASK 序列化进入数据库时就会抛错。网上有说把 SPRING 源码拿来,修改一下这个方案,然后再打包成 SPRING.jar 发布,这些都是不好的方法,是不安全的。
必须根据 QuartzJobBean 来重写一个自己的类 。
BootstrapJob.java: 引导Job,通过Spring容器获取任务的Job,根据注入的targetJob,该Job必须实现Job2接口

  1. /**
  2. * 引导Job,通过Spring容器获取任务的Job,根据注入的targetJob,该Job必须实现Job2接口
  3. * @author zzp
  4. * @date 2014-7-7
  5. */
  6. public class BootstrapJob implements Serializable{

  7.         private String targetJob ; 
  8.         
  9.         public void executeInternal(ApplicationContext cxt) {
  10.                 Job2 job = (Job2)cxt.getBean(this.targetJob);
  11.                 job.executeInternal() ;
  12.         }

  13.         public String getTargetJob() {
  14.                 return targetJob;
  15.         }

  16.         public void setTargetJob(String targetJob) {
  17.                 this.targetJob = targetJob;
  18.         }
  19. }
复制代码
  1. /**
  2. * Quartz 与 Spring 集成时,自定义的Job可以拥有Spring的上下文,
  3. * 因此定义了该接口,自定义的Job需要实现该接口,并实现executeInternal的task,
  4. * 这样解决了Quartz 与Spring 在集群环境下,可以不需要序列化,
  5. * 只需要在executeInternal获取Spring 上下文中的target job bean.
  6. * 调用其相关的处理函数,来处理任务
  7. * @author zzp
  8. * @date 2014-7-7
  9. */
  10. public interface Job2 extends Serializable{

  11.         /**
  12.          * 处理任务的核心函数
  13.          * 
  14.          * @param cxt Spring 上下文
  15.          */
  16.         void executeInternal();

  17. }
复制代码

重写 MethodInvokingJobDetailFactoryBean类 方法如下:


  1. <p>public void execute(JobExecutionContext context) throws JobExecutionException
  2.                 {
  3.                         try
  4.                         {
  5.                                 logger.debug("start");
  6.                                 String targetClass = context.getMergedJobDataMap().getString("targetClass");
  7.                                 //logger.debug("targetClass is "+targetClass);
  8.                                 Class targetClassClass = null;
  9.                                 if(targetClass!=null)
  10.                                 {
  11.                                         targetClassClass = Class.forName(targetClass); // Could throw ClassNotFoundException
  12.                                 }
  13.                                 Object targetObject = context.getMergedJobDataMap().get("targetObject");
  14.                                 if(targetObject instanceof BootstrapJob){
  15.                                         //Job2 job = (Job2)targetObject;
  16.                                         //job.executeInternal(context.getScheduler().getContext().)
  17.                                         ApplicationContext ac = (ApplicationContext)context.getScheduler().getContext().get("applicationContext");
  18.                                         BootstrapJob target = (BootstrapJob)targetObject ;
  19.                                         target.executeInternal(ac);
  20.                                 }else{
  21.                                         //logger.debug("targetObject is "+targetObject);
  22.                                         String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
  23.                                         //logger.debug("targetMethod is "+targetMethod);
  24.                                         String staticMethod = context.getMergedJobDataMap().getString("staticMethod");
  25.                                         //logger.debug("staticMethod is "+staticMethod);
  26.                                         Object[] arguments = (Object[])context.getMergedJobDataMap().get("arguments");
  27.                                         //logger.debug("arguments are "+arguments);
  28.                                         
  29.                                         //logger.debug("creating MethodInvoker");
  30.                                         MethodInvoker methodInvoker = new MethodInvoker();
  31.                                         methodInvoker.setTargetClass(targetClassClass);
  32.                                         methodInvoker.setTargetObject(targetObject);
  33.                                         methodInvoker.setTargetMethod(targetMethod);
  34.                                         methodInvoker.setStaticMethod(staticMethod);
  35.                                         methodInvoker.setArguments(arguments);
  36.                                         methodInvoker.prepare();
  37.                                         //logger.info("Invoking: "+methodInvoker.getPreparedMethod().toGenericString());
  38.                                         methodInvoker.invoke();
  39.                                 }
  40.                         }
  41.                         catch(Exception e)
  42.                         {
  43.                                 throw new JobExecutionException(e);
  44.                         }
  45.                         finally
  46.                         {
  47.                                 logger.debug("end");
  48.                         }
  49.                 }
  50.         }</p><p>
  51. </p>
复制代码

QuartzDeleteQueAction 任务类、一定要实现接口job2、只做参考。

  1. public class QuartzDeleteQueAction implements Job2 {
  2.         private static final long serialVersionUID = 1L;
  3.         private IQuesGroupService quesGroupService;
  4.         public void executeInternal(){     
  5.              LogUtil.jobInfo("Quartz的任务调度执行删除教研组试题html文件开始");
  6.              try {
  7.                      ServletContext context = ContextLoader.getCurrentWebApplicationContext().getServletContext();
  8.                  String pathHtml = context.getRealPath(Constants.PATH_HTML);
  9.                      //获取被删除试题No
  10.                      List<Object> list =  quesGroupService.queryDeleteQues();
  11.                      for(Object obj:list){
  12.                              String quesName = pathHtml+"ques_"+obj.toString()+".html";
  13.                              FileUtil.delFile(quesName);//删除无用html文件
  14.                      }
  15.                 } catch (Exception e) {
  16.                         e.printStackTrace();
  17.                 }
  18.             LogUtil.jobInfo("Quartz的任务调度执行删除教研组试题html文件结束");
  19.     }
  20.     public IQuesGroupService getQuesGroupService() {
  21.             return quesGroupService;
  22.     }
  23.     public void setQuesGroupService(IQuesGroupService quesGroupService) {
  24.             this.quesGroupService = quesGroupService;
  25.     }
  26. }
复制代码



4:配置 applicationContext-job.xml:


  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4.         xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  6.             http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  7.             http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  8.             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" default-autowire="byName" default-lazy-init="true">
  9.         
  10.          <!-- 要调用的工作类 -->
  11.      
  12.      <bean id="quartzJob" class="com.web.action.QuartzDeleteQueAction"></bean>
  13.      <!-- 引导Job -->
  14.          <bean id="bootstrapJob" class="com.acts.web.quartz.BootstrapJob">
  15.                 <property name="targetJob" value="quartzJob" />
  16.          </bean>
  17.           
  18.          <!-- 重写方法 -->
  19.          <bean id="jobTask" class="com.acts.web.quartz.MethodInvokingJobDetailFactoryBean">
  20.                 <property name="concurrent" value="true" />
  21.                 <property name="targetObject" ref="bootstrapJob" />
  22.          </bean>
  23.          
  24.      <!-- 定义触发时间 -->
  25.      
  26.      <bean id="doTime" class="org.springframework.scheduling.quartz.CronTriggerBean">
  27.             <property name="jobDetail">
  28.                 <ref bean="jobTask"/>
  29.             </property>
  30.             <!-- cron表达式 -->
  31.             <property name="cronExpression">
  32.                 <!--5点到20点 每10分钟一次调度 -->
  33.                 <value>0 0/10 5-20 * * ?</value>
  34.             </property>
  35.      </bean>
  36.         <!-- 总管理类 如果将lazy-init='false'那么容器启动就会执行调度程序 -->
  37.         <bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
  38.             <property name="configLocation" value="classpath:quartz.properties" /> 
  39.             <property name="dataSource" ref="dataSourceQuartz" />
  40.             <property name="triggers">
  41.                 <list>
  42.                     <ref bean="doTime"/>
  43.                     
  44.                 </list>
  45.             </property>
  46.             <!-- 就是下面这句,因为该 bean 只能使用类反射来重构 -->
  47.             <property name="applicationContextSchedulerContextKey" value="applicationContext" /> 
  48.       </bean>
  49. </beans>
复制代码

      此时手动 停掉那台运行 QUARTZ 过了 10分钟左右,另一个节点的 quartz 自动监测到了集群中运行着的 quartz 的 instance 已经 停掉 ,因此 quartz 集群会自动把任一台可用的 APP上启动起一个 quartz job 的任务。
     至此 Quartz使用 集群策略已经ok,不用改原有代码,配置一下我们就可做到 Quartz的集群与自动错误冗余。

所需jar包:quartz-all-1.6.6.jar   spring.jar  mysql-connector-java-3.1.11-bin.jar  commons-pool-1.3.jar  commons-logging-1.0.4.jar  commons-dbcp-1.2.1.jar

原文地址:https://www.cnblogs.com/kongweiteng/p/6733017.html