并发模型之Master-Worker设计模式

一、Master-Worker设计模式

Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果返回给Master进程,由Master进行归纳和汇总,从而得到系统结果。

Master-Worker模式的好处是,它能将大任务分解成若干个小任务,并发执行,从而提高系统性能。而对于系统请求者Client来说,任务一旦提交,Master进程就会立刻分配任务并立即返回,并不会等系统处理完全部任务再返回,其处理过程是异步的。

二、Master-Worker设计模式代码实现

1、创建Task任务对象

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 /**
 4  * Created by Root on 5/12/2017.
 5  */
 6 public class Task {
 7 
 8     private int id;
 9 
10     private String name;
11 
12     private int price;
13 
14     public int getId() {
15         return id;
16     }
17 
18     public void setId(int id) {
19         this.id = id;
20     }
21 
22     public String getName() {
23         return name;
24     }
25 
26     public void setName(String name) {
27         this.name = name;
28     }
29 
30     public int getPrice() {
31         return price;
32     }
33 
34     public void setPrice(int price) {
35         this.price = price;
36     }
37 }

2、实现Worker对象

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5 
 6 /**
 7  * Created by Root on 5/12/2017.
 8  */
 9 public class Worker implements Runnable {
10 
11     private ConcurrentLinkedQueue<Task> workQueue;
12     private ConcurrentHashMap<String, Object> resultMap;
13 
14     public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
15         this.workQueue = workQueue;
16     }
17 
18     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
19         this.resultMap = resultMap;
20     }
21 
22     @Override
23     public void run() {
24         while (true) {
25             Task input = this.workQueue.poll();
26             if (input == null) {
27                 break;
28             }
29             // 真正的去做业务处理
30             //Object output = handle(input);
31             // 改造
32             Object output = MyWorker.handle(input);
33             // 返回处理结果集
34             this.resultMap.put(Integer.toString(input.getId()), output);
35         }
36     }
37 
38 //    private Object handle(Task input) {
39 //        Object output = null;
40 //        try {
41 //            // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
42 //            Thread.sleep(500);
43 //            output = input.getPrice();
44 //        } catch (InterruptedException e) {
45 //            e.printStackTrace();
46 //        }
47 //        return output;
48 //    }
49 
50     // 优化,考虑让继承类去自己实现具体的业务处理
51     public static Object handle(Task input) {
52         return null;
53     }
54 
55 }

3、为了使程序更灵活,将具体的业务执行逻辑抽离,在具体的Worker对象去实现,如这里的MyWorker对象

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 /**
 4  * Created by Root on 5/13/2017.
 5  */
 6 public class MyWorker extends Worker {
 7 
 8     public static Object handle(Task input) {
 9         Object output = null;
10         try {
11             // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
12             Thread.sleep(500);
13             output = input.getPrice();
14         } catch (InterruptedException e) {
15             e.printStackTrace();
16         }
17         return output;
18     }
19 
20 }

4、Master类

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.ConcurrentHashMap;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 
 8 /**
 9  * Created by Root on 5/12/2017.
10  */
11 public class Master {
12 
13     // 1、使用一个ConcurrentLinkedQueue集合来装载所有需要执行的任务
14     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
15 
16     // 2、使用HashMap来装载所有的worker对象
17     private HashMap<String, Thread> workers = new HashMap<String, Thread>();
18 
19     // 3、使用一个容器承装每一个worker并发执行任务的结果集
20     private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
21 
22     // 4、构造方法
23     public Master(Worker worker, int workerCount) {
24         // 每一个worker对象都需要有Master的引用,workQueue用于任务的领取,resultMap用于任务的提交
25         worker.setWorkerQueue(this.workQueue);
26         worker.setResultMap(this.resultMap);
27 
28         for (int i = 0; i < workerCount; i++) {
29             workers.put("子节点" + Integer.toString(i), new Thread(worker));
30         }
31     }
32 
33     // 5、提交方法
34     public void submit(Task task) {
35         this.workQueue.add(task);
36     }
37 
38     // 6、需要有一个执行的方法(启动应用程序,让所有的worker工作)
39     public void execute() {
40         for (Map.Entry<String, Thread> me : workers.entrySet()) {
41             me.getValue().start();
42         }
43     }
44 
45     // 7、判断线程是否执行完毕
46     public boolean isComplete() {
47         for (Map.Entry<String, Thread> me : workers.entrySet()) {
48             // 判断所有的线程状态是否属于已停止状态
49             if (me.getValue().getState() != Thread.State.TERMINATED) {
50                 return false;
51             }
52         }
53         return true;
54     }
55 
56     // 8、返回结果集数据
57     public int getResult() {
58         int ret = 0;
59         for (Map.Entry<String, Object> me : resultMap.entrySet()) {
60             // 汇总逻辑
61             ret += (Integer) me.getValue();
62         }
63         return ret;
64     }
65 
66 }

5、测试,具体调用实现

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.Random;
 4 
 5 /**
 6  * Created by Root on 5/13/2017.
 7  */
 8 public class MasterWorkerTest {
 9 
10     public static void main(String[] args) {
11 
12 //        Master master = new Master(new Worker(), 10);
13         // 改造
14 //        Master master = new Master(new MyWorker(), 10);
15         // 改造(获取当前机器可用线程数)
16         System.out.println("我的机器可用Processors数量:" + Runtime.getRuntime().availableProcessors());
17         Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
18 
19         Random r = new Random();
20         for (int i = 1; i <= 10; i++) {
21             Task t = new Task();
22             t.setId(i);
23             t.setName("任务" + i);
24             t.setPrice(r.nextInt(1000));
25             master.submit(t);
26         }
27         master.execute();
28 
29         long start = System.currentTimeMillis();
30 
31         while (true) {
32             if (master.isComplete()) {
33                 long end = System.currentTimeMillis() - start;
34                 int ret = master.getResult();
35                 System.out.println("最终的结果:" + ret + ",执行耗时:" + end);
36                 break;
37             }
38         }
39     }
40 
41 }

程序输出:

我的机器可用Processors数量:20
最终的结果:4473,执行耗时:500

从上面的运行结果来看,程序最终执行时间几乎就等于一个线程单独运行的时间,在此注意的是,同时执行的线程数是根据你执行此程序的机器配置决定的。

原文地址:https://www.cnblogs.com/Dylansuns/p/6847853.html