5 并行模式与算法

单例模式

单例模式的好处:

  • 节省系统开销(省略new花费的时间)
  • 减轻GC压力(new次数减小,内存的使用频率也随之减少)

简单的单例:

public class Singleton {
    private static Singleton instance = new Singleton();
    private Singleton (){}
    public static Singleton getInstance() {
        return instance;
    }
}

上面代码需要注意几点:

  • Singleton构造函数设置为private。防止开发人员随意创建多余的实例。
  • instance对象必须为private(保证instance的安全)且是static的。工程方法getInstance()必须是static的。

缺点:Singleton实例在何时创建不受控制,instance对象会在第一次初始化时被创建

比如,在Singleton中加入一个属性:

public class Singleton {
  	public static int STATUS = 1;
   	//...和上面代码一致
}

那么,只要调用Singleton.STATUS就会实例化Singleton。而不用等到调用getInstance()方法。

延迟加载的单例:

特点:只会在instance第一次使用时创建对象。

缺点:使用了synchronized锁,并发坏境下对性能产生一定影响。

/**
 * 延迟的单例
 */
public class LazySingleton {
    private LazySingleton(){
    }
    private static LazySingleton instance = null;
    public static synchronized LazySingleton getInstance(){
        if (instance == null)
            instance = new LazySingleton();
        return instance;
    }
}

使用内部类实现的单例:

优点:在真正需要时创建对象;性能优越。

public class StaticSingleton {

    private StaticSingleton(){
        System.out.println("StaticSingleton is created!");
    }

    private static class SingletonHolder{
        private static StaticSingleton instance = new StaticSingleton();
    }

    public static StaticSingleton getInstance(){
        return SingletonHolder.instance;
    }
    
}

不变模式

什么是不变模式:通过使用一种不可改变的对象,依靠对象的不变性,达到在没有同步操作的多线程环境下依然保持内部状态的一致性和正确性。这就是不变模式。

核心思想:天生对多线程友好,对象一旦被创建,它的内部状态将永远不会发生改变。别的线程不能改变,它自己也不会改变自己。这和只读属性有点区别,只读属性可以自己改变自己。

应用场景满足两个条件:

  • 当对象创建后,其内部状态和数据不再发生任何变化。
  • 当对象需要被共享,被多线程频繁访问。

如何实现?需要注意4点:

  • 去除setter方法已经所有修改自身属性的方法。
  • 将所有属性设置为私有,并用final标记,确保其不可修改。
  • 确保没有子类可以重载修改它的行为。
  • 有一个可以创建完整对象的构造函数。
public final class Product {
    //一下所有final定义的对象都只赋值一次,随后不会改变
    private final String no;
    private final String name;
    private final double price;

    public Product(String no, String name, double price){
        this.no = no;
        this.name = name;
        this.price = price;
    }
    public String getNo() {
        return no;
    }
    public String getName() {
        return name;
    }
    public double getPrice() {
        return price;
    }
}

属性中的final确保所有数据都只赋值一次,定义class为final是不希望被继承。以免子类破坏该类的不变模式。

JDK中的例子:最典型的是String类,所有元数据的包装类都是不变模式。

注意:不变模式是通过回避问题而不是解决问题来处理多线程并发访问控制的,不变对象不需要同步操作。并发中性能上会提高。

生产者-消费者模式

介绍:生产者-消费者模式是一个典型的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在这个模式中,通产有两类线程,若干个生产者线程和若干个消费者线程,还有一个共享内存缓冲区。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓冲区进行通信

基本结构:

特点:生产者和消费者解耦,两者都不需要知道对方的存在。允许消费者和生产者之间存在时间差。

具体结构:

