并行模式之Master-Worker模式

并行模式之Master-Worker模式

一)、Master-Worker模式

作用: 将一个大任务分解成若干个小任务,分发给多个子线程执行。

注: 将大任务分解成小任务,小任务的实现逻辑要相同。

二)、Master-Worker模式的结构

Master-Worker的核心思想:由Master进程和Worker进程实现。

1)、Master进程:

接收和分配任务,整合最终的处理结果。

内部结构组成:

1.Worker进程队列

作用:执行Master分配的任务,开启多线程处理数据。

2.子任务队列

 作用:接收任务。

3.子结果集

 作用:整合多个线程的最终处理结果。

注:Master进程分配任务后会立即返回,不会等待系统全部处理完返回,处理过

     程是异步的,client不会出现等待状态。         

2)、Worker进程

实现Runable接口,定义一个handle()方法,在方法里实现小任务的业务逻辑,在run()方法中调用handle()方法

内部结构组成:

1.子任务队列

作用:接收Master分配的任务。

2.结果集队列

 作用:将处理的结果返给Master

3.handle()方法

作用:定义小任务的业务逻辑。

注:所有的Wroker对象共享Master的Worker队列,和结果集.

三)、Master-Worker的代码实现

使用Master-Worker模式实现 :

计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )

思路:将计算任务分解为100个小任务,每个子任务计算单独的立方和,最后将计

         算结果返回给master。

Master类:

/**
 * Master类:
 *   内部维护了一个Worker进程队列、任务队列、结果集
 */
public class Master {
    /**
     * 任务队列, ConcurrentLinkedDeque: 基于链表的并发队列
     */
    protected Queue workerQueue = new ConcurrentLinkedDeque();

    /**
     * Worker进程队列
     */
    Map<String, Thread> threadMap = new HashMap<>();

    /**
     * 结果集
     */
    Map<String, Object> resultMap = new HashMap<>();

    /**
     * Master的构造,需要一个进程对象和进程数量
     */
    public Master(Worker worker, int countWorker){
        //将master的workerQueue和resultMap和Worker关联
        worker.setWorkerQueue(workerQueue);
        worker.setResultMap(resultMap);
        //根据需要的进程数量,创建进程
        for(int i = 0; i < countWorker; i++){
            //Integer.toString(i)为线程的名字
            threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
        }
    }

    /**
     * master进程提交任务
     */
    public void submit(Object input){
        workerQueue.add(input);
    }

    /**
     * master分配任务,执行任务,开启多线程
     */
    public void execute(){
        for(Map.Entry<String,Thread> entry : threadMap.entrySet()){
            entry.getValue().start();

        }
    }

    /**
     * 判断是否所有的子任务都结束了
     */
    public boolean isComplete(){
        //判断worker进程的状态是否都等于终止状态
        for(Map.Entry<String,Thread> entry : threadMap.entrySet()){
            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    public Queue getWorkerQueue() {
        return workerQueue;
    }

    public void setWorkerQueue(Queue workerQueue) {
        this.workerQueue = workerQueue;
    }

    public Map<String, Thread> getThreadMap() {
        return threadMap;
    }

    public void setThreadMap(Map<String, Thread> threadMap) {
        this.threadMap = threadMap;
    }

    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
}

Worker类:实现Runable接口,定义handle()方法,在run()中调用handle()

/**
 * Worker类:
 *   小任务的逻辑实现类
 */
public class Worker implements Runnable{
    /**
     * 接收master的workerQueue,用于开启线程处理多个小任务
     */
    protected Queue workerQueue;

    /**
     * 接收master的resultMap,将各个小任务的处理结果返回给Master
     */
    protected Map<String, Object> resultMap;

    /**
     * 小线程的具体逻辑,接收任务并处理任务
     * 参数: input,分解的小任务
     */
    public Object handle(Object input){
        return input;
    }

    /**
     * 从任务列表中获取任务,调用handle()方法,执行任务
     */
    @Override
    public void run() {
        while(true){
            //获取任务列表的任务
            Object input = workerQueue.poll();
            //判断任务列表是否还有任务,若没有,则终止线程。
            if(input == null){
                break;
            }
            //有任务,调用handle()方法,执行任务
            Object result = handle(input);
            //将结果放在master的resultMap中
            resultMap.put(Integer.toString(input.hashCode()),result);
        }
    }

    public Queue getWorkerQueue() {
        return workerQueue;
    }

    public void setWorkerQueue(Queue workerQueue) {
        this.workerQueue = workerQueue;
    }

    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

}

PlushWorker类:继承了Worker类,重写handle()方法,定义任务的处理逻辑

**
 * 继承Worker类,定义handle()的业务逻辑
 */
public class PlushWorker extends Worker{
    /**
     * 对任务进行处理,在run()方法中调用该方法
     * @param input
     * @return
     */
    @Override
    public Object handle(Object input){
        int i = (Integer)input;
        return i * i * i;
    }
}

main类:使用master计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )

/**
 * 使用Master-Worker模式实现 :
 *    计算 1~100的立方和( 1^3 + 2^3 + ... + 100^3 )
 *    1.将计算任务分解为100个小任务,每个子任务计算单独的立方和,最后将计算结果返回给master
 */
public class CubicAndCalculate {
    public static void main(String[] args) {
        //创建Master对象,开启五个线程对任务进行处理
        Master master = new Master(new PlushWorker(), 5);
        //将大任务分解成100个小任务
        for(int i = 1; i <= 100; i++){
            master.submit(i);
        }
        //开启多线程,并发的处理多个小任务
        master.execute();
        //master可以不用等全部的任务执行完便可以获取任务的执行结果
        Map<String, Object> resultMap = master.getResultMap();
        Integer re = 0;
        //加入两个判断是因为,每取出一个resultMap中的数据,都要对该数据删除,若消费resultMap的速度比生成的快,可通过判断任务线程是否执行完毕来控制数据的获取
        while(resultMap.size() > 0 || !master.isComplete()){
            //获取map的所有键
            Set<String> set = resultMap.keySet();
            String key = null;
            for(String k : set){
                key = k;
                break;
            }
            Integer i = null;
            //判断map中是否有数据,若key值为null,就不进行获取值操作
            if(key != null){
                i = (Integer) resultMap.get(key);
            }
            //如果key存在,i !=null,则对返回的部分的任务的立方和进行累加,并移除resultMap中对应的项
            if(i != null){
                re += i;
                resultMap.remove(key);
            }
        }
        System.out.println(re);
    }
}

结果:

25502500
金麟岂能忍一世平凡 飞上了青天 天下还依然
原文地址:https://www.cnblogs.com/Auge/p/11692725.html