并行模式之Guarded Suspension模式

并行模式之Guarded Suspension模式

一)、Guarded Suspension: 保护暂存模式

应用场景:当多个客户进程去请求服务进程时,客户进程的请求速度比服务进程处里请求的速度快,此时,为了保护客户进程的请求不会丢失,使用 Guarded Suspension模式,增加一个请求队列,客户进程和服务进程共同维护这一个队列。

好处:

1).将客户进程的请求放入到请求队列中,当服务进程有能力处理客户请求时取出请求队列中的客户请求,并处理。

2).确保系统仅在有能力处理某个任务时,它才处理该任务,当系统没有能力处理任务时,它将暂存任务信息,等待系统空闲。

二)、Guarded Suspension的结构

1).请求队列(RequestQueue)

    作用:内部结构是一个链表集合LinkedList, 提供了暂缓客户请求,和取出客户请求的方法,客户进程和服务进程共同维护一个请求队列。
  1. .客户进程(ClientThread)

    作用: 发送请求,将请求存入都请求列表中。

3).服务进程(ServiceThread)

   作用:处理请求,对请求队列中的请求进行处理。

三)、Guarded Suspension的简单实现

Request类: 模拟请求的数据

/**
 * Guarded Suspension : 保护暂存模式。
 * 应用场景:
 *     当多个客户端去请求服务端应用程序,服务端无法快速的处理客户端的请求,此时,需要保存客户端的请求,将客户端的请求保存
 *     在一个缓存队列中,等待服务端处理。
 *
 * 结构: 由服务线程、客户线程、请求队列(用来存放客户的请求)构成,服务线程和客户线程共同维护一个请求队列。
 *
 */

/**
 * 构造一个请求对象
 */
public class Request {
    private String name;
    public Request(String name){
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString(){
        return name;
    }
}

RequestQueue类:请求队列

/**
 * 请求队列:
 *   属性:queue,用于存储客户请求的链表集合
 *   方法:getRequest(),获取链表中的请求
 *        addRequest(),添加客户情求
 */
public class RequestQueue {
    /**
     * 内部结构:链表集合,充当存储请求的缓冲区
     */
    protected LinkedList queue = new LinkedList();

    /**
     * 获取用户的请求
     */
    public synchronized Request getRequest(){
        //当服务端获取请求时,先判断队列中是否有待处理的请求,没有则让线程进入等待状态
        if(queue.size() == 0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //队列中有请求则通过remove()中获取请求,并移除,使用remove()默认移除链表第一个元素,并返回移除的值,
        //当链表中没有元素,调用remove()会抛出NoSerachElementException
        return (Request) queue.remove();
    }
    /**
     * 添加客户的请求
     */
    public synchronized void addRequest(Request request){
        queue.add(request);
        //唤醒等待的服务线程
        notifyAll();
    }
}

ClientThread: 客户进程

/**
 * 客户线程
 */
public class ClientThread implements Runnable{
    /**
     * 内部维护一个请求队列
     */
    protected RequestQueue requestQueue = new RequestQueue();
    public ClientThread(RequestQueue requestQueue){
        this.requestQueue = requestQueue;
    }

    @Override
    public void run() {
        //构造客户请求
        for(int i = 0; i < 10; i++){
            Request request = new Request("RequestId:"+ i + " Thread_Name:" + Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName()+" "+"request:"+request);
            //将请求放入请求队列
            requestQueue.addRequest(request);
            //控制客户端提交请求的速度,快于服务端处理请求的速度
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ClientThread Name is"+ Thread.currentThread().getName()+" "+"request:"+request);
        }
        System.out.println(Thread.currentThread().getName()+" end");
    }
}

ServiceThread: 服务进程

/**
 * 服务端线程
 *
 */
public class ServiceThread implements Runnable{
    /**
     * 内部维护一个请求队列
     */
    protected RequestQueue requestQueue = new RequestQueue();
    public ServiceThread(RequestQueue requestQueue){
        this.requestQueue = requestQueue;
    }

