forkJoinPool + mysql乐观锁实现定时任务唯一性

场景: 有一批爬虫任务,每个任务的执行耗时都很长,而且任务总量很大(约 200 多万个),因此必须保证每个任务不会被重复执行,否则会浪费大量时间。

ForkJoinPool: 每次定时任务触发时,先从数据库拉取一批任务,再交给 ForkJoinPool 把这批任务拆分成单个任务并行执行(每个任务由池中的一个工作线程执行),以充分利用机器资源、提高效率。

MySQL 乐观锁: 在单台机器内部已保证任务不会重复执行的基础上,再加入乐观锁来避免多台机器之间的重复执行: 任意一台机器上的线程在执行某个任务之前,先尝试获取该任务的锁,获取成功才执行,否则跳过。这样可以避免把时间浪费在重复执行上,大大提高效率。

定时任务代码如下:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * @Description Scheduled job that periodically pulls a batch of pending company social-info tasks from
 * {@link IRunnerService} and runs them in parallel on a short-lived {@link ForkJoinPool}.
 * @Date 2020/6/12 15:01
 * @Author dengxiaoyu
 */
@EnableScheduling
@Component
@Slf4j
@AllArgsConstructor
public class Task {
    private final IRunnerService runnerService;

    /**
     * Queries company social information.
     *
     * <p>Scheduled with a fixed delay of 30 seconds, measured from the end of the previous run. Each run
     * pulls a batch of tasks, executes them on a dedicated {@link ForkJoinPool} with parallelism 5, and
     * blocks (without spinning) until the pool has terminated, i.e. until every submitted and forked
     * subtask has finished.
     */
    @Scheduled(fixedDelay = 30 * 1000L)
    public void searchSocialInfo() {
        log.info("开始查询公司社交信息");
        // NOTE(review): arguments (1, 5) look like a status code and a batch size — confirm against IRunnerService.
        JSONArray taskArray = runnerService.pickNewTaskForExeStatus(1, 5);
        if (taskArray == null || taskArray.isEmpty()) {
            log.info("暂无拉取到任务");
            return;
        }
        List<TaskResponseVO> taskResponseVOS = JSONObject.parseArray(taskArray.toJSONString(), TaskResponseVO.class);
        log.info("本次拉取到的任务个数[{}]", taskResponseVOS.size());

        long start = System.currentTimeMillis();
        ForkJoinSocialSearchTask task = new ForkJoinSocialSearchTask(taskResponseVOS, runnerService);
        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        forkJoinPool.execute(task);
        // shutdown() only rejects new external submissions; the submitted task and everything it forks
        // still run, and the pool terminates once it is quiescent.
        log.info("执行shutdown");
        forkJoinPool.shutdown();
        try {
            // Block until all (sub)tasks are done instead of busy-spinning on isTerminated(),
            // which would burn a full CPU core for the entire (long) crawl.
            forkJoinPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("等待社交信息搜索任务结束时被中断", e);
            // Stop outstanding work and preserve the interrupt for the scheduler / container shutdown.
            forkJoinPool.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        log.info("所有的子线程都结束了!");
        log.info("线程池执行完毕");
        long end = System.currentTimeMillis();
        log.info("本次查询公司社交信息消耗的时间为{}毫秒", (end - start));
    }
}

ForkJoinSocialSearchTask 代码如下:

import lombok.extern.slf4j.Slf4j;


import java.util.List;
import java.util.concurrent.RecursiveAction;

/**
 * @Description 社交信息搜索
 * @Date 2020/6/15 9:45
 * @Author dengxiaoyu
 */
@Slf4j
public class ForkJoinSocialSearchTask extends RecursiveAction {
    /**
     * 最小阈值
     */
    private static final int MIN_THRESHOLD = 1;
    private List<TaskResponseVO> taskList;
    private IRunnerService runnerService;

    @Override
    protected void compute() {
        if (MIN_THRESHOLD == taskList.size()) {
            // 顺序执行该任务
            TaskResponseVO taskResponseVO = taskList.get(0);
            int taskId = taskResponseVO.getId();
            Integer lockSign = runnerService.acquireTaskLock(taskId);
            log.info("获取到任务锁,任务id{},获取到的锁标志{}", taskId, lockSign);
            if (lockSign != null && lockSign.equals(1)) {
                log.info("获取到任务锁,任务id{},开始执行任务", taskId);
                String officialWebsite = taskResponseVO.getOfficialWebsite();
                log.info("社交信息搜索开始执行任务,任务id{}, 公司官网{}", taskId, officialWebsite);
                long startTime = System.currentTimeMillis();
                runnerService.processSocialTask(officialWebsite, taskId);
                long endTime = System.currentTimeMillis();
                log.info("社交信息搜索任务执行完成,任务id{}, 耗时{}毫秒", taskId, endTime - startTime);
            } else {
                log.info("没有获取到任务锁,任务id{}", taskId);
            }
        } else {
            /**
             * 由于每个任务执行时间可能很长,将每个任务用一个线程单独执行,确保每个线程不会执行重复,
             * 比如,当taskList为10时,会拆分成10个线程执行
             */
            // 将任务分成两个子任务
            int size = taskList.size();
            // 取中间值
            int mid = size >> 1;
            // 递归调用本方法,拆分每个子任务
            ForkJoinSocialSearchTask left = new ForkJoinSocialSearchTask(taskList.subList(0, mid), runnerService);
            ForkJoinSocialSearchTask right = new ForkJoinSocialSearchTask(taskList.subList(mid, size - 1),
                    runnerService);
            left.fork();
            right.fork();
        }
    }

    public ForkJoinSocialSearchTask(List<TaskResponseVO> taskList, IRunnerService runnerService) {
        this.taskList = taskList;
        this.runnerService = runnerService;
    }

    public ForkJoinSocialSearchTask() {
    }
}
原文地址:https://www.cnblogs.com/kiwi-deng/p/13201685.html