架构师养成记--10.master-worker模式

  master-worker模式是一种并行计算模式,分为master进程和worker进程两个部分,master是担任总管角色,worker才是执行具体任务的地方。

  总体流程应该是这样的:

具体一点,代码实现流程应该是这样的:

client:

import java.util.Random;

public class Main {

    public static void main(String[] args) {
        
        Master master = new Master(new Worker(), 20);//并发数20,也就是有20个Worker在工作
        
        Random r = new Random();
        for(int i = 1; i <= 100; i++){//总共有100个任务
            Task t = new Task();
            t.setId(i);
            t.setPrice(r.nextInt(1000));
            master.submit(t);//提交任务,向WorkerQueue<Task> 中加入元素
        }
        master.execute();//启动所有的worker
        long start = System.currentTimeMillis();
        
        while(true){
            if(master.isComplete()){//100个任务执行完成
                long end = System.currentTimeMillis() - start;
                int priceResult = master.getResult();//获取所有任务的执行结果
                System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
                break;
            }
        }
        
    }
}

 Master:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

    //1 有一个盛放任务的容器
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
    
    //2 需要有一个盛放worker的集合
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();
    
    //3 需要有一个盛放每一个worker执行任务的结果集合
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
    
    //4 构造方法
    public Master(Worker worker , int workerCount){
        worker.setWorkQueue(this.workQueue);
        worker.setResultMap(this.resultMap);
        
        for(int i = 0; i < workerCount; i ++){
            this.workers.put(Integer.toString(i), new Thread(worker));
        }
        
    }
    
    //5 需要一个提交任务的方法
    public void submit(Task task){
        this.workQueue.add(task);
    }
    
    //6 需要有一个执行的方法,启动所有的worker方法去执行任务
    public void execute(){
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            me.getValue().start();
        }
    }

    //7 判断是否运行结束的方法
    public boolean isComplete() {
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            if(me.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }        
        return true;
    }

    //8 计算结果方法
    public int getResult() {
        int priceResult = 0;
        for(Map.Entry<String, Object> me : resultMap.entrySet()){
            priceResult += (Integer)me.getValue();
        }
        return priceResult;
    }

Worker:

 1 import java.util.concurrent.ConcurrentHashMap;
 2 import java.util.concurrent.ConcurrentLinkedQueue;
 3 
 4 public class Worker implements Runnable {
 5 
 6     private ConcurrentLinkedQueue<Task> workQueue;
 7     private ConcurrentHashMap<String, Object> resultMap;
 8     
 9     public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
10         this.workQueue = workQueue;
11     }
12 
13     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
14         this.resultMap = resultMap;
15     }
16     
17     @Override
18     public void run() {
19         while(true){
20             Task input = this.workQueue.poll();
21             if(input == null) break;
22             Object output = handle(input);
23             this.resultMap.put(Integer.toString(input.getId()), output);
24         }
25     }
26 
27     private Object handle(Task input) {
28         Object output = null;
29         try {
30             //处理任务的耗时。。 比如说进行操作数据库。。。
31             Thread.sleep(500);
32             output = input.getPrice();
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         }
36         return output;
37     }
38 
39 
40 
41 }

Task:

public class Task {

    private int id;
    private int price ;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    } 
    
}
原文地址:https://www.cnblogs.com/sigm/p/6232561.html