    /**
     * 获取请求,处理请求
     */
    @Override
    public void run() {
        while(true) {
            final Request request = requestQueue.getRequest();
            //模拟请求处理耗时,控制请求处理的时间比请求发送时间耗时长
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            //模拟请求被处理
            System.out.println(Thread.currentThread().getName()+" " + "handles:" + request);
        }
    }

}

ClientService: 启动类

/**
 * 模拟客户端发送请求,服务端处理请求
 */
public class ClientAndService {
    public static void main(String[] args) {
        //客户端和服务端共享的请求队列
        RequestQueue queue = new RequestQueue();
        //客户端发送请求
        ClientThread clientThread = new ClientThread(queue);
        for(int i = 0; i < 10; i++){
            new Thread(clientThread,"ClientThread"+i).start();
        }
        //服务端处理请求
        ServiceThread serviceThread = new ServiceThread(queue);
        for(int i = 0; i < 10; i++){
            new Thread(serviceThread,"ServiceThread"+i).start();
        }
        //结论:服务端处理请求的速度比发送请求慢,因为加入了缓存的队列,当客户端结束发送请求,服务端依然可以从缓存队列中取出客户的
        //请求并处理,保证了客户的请求不会丢失。
    }
}

运行结果:

注:因为运行结果过长,以下给出的是部分的运行结果。

ServiceThread5 handles:RequestId:0 Thread_Name:ClientThread5
ServiceThread4 handles:RequestId:0 Thread_Name:ClientThread4
ServiceThread3 handles:RequestId:0 Thread_Name:ClientThread1
ClientThread Name isClientThread4 request:RequestId:9 Thread_Name:ClientThread4
ClientThread Name isClientThread2 request:RequestId:9 Thread_Name:ClientThread2
ClientThread2 end
ClientThread Name isClientThread0 request:RequestId:9 Thread_Name:ClientThread0
ClientThread0 end
ClientThread Name isClientThread5 request:RequestId:9 Thread_Name:ClientThread5
ClientThread5 end
ClientThread4 end
ClientThread Name isClientThread8 request:RequestId:9 Thread_Name:ClientThread8
ClientThread8 end
ClientThread Name isClientThread3 request:RequestId:9 Thread_Name:ClientThread3
ClientThread3 end
ClientThread Name isClientThread7 request:RequestId:9 Thread_Name:ClientThread7
ClientThread7 end
ClientThread Name isClientThread9 request:RequestId:9 Thread_Name:ClientThread9
ClientThread9 end
ClientThread Name isClientThread6 request:RequestId:9 Thread_Name:ClientThread6
ClientThread6 end
ClientThread Name isClientThread1 request:RequestId:9 Thread_Name:ClientThread1
ClientThread1 end
ServiceThread1 handles:RequestId:1 Thread_Name:ClientThread1
ServiceThread0 handles:RequestId:1 Thread_Name:ClientThread4
ServiceThread2 handles:RequestId:1 Thread_Name:ClientThread5

结论: 当客户进程停止发送请求时,服务进程仍在继续工作,这说明

          RequestQueue发挥了中间缓存的作用,客户端的请求没有丢失。

四)、携带返回结果的Guarded Suspension(+Future 模式)

1)、Guarded Suspension的弊端:

客户进程的Request不能获取服务进程的返回结果,当客户进程去请求服务进程必须使用服务进程的返回值,客户进程不知道服务进程何时处理请求,也不知道需要处理多久,因此,可以使用Future模式,在客户请求时返回一个虚假的数据FutureData对象,此时,客户进程可以先处理其它的业务逻辑,当服务真正的处理完请求,再返回真实的处理数据。


2)、Guarded Suspension(+Future 模式)的结构

1.Request: 请求类

作用: 接收请求信息,内部维护了一个数据返回结果接口Data。

2.Data: 接口

作用: FutureData 和 RealData的共同接口,定义了一个获取数据返回结果的方法。

3.FutureData: 虚假数据类

作用:轻量级对象,实现了Data接口,内部维护了一个RealData对象,在请求处理时注入, 是RealData的一个代理类,是客户进程发送请求后立即返回的一个虚假数据类。

4.RealData: 真实数据类

作用: 真实数据类,做获取数据的真实操作

5.RequestQueue: 请求队列

作用:内部结构是一个链表集合LinkedList, 提供了暂缓客户请求,和取出客户请求的方法,客户进程和服务进程共同维护一个请求队列。

6.ClienThread: 用户进程

作用:发送请求并获取请求的数据。

7.ServiceThread

作用:处理请求。

五)、Guarded Suspension(+Future 模式)的简单实现

Request类:请求类

/**
 * 构造请求类
 *   属性:
 *     name: 请求信息
 *     Data: 返回给客户端的处理结果
 *
 */
public class Request {
    /**
     * 请求信息
     */
    private String name;

    /**
     * 响应信息
     */
    private Data response;

    public Request(String name) {
        this.name = name;
    }

    @Override
    public String toString(){
        return name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Data getResponse() {
        return response;
    }

    public void setResponse(Data response) {
        this.response = response;
    }
}

Data: 数据结果接口

/**
获取数据结果的方法
*/
public interface Data {
    String getResult();
}

FutureData: 虚假数据类

/**
 * 虚假数据类: 轻量级对象
 */
public class FutureData implements Data {
    /**
     *  内部维护一个RealData
     */
   protected RealData realData;

