Master-Worker集群计算demo

Task为要执行的任务实体类:

package com.viewhigh.mdop.bi.test;

/**
 * A unit of work for the master-worker demo: a numeric id plus a name.
 *
 * <p>Both setters return {@code this} so that a task can be configured in a
 * single chained expression, e.g. {@code new Task().setId(1).setName("a")}.
 *
 * <p>Created by zzq on 2017/5/11.
 */
public class Task {
    /** Identifier of the task; defaults to {@code 0}. */
    private int id;
    /** Name of the task; {@code null} until {@link #setName(String)} is called. */
    private String name;

    /**
     * Returns the identifier of this task.
     *
     * @return the task id
     */
    public int getId() {
        return id;
    }

    /**
     * Sets the identifier of this task.
     *
     * <p>Returns {@code this}, matching {@link #setName(String)}; callers that
     * ignore the return value keep working unchanged.
     *
     * @param id the new task id
     * @return this task, for chaining
     */
    public Task setId(int id) {
        this.id = id;
        return this;
    }

    /**
     * Returns the name of this task.
     *
     * @return the task name, or {@code null} if none has been set
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the name of this task.
     *
     * @param name the new task name
     * @return this task, for chaining
     */
    public Task setName(String name) {
        this.name = name;
        return this;
    }
}

Master 为分布式计算代理类,负责创建多个工作线程来处理任务并将结果汇总;内部维护任务队列、结果 Map 集合和线程 Map 集合:

