多线程 Worker Thread 模式

Worker是“工人”的意思,worker thread pattern中,工人线程(worker thread)会一次抓一件工作来处理,当没有工作可做时,工人线程会停下来等待心得工作过来。

Worker Thread也叫做background thread,另外,也有人把视点放在管理工人线程的地方,称之为Thread Pool。

最近闲时在看《Java多线程设计模式》,很不错,语言浅显易懂,而且编排也好,很有启发性,现在挑其中一章来写写心得

worker thread是我们平时用的很多的一种多线程模式,只不过我们常常不把它当模式罢了。基本内容是:有一个流水线(channel),流水线一端有客户线程client,另一端有工人线程worker,客户不断把新的任务(request)放入流水线,工人在另一头获得任务,并执行,客户和工人的数量可多可少,就这么简单

这个所谓的pattern初看好像似曾相识,就是一个thread pool嘛, 按通常的做法,request可以实现Runnable接口,把要做的事情放在run方法中,由worker去执行,具体实现时还要注意同步的问题

不过,由此我们可以想到Swing的工作方式。Swing是事件驱动的,它有一个the event-dispatch queue,这里之所以用the,是因为这个队列是唯一的,就和上面说的流水线一样。Swing的各个组件相当于客户,不断把各种事件(键盘或者鼠标事件,等等)塞入event queue中,queue有个专门的线程负责把这些事件送给相应的listener,就实现了最基本的事件驱动模型。如果不采用这种模型,即事件由专门线程处理的话,界面的相应速度就很差了

如果有用过Java做游戏的话,应该都接触过javax.swing.SwingUtilities这个类,里面有个invokeAndWait方法,就是用来让其他线程操作Swing组件的。为什么不能直接操作呢,如上所述,event dispatch queue是唯一的,因此Swing组件在设计时就没有过多考虑多线程的问题,反正由event dispatch queue统一操作,这样可以提高速度(尽管Swing本来就很慢),但是当其他线程要操作Swing组件时,就可能有潜在的不稳定因素,所以才有了invokeAndWait方法,调用此方法的线程会wait直到所需操作已经完成

还没说完,再仔细想想,我们在程序里也常常对那些组件直接操作而非用什么invokeAndWait,这里又有一些细节值得注意。首先是在你调用组件的setVisible等方法之前,你是可以随便改组件的,调用完setVisible之后,只有少数方法,比如repaint,addListener等等。最后,根据jdk文档所言,这个方法是用来给应用程序线程改变GUI外观的。如果非要直接改,不一定会出错,多线程本来就是比较难说的,呵呵,我也没试过,改天可以尝试一下。

每 個執行緒(线程)處理一個請求,每次執行緒執行完請求後,再次嘗試取得下一個請求並執行,這是Worker Thread的基本概念,對於一些需要冗長計算或要在背景執行的請求,可以採用Worker Thread。

在 
Thread-Per-Message 模式 中,其實已經有點Worker Thread的概念,在Service物件接收到資料後,以匿名方式建立執行緒來處理資料,那個建立的執行緒就是Worker Thread,只不用過就丟了。

Worker Thread可以應用在不同的場合,例如在 
Guarded Suspension 模式 的範例,是使用一個執行緒來處理請求佇列中的請求,如果請求不斷來到,且請求中可能有冗長的處理,則請求佇列中的請求可能會來不及消化。

您可以為請求佇列中的每個請求配給一個執行緒來處理,不過實際上,只要建立足夠多的執行緒即可, 

Worker Thread模式在Request的管理上像是 Producer Consumer 模式,在Request的行为上像是 Command 模式

Producer Consumer模式专注于Product的生产与消费,至于Product被消费时是作何处理,则不在它的讨论范围之中。 

如果您的Product是一个Request,消费者取得Request之后,执行Request中指定的请求方法,也就是使用Command模式,并且您的Request缓冲区还管理了Consumer,就有Worker Thread模式的意思了。

在Sequence Diagram上,可以看出Worker Thread同时展现了Producer Consumer模式与Command模式:

public class Request {
    private final String name; //  委托者�
    private final int number;  // 请求编号
    private static final Random random = new Random();
    public Request(String name, int number) {
        this.name = name;
        this.number = number;
    }
    public void execute() {
        System.out.println(Thread.currentThread().getName() + " executes " + this);
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
        }
    }
    public String toString() {
        return "[ Request from " + name + " No." + number + " ]";
    }
}

ClientThread:

public class ClientThread extends Thread {
    private final Channel channel;
    private static final Random random = new Random();
    public ClientThread(String name, Channel channel) {
        super(name);
        this.channel = channel;
    }
    public void run() {
        try {
            for (int i = 0; true; i++) { //可以修改为合适的
                Request request = new Request(getName(), i);
                channel.putRequest(request);
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
        }
    }
}

WorkerThread:

public class WorkerThread extends Thread {  
    private final Channel channel;  
    public WorkerThread(String name, Channel channel) {  
        super(name);  
        this.channel = channel;  
    }  
    public void run() {  
        while (true) {  
            Request request = channel.takeRequest();  
            request.execute();  
        }  
    }  
}  

Channel: 可以LinkedList

public class Channel {
    private static final int MAX_REQUEST = 100;
    private final Request[] requestQueue;
    private int tail;  // ��下一个putRequest的地方
    private int head;  // ��下一个takeRequest的地方
    private int count; // Request的数量

    private final WorkerThread[] threadPool;

    public Channel(int threads) {
        this.requestQueue = new Request[MAX_REQUEST];
        this.head = 0;
        this.tail = 0;
        this.count = 0;

        threadPool = new WorkerThread[threads];
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i] = new WorkerThread("Worker-" + i, this);
        }
    }
    public void startWorkers() {
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i].start();
        }
    }
    public synchronized void putRequest(Request request) {
        while (count >= requestQueue.length) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        requestQueue[tail] = request;
        tail = (tail + 1) % requestQueue.length;
        count++;
        notifyAll();
    }
    public synchronized Request takeRequest() {
        while (count <= 0) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        Request request = requestQueue[head];
        head = (head + 1) % requestQueue.length;
        count--;
        notifyAll();
        return request;
    }
}

测试:

public class Main {
    public static void main(String[] args) {
        Channel channel = new Channel(5);   // ��工人线程的數量
        channel.startWorkers();
        new ClientThread("Alice", channel).start();
        new ClientThread("Bobby", channel).start();
        new ClientThread("Chris", channel).start();
    }
}

参考了:http://www.riabook.cn/doc/designpattern/WorkerThread.htm

http://openhome.cc/Gossip/DesignPattern/WorkerThread.htm

http://chxiaowu.iteye.com/blog/1320767

更多:http://developer.51cto.com/art/201305/395279_1.htm

原文地址:https://www.cnblogs.com/youxin/p/3587292.html