java多线程系列14 设计模式 Master-Worker

Master-Worker模式是常用的并行设计模式可以将大任务划分为小任务,是一种分而治之的设计理念

系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理任务  。子进程处理完成以后,会把结果返回给Master

原理图如下

 

下面代码演示 

public abstract class Worker implements Runnable {
	// 任务队列,用于取得子任务
	protected Queue<Object> workQueue;
	// 子任务处理结果集
	protected Map<String, Object> resultMap;

	public void setWorkQueue(Queue<Object> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(Map<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	@Override
	public void run() {
		while (true) {
			Object object = workQueue.poll();  //因为当空的时候 会返回空 
		    if(object==null){  
                break;  
            }  
			Object rtn = handle(object);
			resultMap.put(rtn.hashCode() + "", rtn);
		}

	}
    //具体的执行逻辑 交给子类
	public abstract Object handle(Object input) ;

}

  

public class PlusWorker extends Worker {
	   @Override  
	    public Object handle(Object input) {  
	        Integer i =(Integer)input;  
	        return i;  
	    }  
}

  

public class Master {

	// 存放提交的任务
	Queue<Object> planQueue = new ConcurrentLinkedQueue<>();
	// 处理任务的线程
	Map<String, Thread> threads = new HashMap<String, Thread>();
	// 存储每个任务的完成结果
	private Map<String, Object> result = new ConcurrentHashMap<String, Object>();

	public Master(Worker w, int workerNum) {
		if (workerNum <= 0) {
			throw new RuntimeException("workerNum 不能《=0");
		}
		w.setResultMap(result);
		w.setWorkQueue(planQueue);
		for (int i = 0; i < workerNum; i++) {
			threads.put(i + "", new Thread(w, i + ""));
		}
	}

	// 添加任务的方法
	public void submit(Object obj) {
		planQueue.add(obj);
	}

	public void execute() {

		for (Map.Entry<String, Thread> entry : threads.entrySet()) {
			entry.getValue().start();
		}
	}

	public Map<String, Object> getResMap() {
		return result;
	}

	// 判断所有工作线程是否执行完毕
	public boolean isComplete() {
		for (Map.Entry<String, Thread> entry : threads.entrySet()) {
			if (entry.getValue().getState() != Thread.State.TERMINATED) {
				return false;
			}
		}
		return true;
	}

	// 返回结果集
	public Object getRes() {
		Integer re = 0;
		Map<String, Object> resultMap = result;
		for (String string : resultMap.keySet()) {
			Integer ob = (Integer) resultMap.get(string);
			re = ob + re;
		}
		return re;
	}
}

  测试类

     

public class Main {
public static void main(String[] args) {
//	long now = System.currentTimeMillis();
	Master master = new Master(new PlusWorker(), 1);
	for (int i = 1; i <= 1000; i++) {
		master.submit(i);
	}

	master.execute();
     //保存最终结算结果 
     while(true)
     {
    	 if(master.isComplete()){
    		 System.out.println(master.getRes());
    		 break;
    	 }
    		 
     }
	
}
}

  

注意::::重量级的工作配合多核cpu效果不错,轻量级的程序单线程工作效果不一定比并发的差.

原文地址:https://www.cnblogs.com/javabigdata/p/7010278.html