并发的几个模式

下面的例子全部来自<Java程序性能优化-让你的程序更快更稳定>书中的第4章并发程序开发及优化。

future模式:

同时进行两个线程的业务,最终用的时间是耗时最长的线程的时间。

package cn.com.fzk.book;

public class FutureTest {
  public static void main(String[] args) {
    Client client = new Client();

    Data data = client.request("name");
    System.out.println("ok, i will sleep one second");
    try {
      Thread.sleep(1000);
    } catch (Exception e) {
    }
    System.out.println("data :" + data.getResult());
  }
}


class Client {
  public Data request(final String queryStr) {
    final FutureData future = new FutureData();
    new Thread() {
      public void run() {
        RealData realData = new RealData(queryStr);
        future.setRealData(realData);
      }
    }.start();
    return future;
  };
}



interface Data {
  public String getResult();
}


class FutureData implements Data {
  protected RealData realData = null;
  protected boolean isReady = false;

  public synchronized void setRealData(RealData realData) {
    if (isReady) {
      return;
    }
    this.realData = realData;
    isReady = true;
    notifyAll();
  }

  @Override
  public String getResult() {
    while (!isReady) {
      try {
        wait();
      } catch (Exception e) {
      }

    }
    return realData.result;
  }

}


class RealData implements Data {
  protected final String result;

  public RealData(String para) {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
      sb.append(para);
    }
    try {
      Thread.sleep(2000);
    } catch (Exception e) {
    }

    result = sb.toString();
  }

  @Override
  public String getResult() {
    return result;
  }

}

future模式非常常用,jdk自带了一套。

package cn.com.fzk.book;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class JDKFutrueTest {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    FutureTask<String> future = new FutureTask<>(new JDKRealData("a"));
    ExecutorService executor = Executors.newFixedThreadPool(1);
    executor.submit(future);
    System.out.println("ok");
    try {
      Thread.sleep(1000);
    } catch (Exception e) {
    }

    System.out.println("data : " + future.get());
  }
}


class JDKRealData implements Callable<String> {
  private String para;

  public JDKRealData(String para) {
    this.para = para;
  }

  @Override
  public String call() throws Exception {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
      sb.append(para);
    }
    try {
      Thread.sleep(100);
    } catch (Exception e) {
    }
    return sb.toString();
  }

}

MasterWorker模式

将master的任务分配到n个线程中同时执行一个大任务。下面计算1-100的立方和。ConcurrentLinkedQueue是线程安全的队列。

package cn.com.fzk.book;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MasterWorder {
  public static void main(String[] args) {
    Master master = new Master(new PlusWorker(), 5);
    for (int i = 0; i < 100; i++) {
      master.summit(i);
    }
    master.execute();
    int result = 0;
    Map<String, Object> resultMap = master.getResultMap();
    while (resultMap.size() > 0 || !master.isComplete()) {
      Set<String> keys = resultMap.keySet();
      String key = null;
      for (String k : keys) {
        key = k;
        break;
      }
      Integer i = null;
      if (key != null) {
        i = (Integer) resultMap.get(key);
      }
      if (i != null) {
        result += i;
      }
      if (key != null) {
        resultMap.remove(key);
      }
    }

    System.out.println(result);
  }
}


class Master {
  Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
  Map<String, Thread> threadMap = new HashMap<String, Thread>();
  Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

  public boolean isComplete() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
      if (entry.getValue().getState() != Thread.State.TERMINATED) {
        return false;
      }
    }

    return true;
  }

  public Master(Worker worker, int countWorker) {
    worker.setWordQueue(workQueue);;
    worker.setResultMap(resultMap);
    for (int i = 0; i < countWorker; i++) {
      threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
    }
  }

  public void summit(Object job) {
    workQueue.add(job);
  }

  public Map<String, Object> getResultMap() {
    return resultMap;
  }

  public void execute() {
    for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
      entry.getValue().start();
    }
  }
}


abstract class Worker implements Runnable {
  Queue<Object> workQueue;
  Map<String, Object> resultMap;

  public void setWordQueue(Queue<Object> workQueue) {
    this.workQueue = workQueue;
  }

  public void setResultMap(Map<String, Object> resultMap) {
    this.resultMap = resultMap;
  }

  public abstract Object handle(Object input);

  @Override
  public void run() {
    while (true) {
      Object input = workQueue.poll();
      if (input == null) {
        break;
      }
      Object result = handle(input);
      resultMap.put(Integer.toString(input.hashCode()), result);
    }

  }
}


class PlusWorker extends Worker {

  @Override
  public Object handle(Object input) {
    Integer i = (Integer) input;
    return i * i * i;
  }

}

Guarded Suspension模式

消息队列等都是用的这种模式,client端将任务提交,worker端根据系统性能进行处理任务,如果不需要等待结果,client端的线程就已经直接完成了。
下面的例子client端等待server端的返回结果,用到的Data等是最上面的Future模式定义的类。

package cn.com.fzk.book;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class GuardedSuspension {
  public static void main(String[] args) {
    RequestQueue requestQueue = new RequestQueue();
    for (int i = 0; i < 10; i++) {
      new ServerThread(requestQueue, "ServerThread " + i).start();
    }
    for (int i = 0; i < 10; i++) {
      new ClientThread(requestQueue, "ClientThread " + i).start();
    }
  }
}


class Request {
  private String name;
  private Data response;

  public Request(String name) {
    this.name = name;
  }

  public String getName() {
    return this.name;
  }

  public synchronized Data getResponse() {
    return response;
  }

  public synchronized void setResponse(Data response) {
    this.response = response;
  }

  public String toString() {
    return "[ Request " + name + " ]";
  }
}


class RequestQueue {
  private LinkedList<Request> queue = new LinkedList<Request>();

  public synchronized Request getRequest() {
    while (queue.size() == 0) {
      try {
        wait();
      } catch (Exception e) {
      }
    }
    return (Request) queue.remove();
  }

  public synchronized void addRequest(Request request) {
    queue.add(request);
    notifyAll();
  }
}


class ServerThread extends Thread {
  private RequestQueue requestQueue;

  public ServerThread(RequestQueue requestQueue, String name) {
    super(name);
    this.requestQueue = requestQueue;
  }

  @Override
  public void run() {
    while (true) {
      final Request request = requestQueue.getRequest();
      final FutureData future = (FutureData) request.getResponse();
      RealData realData = new RealData(request.getName());
      future.setRealData(realData);
      System.out.println(Thread.currentThread().getName() + "handles" + request);
    }
  }
}


class ClientThread extends Thread {
  private RequestQueue requestQueue;
  private List<Request> myRequest = new ArrayList<Request>();

  public ClientThread(RequestQueue requestQueue, String name) {
    super(name);
    this.requestQueue = requestQueue;
  }

  @Override
  public void run() {
    for (int i = 0; i < 10; i++) {
      Request request =
          new Request("RequestID:" + i + " ThreadName" + Thread.currentThread().getName());
      System.out.println(Thread.currentThread().getName() + " requests " + request);
      request.setResponse(new FutureData());
      requestQueue.addRequest(request);
      myRequest.add(request);
      try {
        Thread.sleep(1000);
      } catch (Exception e) {
      }
      for (Request res : myRequest) {
        System.out.println("ClientThread Name is " + Thread.currentThread().getName()
            + " Response is: " + res.getResponse().getResult());
      }
    }
  }
}

还生产者消费者模式、作家读者模式、哲学家模式。

原文地址:https://www.cnblogs.com/badboyf/p/6596586.html