    /**
     * 使用一个变量来监控是否成功注入RealData
     */
    private boolean isReady = false;

    /**
     * 注入真实的RealData
     */
    public synchronized void setRealData(RealData realData){
        if(isReady){
            return;
        }
        this.realData = realData;
        //注入成功,将isReady值设为true,唤醒等待的线程
        isReady = true;
        notifyAll();
    }

    /**
     * 获取真实的返回数据
     */
    @Override
    public synchronized String getResult() {
        //先判断RealData对象是否注入成功,若没有则让线程进入等待桩状态
        if(!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //注入成功,返回真实的处理结果
        return realData.getResult();
    }
}

RealData: 真实数据类

/**
 * 真实的数据类: 是一个重量级对象
 */
public class RealData implements Data {
    /**
     * 请求信息
     */
    private Request request;
    /**
     * 模拟构造方法构造很慢
     */
    public RealData(Request request){
        this.request = request;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Override
    public String getResult() {
        return "我处理了 "+request;
    }
}

RequestQueue: 请求队列

/**
 * 请求队列: ServiceThread和ClientThread共同维护的对象
 *    属性:
 *       LinkdList: 存储客户端的请求
 *    方法:
 *       addRequest(): 添加请求
 *       getRequest(): 获取请求
 */
public class RequestQueue {
    /**
     * 请求队列,
     */
    protected LinkedList requestQueue = new LinkedList();

    /**
     * 获取请求
     */
    public synchronized Request getRequest(){
        //获取请求时先判断队列中是否有请求,若没有让线程进入当待状态
        if(requestQueue.size() == 0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //有请求,返回链表第一个元素,并移除,remove()默认移除链表第一个元素,并返回移除值
        //当链表没有元素时,调用remove()方法会抛NoSearchElementException
        return (Request) requestQueue.remove();
    }

    /**
     * 添加请求
     */
    public synchronized void addRequest(Request request){
        requestQueue.add(request);
        //当用户添加请求后,队列存在数据,唤醒服务端的等待线程
        notifyAll();
    }
}

ClientThread: 客户进程

**
 * 客户线程
 *    属性: RequestQueue,请求队列,将请求放入队列中
 */
public class ClientThread extends Thread{
    /**
     * 请求队列
     */
    protected RequestQueue requestQueue;

    public ClientThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
    }
    @Override
    public void run(){
        for(int i = 0; i < 10; i++) {
            //构造request,模拟发送的请求
            Request request = new Request("[RequestId:" + i + " Thread_Name:" + Thread.currentThread().getName());
            //将请求放入请求队列
            requestQueue.addRequest(request);
            //用户发送完请求变可以获得一个轻量级的响应对象
            request.setResponse(new FutureData());
            //模拟用户不用等待真实数据的返回,处理其他业务逻辑
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //获取真实的响应数据
            System.out.println(request.getResponse().getResult());
            System.out.println(Thread.currentThread().getName()+" "+request);
            //模拟用户发送请求的时间比服务处理请求的快
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("ClientThread name is "+Thread.currentThread().getName());
        }
        System.out.println(Thread.currentThread().getName()+" is end!");
    }
}

ServiceThread: 服务进程

/**
 * 服务进程:
 *   属性:RequestQueue,获取请求的队列
 */
public class ServiceThread extends Thread{
    /**
     * 请求队列
     */
    protected RequestQueue requestQueue;

    public ServiceThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
    }

    @Override
    public void run(){
       while(true){
           //从队列中获取请求
           Request request = requestQueue.getRequest();
           //进行真正的数据处理并返回
           FutureData future = (FutureData) request.getResponse();
           RealData realData = new RealData(request);
           future.setRealData(realData);
           //模拟用户对请求进行了处理
           System.out.println(Thread.currentThread().getName()+" handle"+request);
           //模拟用户处理请求的时间长于发送请求的时间
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }

}

ClientService: 启动类

/**
 * 客户端发送请求,服务端处理请求
 */
public class ClientService {
    public static void main(String[] args) {
        //客户进程和服务进程共同维护一个请求队列
        RequestQueue requestQueue = new RequestQueue();
        //客户端请求
        for(int i = 0; i < 10; i++){
           new ClientThread(requestQueue,"ClientThread"+i).start();
        }
        //服务端请求
        for(int i = 0; i < 10; i++){
            new ServiceThread(requestQueue, "ServiceThread"+ i).start();
        }
    }
}

运行结果:

金麟岂能忍一世平凡 飞上了青天 天下还依然
原文地址:https://www.cnblogs.com/Auge/p/11699589.html