![生-消_2019-09-27_15-09-37](D:user80004133Pictures生-消_![](https://img2018.cnblogs.com/blog/1541399/201911/1541399-20191129145700779-1544050544.png

  • 生产者:提交用户请求,提取用户任务,并装入内存缓冲区
  • 消费者:消费任务
  • 内存缓冲区:缓存任务,供生产者提交,消费者消费的缓冲区域。
  • 任务:缓冲区缓冲的数据结构
  • Main:使用生产者和消费者的客户端

例子:实现一个基于生产者-消费者求整数平方的并行程序

生产者:它构建PCData对象,并放入BlockingQueue队列中。

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingDeque<PCData> queue; //内存缓冲区,通过构造时外部引入,保证和消费者用的是同样的内存缓冲区.
    private static AtomicInteger count = new AtomicInteger(); //总数,原子操作.
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingDeque<PCData> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        PCData data = null;
        Random random = new Random();
        System.out.println("start producter .." + Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(random.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data + " is put into Queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {      //插入一个数据到缓冲区
                    System.out.println("failed to put data " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        isRunning = false;
    }
}

消费者:从BlockingQueue中取得PCData对象,并进行相应计算。

public class Consumer implements Runnable {
    private BlockingDeque<PCData> queue;	//共享缓冲区
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingDeque<PCData> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("start Consumer id : " + Thread.currentThread().getId());
        Random r = new Random();
        try {
            while (true) {
                PCData data = queue.take();		//从共享缓冲区中拿到一个数据消费
                if (null != data) {
                    int re = data.getIntData() * data.getIntData();
                    System.out.println(MessageFormat.format("{0} * {0} = {1}", data.getIntData(), re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

共享数据模型:PCData

public class PCData {
    private final int intData;

    public PCData(int d) {
        intData = d;
    }

    public PCData(String d) {
        intData = Integer.parseInt(d);
    }

    public int getIntData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" +
                "intData=" + intData +
                '}';
    }
}

主函数:创建一个生产者和消费者共用的共享缓冲区,创建生产者和消费者线程,并提交到线程池。

class Main {
    public static void main(String[] a) throws InterruptedException {
        BlockingDeque<PCData> queue = new LinkedBlockingDeque<>(10);
        Producer producter1 = new Producer(queue);
        Producer producter2 = new Producer(queue);
        Producer producter3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(producter1);
        es.execute(producter2);
        es.execute(producter3);
        es.execute(consumer1);
        es.execute(consumer2);
        es.execute(consumer3);
        //运行时间
        Thread.sleep(1000 * 10);
        //停止生产者
        producter1.stop();
        producter2.stop();
        producter3.stop();
        //停止生产者后,预留时间给消费者执行
        Thread.sleep(1000 * 5);
        es.shutdown();
    }
}

高性能的生产者-消费者:无锁的实现

现成的Disruptor框架是一个无锁的缓存框架。

无锁的缓存框架:Disruptor

介绍:Disruptor框架使用无锁的方式实现了一个环形队列,非常适用于生产者-消费者模式,比如事件和消息发布。

用Disruptor实现生产者-消费者案例

提高消费者的响应时间:选择合适的策略

消费者如何监控缓冲区中的信息呢?,Disruptor提供了几种策略:

  • BlockingWaitStrategy(默认):使用BlockingWaitStrategy和使用BlockingQueue非常类似的,它们都使用锁和条件进行数据的监控和线程的唤醒。最节省cup,但在高并发下性能最糟。
  • SleepingWaitStrategy:它先使用自旋,不成功再使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不占用太多CPU。不太占CPU,适用于对延时要求不高的场合。对生产者影响最小。典型的应用场景是异步日志
  • YieldingWaitStrategy适用于低延时场景。很占CPU(最好逻辑CPU数量大于消费者线程数)。消费者线程会不断循环监控缓冲区变化,在循环内部,会使用Thread.yield()让出CPU给别的线程执行时间。
  • BusySpinWaitStrategy最疯狂的等待策略。特耗CPU,特低延迟。它就是一个死循环!只有在延迟非常苛刻的场合才考虑使用它。

CPU Cache的优化:解决伪共享问题

什么是伪共享问题?

为了提高CPU速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位为缓存行,它是从主存(memory)复制到缓存(Cache)的最小单位。

如果两个变量存放在一个缓存行时,在多线程访问中,可能会相互影响彼此的性能。如图,当cpu1的x被修改,cpu2上的缓存行就会变成无效,导致Cache无法命中。如果CPU不能命中缓存,系统的吞吐量就会下降。

Snipaste_2019-10-11_19-25-06

解决办法

让x变量一行,行的空位置让padding填充。这样,就能保证x被修改时,cpu2的缓存不会失效。

原文地址:https://www.cnblogs.com/sean-zeng/p/11957781.html