Spring Batch源码阅读-JobLauncher之SimpleJobLauncher(四)

启动job

spring batch针对JobLauncher只有一个实现就是SimpleJobLauncher具体可以看《Spring Batch源码阅读-初始化(三)》的实现

 1 @Controller
 2 public class OrderSyncJobController {
 3     @Autowired
 4     private ModularBatchConfiguration modularBatchConfiguration;
 5 
 6     /**
 7      * 启动任务,如果任务失败,再次调用则是重新执行
 8      * BatchAutoConfiguration 初始化
 9      */
10     @Autowired
11     private JobOperator jobOperator;
12 
13     @RequestMapping("/startOrderJob")
14     @ResponseBody
15     public String startOrderJob() throws Exception {
16         Map<String, JobParameter> parameters = new HashMap<>();
17         parameters.put("date", new JobParameter(13L));
18         JobParameters jobParameters = new JobParameters(parameters);
19         Job job = modularBatchConfiguration.jobRegistry().getJob("syncOrderJob");
20         //通过jobLauncher 启动job<1>
21         modularBatchConfiguration.jobLauncher().run(job, jobParameters);
22         return "ddd";
23     }
24 
25 }

SimpleJobLauncher

<1>run

 @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {

        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        final JobExecution jobExecution;
        //获取job的最后一个execution执行记录 一个job可以执行多次<2>
        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            //job是否支持多次启动默认是true 我们可以初始化job的时候动态加载
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
            /**
             * 获取最后一次执行Executing的 stepExecution信息
             */
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                BatchStatus status = execution.getStatus();
                //如果任意一个step正处于停止或者运行中则抛错
                if (status.isRunning() || status == BatchStatus.STOPPING) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + lastExecution);
                } else if (status == BatchStatus.UNKNOWN) {//最后一次step运行出现未知异常也报错
                    throw new JobRestartException(
                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                                    + "The last execution ended with a failure that could not be rolled back, "
                                    + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
            }
        }
        /**
         * 校验jobParameters 我们通过以下方式设置校验器
         * modularBatchConfiguration.jobBuilders()
         *                 .get("demoJob")
         *                 .validator(new JobParametersValidator() {
         *                     @Override
         *                     public void validate(JobParameters parameters) throws JobParametersInvalidException {
         *                         //执行校验逻辑 校验不通过则抛错
         *                     }
         *                 })
         */
        job.getJobParametersValidator().validate(jobParameters);
        //每次job 启动都会创建一个新的Executing 这里则是创建并持久化
        jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        try {
            //默认设置的是异步的处理器
            taskExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                                    + "]");
                        }
                        //调用对应job的execute
                        job.execute(jobExecution);
                        if (logger.isInfoEnabled()) {
                            Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime());
                            logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                                    + "] and the following status: [" + jobExecution.getStatus() + "]"
                                    + (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
                        }
                    }
                    catch (Throwable t) {
                        if (logger.isInfoEnabled()) {
                            logger.info("Job: [" + job
                                    + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                                    + "]", t);
                        }
                        rethrow(t);
                    }
                }

                private void rethrow(Throwable t) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException) t;
                    }
                    else if (t instanceof Error) {
                        throw (Error) t;
                    }
                    throw new IllegalStateException(t);
                }
            });
        }
        catch (TaskRejectedException e) {
            //根据job异常修改jobExecuting状态
            jobExecution.upgradeStatus(BatchStatus.FAILED);
            if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
                jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
            }
            jobRepository.update(jobExecution);
        }

        return jobExecution;
    }

 <2>

一般我们都是使用mysql 所以参考mysql的实现就行

org.springframework.batch.core.repository.support.SimpleJobRepository#getLastJobExecution

 @Nullable
    public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
        //根据job名字和jobParameter获取 jobInstance<3>
        JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
        if (jobInstance == null) {
            return null;
        }
        //获得最新一次JobExecution
        JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);

        if (jobExecution != null) {
            //如果有的话 则获取jobEexcution的执行纪律
            jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
            stepExecutionDao.addStepExecutions(jobExecution);
        }
        return jobExecution;

    }

<3> 

org.springframework.batch.core.repository.dao.JdbcJobInstanceDao#getJobInstance(java.lang.String, org.springframework.batch.core.JobParameters)

 @Override
    @Nullable
    public JobInstance getJobInstance(final String jobName,
                                      final JobParameters jobParameters) {

        Assert.notNull(jobName, "Job name must not be null.");
        Assert.notNull(jobParameters, "JobParameters must not be null.");

        //通过jobKeyGenerator 获取jobKey 我们可以自定义 默认是org.springframework.batch.core.DefaultJobKeyGenerator
        String jobKey = jobKeyGenerator.generateKey(jobParameters);

        RowMapper<JobInstance> rowMapper = new JdbcJobInstanceDao.JobInstanceRowMapper();

        List<JobInstance> instances;
        //根据key查询job 这也就是为什么我们一个job执行成功后 要换参数才能启动成功
        if (StringUtils.hasLength(jobKey)) {
            instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY),
                    rowMapper, jobName, jobKey);
        } else {
            instances = getJdbcTemplate().query(
                    getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName,
                    jobKey);
        }

        if (instances.isEmpty()) {
            return null;
        } else {
            Assert.state(instances.size() == 1, "instance count must be 1 but was " + instances.size());
            return instances.get(0);
        }
    }
原文地址:https://www.cnblogs.com/LQBlog/p/15440052.html