大鹏任务调度

大鹏任务调度

1.原理

    利用反射查找被注解标注的任务类,并通过 Quartz 提交和调度执行这些任务.

2.使用

    注意:实际使用时需要定义thrift文件生成接口或特质
    若想让任务只在一个实例中执行,可以用
    MasterHelper.isMaster(String serviceName, String versionName)
    判断当前实例是否为 master.
    
    /**
     * Trait declaration: the service contract whose handle() method is the task entry point.
     */
    trait SyncGoodsTaskService {
        def handle(): Unit
    }
    
    /**
     * Annotate the class with @ScheduledTask to mark this service as a scheduled task.
     */
    @ScheduledTask
    class SimpleSyncGoodsTaskService extends SyncGoodsTaskService {

        /**
         * Defines when the scheduled task runs, via the cron expression on @ScheduledTaskCron.
         */
        @ScheduledTaskCron(cron = "0 0 0 * * ?")
        override def handle(): Unit = {
            // Task logic goes here.
        }
    }


    /**
     * Service interface for the Java example; implementations supply the task body.
     */
    interface SomeTaskService {

        /** Task entry point implemented by the service class. */
        void dealBill();
    }
    
    /**
     * 标注此服务为任务处理服务.
     **/
    @ScheduledTask
    public class SimpleSomeTaskService extends SomeTaskService{
        
        /**
         * 定义 CRON 表达式.
         **/
        @ScheduledTaskCron(cron="0 30 0 0 0 0 ?")
        public void dealBill(){
            //...下载帐单
        }
    }