package com.viewhigh.mdop.bi.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Coordinator of the master-worker demo.
 *
 * <p>Owns a task queue, a result map and a set of worker threads. The queue
 * and the result map are handed to the supplied {@link Worker}, and one thread
 * per available processor is created around that single worker instance, so
 * all threads share the same queue and the same result map.
 *
 * <p>Typical use: submit tasks, call {@link #execute()} once, poll
 * {@link #finish()} until it returns {@code true}, then read
 * {@link #getWorkerResult()}.
 *
 * <p>Created by zzq on 2017/5/11.
 */
public class Master {
    // Pending tasks, shared with the worker threads.
    private final Queue<Task> workerQueue = new LinkedBlockingQueue<Task>();

    // Worker threads, keyed by their creation index.
    private final Map<Integer, Thread> workerMap = new HashMap<Integer, Thread>();

    // Results written by the worker threads.
    private final Map<Integer, Object> workerResult = new ConcurrentHashMap<Integer, Object>();

    /**
     * Returns the live result map that the workers write into.
     *
     * @return the shared result map (not a copy)
     */
    public Map<Integer, Object> getWorkerResult() {
        return workerResult;
    }

    /**
     * Wires the given worker to this master's queue and result map and creates
     * one (not yet started) thread per available processor, each running that
     * same worker instance.
     *
     * @param worker the worker whose {@code run()} method processes the queue
     */
    public Master(Worker worker) {
        int currProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("当前机器最大线程数:" + currProcessors);

        worker.setWorkerQueue(workerQueue);
        worker.setWorkerResultMap(workerResult);

        for (int i = 0; i < currProcessors; i++) {
            workerMap.put(i, new Thread(worker));
        }
    }

    /**
     * Appends a single task to the task queue.
     *
     * @param task the task to enqueue
     */
    public void submitTask(Task task) {
        workerQueue.add(task);
    }

    /**
     * Appends every task of the list to the task queue, in list order.
     *
     * @param taskList the tasks to enqueue
     */
    public void submitTask(List<Task> taskList) {
        for (Task task : taskList) {
            workerQueue.add(task);
        }
    }

    /**
     * Starts all worker threads.
     *
     * <p>Must be called at most once: {@link Thread#start()} throws
     * {@link IllegalThreadStateException} when a thread is started a second
     * time.
     */
    public void execute() {
        for (Thread thread : workerMap.values()) {
            thread.start();
        }
    }

    /**
     * Reports whether every worker thread has terminated.
     *
     * <p>Threads that have not been started yet are not terminated, so this
     * returns {@code false} until {@link #execute()} has been called and all
     * threads have run to completion.
     *
     * @return {@code true} if all worker threads are in state
     *         {@link Thread.State#TERMINATED}, {@code false} otherwise
     */
    public boolean finish() {
        for (Thread thread : workerMap.values()) {
            // Any thread that is not yet terminated means work may still be pending.
            if (thread.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }
}

Worker 负责执行计算,并将计算结果写入共享的结果集合;它循环处理队列中的 Task,直到队列为空:

package com.viewhigh.mdop.bi.test;

import java.util.Map;
import java.util.Queue;

/**
 * Runnable that drains a task queue and stores one result per task.
 *
 * <p>The queue and the result map must be injected through
 * {@link #setWorkerQueue(Queue)} and {@link #setWorkerResultMap(Map)} before
 * {@link #run()} is invoked. The same instance may be run by several threads
 * as long as the injected queue and map are safe for concurrent use.
 *
 * <p>Created by zzq on 2017/5/11.
 */
public class Worker implements Runnable {

    private Queue<Task> workerQueue;
    private Map<Integer, Object> workerResult;

    /**
     * Polls tasks from the queue until it is empty or the current thread is
     * interrupted, storing each non-null result under the task's id.
     *
     * <p>{@code null} results are skipped rather than stored: a result map
     * such as {@code ConcurrentHashMap} rejects null values, and storing one
     * would kill the worker thread with a {@link NullPointerException}.
     */
    @Override
    public void run() {
        // Stop taking new tasks once interrupted so no further task is polled and lost.
        while (!Thread.currentThread().isInterrupted()) {
            Task task = workerQueue.poll();

            if (task == null) {
                break;
            }

            Object ret = handle(task);
            if (ret != null) {
                workerResult.put(task.getId(), ret);
            }
        }
    }

    /**
     * Processes a single task. This demo implementation sleeps for one second
     * to simulate work and then returns the task's name.
     *
     * @param task the task to process
     * @return the task's name, or {@code null} if the thread was interrupted
     *         while working (the interrupt flag is restored in that case)
     */
    public Object handle(Task task) {
        try {
            Thread.sleep(1000);
            return task.getName();
        } catch (InterruptedException e) {
            // Restore the flag so run() and any caller can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Injects the queue this worker takes its tasks from.
     *
     * @param workerQueue the task queue
     */
    public void setWorkerQueue(Queue<Task> workerQueue) {
        this.workerQueue = workerQueue;
    }

    /**
     * Injects the map this worker writes its results into.
     *
     * @param workerResult the result map, keyed by task id
     */
    public void setWorkerResultMap(Map<Integer, Object> workerResult) {
        this.workerResult = workerResult;
    }
}

测试类:

package com.viewhigh.mdop.bi.test;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.*;

/**
 * End-to-end check of the master-worker demo: submits a batch of tasks, runs
 * the workers to completion and verifies the collected results.
 *
 * <p>Created by zzq on 2017/5/11.
 */
public class MasterTest {
    /**
     * Submits ten named tasks, waits for all worker threads to terminate and
     * asserts that every task produced its name as the result.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    public void submitTask() throws Exception {
        final int taskCount = 10;
        List<Task> list = new ArrayList<Task>();

        for (int i = 0; i < taskCount; i++) {
            Task task1 = new Task();
            task1.setId(i);
            task1.setName("_序号_:" + Integer.toString(i));
            list.add(task1);
        }

        Master master = new Master(new Worker());
        master.submitTask(list);

        // Start the clock before the threads are started so start-up cost is included.
        long currTime = System.currentTimeMillis();
        master.execute();

        // Sleep between polls instead of spinning, which would burn a full core.
        while (!master.finish()) {
            Thread.sleep(10);
        }

        System.out.println("消耗的时间:" + Long.toString(System.currentTimeMillis() - currTime));

        // Every task must have produced exactly its own name as the result.
        assertEquals(taskCount, master.getWorkerResult().size());
        for (int i = 0; i < taskCount; i++) {
            assertEquals("_序号_:" + i, master.getWorkerResult().get(i));
        }
    }
}
原文地址:https://www.cnblogs.com/zzq-include/p/6842787.html