3.分析

    /**
     * Container plugin that registers annotated service methods as Quartz cron jobs.
     *
     * <p>On {@code start()} it walks every application in the container, keeps the services whose
     * interface class carries {@code @ScheduledTask}, and schedules one {@code ScheduledJob} per
     * method annotated with {@code @ScheduledTaskCron}. The plugin also registers itself as an
     * application listener so that the scheduler is stopped when an application is unregistered.
     */
    public class TaskSchedulePlugin implements AppListener, Plugin {

        private static final Logger LOGGER = LoggerFactory.getLogger("container.scheduled.task");

        private final Container container;

        /** Created and started lazily, the first time a job has to be scheduled. */
        private Scheduler scheduler = null;

        /**
         * Creates the plugin and registers it as an application listener on the container.
         *
         * @param container the container whose applications are scanned for scheduled tasks
         */
        public TaskSchedulePlugin(Container container) {
            this.container = container;
            container.registerAppListener(this);
        }

        /**
         * Only logs the event; nothing is scheduled at application registration time.
         */
        @Override
        public void appRegistered(AppEvent event) {
            LOGGER.warn(getClass().getSimpleName() + "::appRegistered, event[" + event.getSource() + "], do nothing here");
        }

        /**
         * Stops task scheduling when an application is unregistered.
         */
        @Override
        public void appUnRegistered(AppEvent event) {
            LOGGER.warn(getClass().getSimpleName() + "::appUnRegistered, event[" + event.getSource() + "]");
            stop();
        }

        /**
         * Registers all scheduled tasks: every service whose interface class is annotated with
         * {@code @ScheduledTask} is handed to {@link #runTask(ServiceInfo)}.
         */
        @Override
        public void start() {
            LOGGER.warn("Plugin::" + getClass().getSimpleName() + "::start");
            container.getApplications().forEach(application -> {
                List<ServiceInfo> serviceInfos = application.getServiceInfos().stream()
                        .filter(serviceInfo ->
                                serviceInfo.ifaceClass.isAnnotationPresent(ScheduledTask.class))
                        .collect(Collectors.toList());
                serviceInfos.forEach(this::runTask);
            });
        }

        /**
         * Shuts the Quartz scheduler down if one was created and it is not already shut down.
         *
         * <p>The check is {@code !isShutdown()} on purpose: {@code isStarted()} stays {@code true}
         * once {@code start()} has been called, so a condition based on standby/not-started would
         * skip the shutdown of a normally running scheduler and leave its threads alive.
         */
        @Override
        public void stop() {
            LOGGER.warn("Plugin::TaskSchedulePlugin stop");
            if (scheduler == null) {
                return;
            }
            try {
                if (!scheduler.isShutdown()) {
                    LOGGER.info(" start to shutdown scheduler: " + scheduler.getSchedulerName());
                    scheduler.shutdown();
                }
            } catch (SchedulerException e) {
                LOGGER.error(" Failed to shutdown scheduler: " + e.getMessage(), e);
            }
        }

        /**
         * Schedules one Quartz cron job for every {@code @ScheduledTaskCron} method of the service.
         *
         * <p>A failure on one method (bad cron expression, scheduler start-up or scheduling error)
         * is logged and that method is skipped; the remaining methods are still processed.
         *
         * @param serviceInfo the service whose interface methods are inspected
         */
        public void runTask(ServiceInfo serviceInfo) {
            Class<?> ifaceClass = serviceInfo.ifaceClass;

            Map<ProcessorKey, SoaServiceDefinition<?>> processorMap = ContainerFactory.getContainer().getServiceProcessors();

            List<Method> taskMethods = Arrays.stream(ifaceClass.getMethods())
                    .filter(method -> method.isAnnotationPresent(ScheduledTaskCron.class))
                    .collect(Collectors.toList());

            SoaServiceDefinition<?> soaServiceDefinition = processorMap.get(
                    new ProcessorKey(serviceInfo.serviceName, serviceInfo.version));

            if (soaServiceDefinition == null) {
                LOGGER.error(" SoaServiceDefinition Not found....serviceName: {}, version: {} ", serviceInfo.serviceName, serviceInfo.version);
                return;
            }

            taskMethods.forEach(method -> {
                String methodName = method.getName();
                String cronStr = method.getAnnotation(ScheduledTaskCron.class).cron();

                // Everything the job needs at fire time travels in the job data map.
                JobDataMap jobDataMap = new JobDataMap();
                jobDataMap.put("function", soaServiceDefinition.functions.get(methodName));
                jobDataMap.put("iface", soaServiceDefinition.iface);
                jobDataMap.put("serviceName", serviceInfo.serviceName);
                jobDataMap.put("versionName", serviceInfo.version);
                JobDetail job = JobBuilder.newJob(ScheduledJob.class)
                        .withIdentity(ifaceClass.getName() + ":" + methodName)
                        .setJobData(jobDataMap)
                        .build();

                // The trigger shares the job's name and is bound to it by key.
                CronTriggerImpl trigger = new CronTriggerImpl();
                trigger.setName(job.getKey().getName());
                trigger.setJobKey(job.getKey());
                try {
                    trigger.setCronExpression(cronStr);
                } catch (ParseException e) {
                    LOGGER.error("定时任务({}:{})Cron解析出错", ifaceClass.getName(), methodName);
                    LOGGER.error(e.getMessage(), e);
                    return;
                }

                if (!ensureSchedulerStarted()) {
                    return;
                }
                try {
                    scheduler.scheduleJob(job, trigger);
                } catch (SchedulerException e) {
                    LOGGER.error(" Failed to scheduleJob....job: " + job.getKey().getName() + ", reason:" + e.getMessage(),
                            e);
                    return;
                }
                LOGGER.info("添加定时任务({}:{})成功", ifaceClass.getName(), methodName);
            });
        }

        /**
         * Obtains the default Quartz scheduler and starts it the first time it is needed.
         *
         * @return {@code true} if a scheduler is available, {@code false} if obtaining or starting
         *         it failed (the failure is logged)
         */
        private boolean ensureSchedulerStarted() {
            if (scheduler != null) {
                return true;
            }
            try {
                scheduler = StdSchedulerFactory.getDefaultScheduler();
                scheduler.start();
                return true;
            } catch (SchedulerException e) {
                LOGGER.error("ScheduledTaskContainer启动失败");
                LOGGER.error(e.getMessage(), e);
                return false;
            }
        }
    }
原文地址:https://www.cnblogs.com/hhbk/p/9546647.html