线程同步工具 Semaphore类使用案例

参考博文 :

  1. 线程同步工具(一)
  2. 线程同步工具(二)控制并发访问多个资源
  3. 并发工具类(三)控制并发线程数的Semaphore

使用Semaphore模拟互斥锁

当一个线程想要访问某个共享资源,首先,它必须获得semaphore。如果semaphore的内部计数器的值大于0,那么semaphore减少计数器的值并允许访问共享的资源。计数器的值大于0表示,有可以自由使用的资源,所以线程可访问并使用它们。

另一种情况,如果semaphore的计数器的值等于0,那么semaphore让线程进入休眠状态一直到计数器大于0。计数器的值等于0表示全部的共享资源都正被线程们使用,所以线程想要访问就必须等到某个资源成为自由的。

当线程使用完共享资源时,他就必须释放出semaphore,增加semaphore的内部计数器的值,让其他线程可以访问共享资源。

  • 方法:
    设置Semaphore的permits大小为1,这样同一个时刻,只能有一个线程,进入acquire和release保护的方法体内。
  • Code:

import java.util.concurrent.Semaphore;

//1. 使用semaphore保护的互斥打印队列
class PrintQueue {

    private final Semaphore semaphore;

    public PrintQueue() {
        // 只设定一个许可,这样同一个时刻 只能一个线程执行 printJob方法,从而实现互斥锁
        semaphore = new Semaphore(1);
    }

    //2.实现Implement the printJob()方法,此方法可以模拟打印文档,并接收document对象作为参数。
    public void printJob(Object document) {

        try {
            semaphore.acquire();

            //3.然后,实现能随机等待一段时间的模拟打印文档的行。
            long duration = (long) (Math.random() * 10);
            System.out.printf("%s: PrintQueue: Printing a Job during %d seconds
", Thread.currentThread().getName(), duration);
            Thread.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //7.最后,释放semaphore通过调用semaphore的release()方法。
            semaphore.release();
            System.out.printf("%s: PrintQueue: Printing a Job release the permits 
", Thread.currentThread().getName());
        }
    }
}

//  模拟打印进程
class PrintThread extends Thread {

    PrintQueue printQueue ;
    public PrintThread (PrintQueue printQueue) {
        this.printQueue = printQueue ;
    }

    @Override
    public void run() {
        printQueue.printJob(new Object());
    }
}

public class PrintQueueTest {
    public static void main(String[] args) {
        PrintQueue printQueue = new PrintQueue() ;

        PrintThread a = new PrintThread(printQueue) ;
        a.setName("A");

        PrintThread b = new PrintThread(printQueue);
        b.setName("B");

        PrintThread c = new PrintThread(printQueue) ;
        c.setName("C");

        a.start();
        b.start();
        c.start();
    }
}

控制并发访问多个资源

在上面的例子中,我们使用了semaphore来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行。但是semaphore也可以用来保护多个资源的副本,也就是说当你有一个代码片段每次可以被指定数量的多个线程执行时,可以考虑使用Semaphore。

下面的例子会有一个print queue 但可以在3个不同的打印机上打印文件。


import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PrintQueue2 {
    // 模拟打印机 数组
    private boolean freePrinters[];
    private Lock lockPrinters;

    private final Semaphore semaphore;

    public PrintQueue2() {
        semaphore = new Semaphore(2);
        //模拟 初始化三个可用的打印机
        freePrinters = new boolean[]{true,true,true};
        lockPrinters = new ReentrantLock();
    }

    public void printJob(Object document) {
        int assignedPrinter =  -1 ;
        try {
            semaphore.acquire();

            // 获取可用的打印机 序号
             assignedPrinter = getPrinter();

            //7.然后, 随机等待一段时间来实现模拟打印文档的行。
            long duration = (long) (Math.random() * 10);
            System.out.printf("%s: PrintQueue: Printing a Job in Printer%d during %d seconds
", Thread.currentThread().getName(), assignedPrinter, duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(assignedPrinter != -1) { // 模拟释放打印机
                freePrinters[assignedPrinter] = true;
            }
            semaphore.release();
            System.out.printf("%s: PrintQueue: 打印结束释放打印机
", Thread.currentThread().getName());
        }
    }

    // 遍历找到当前可用的 打印机的索引下标
    private int getPrinter() {
        int ret = -1;
        try {
            lockPrinters.lock();
            for (int i = 0; i < freePrinters.length; i++) {
                if (freePrinters[i]) {
                    ret = i;
                    freePrinters[i] = false;
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lockPrinters.unlock();
        }
        return ret;
    }
}


//  模拟打印进程
class PrintThread extends Thread {

    PrintQueue2 printQueue2 ;
    public PrintThread (PrintQueue2 printQueue) {
        this.printQueue2 = printQueue ;
    }

    @Override
    public void run() {
        printQueue2.printJob(new Object());
    }
}

 class PrintQueueTest {
    public static void main(String[] args) {
        PrintQueue2 printQueue = new PrintQueue2() ;

        PrintThread a = new PrintThread(printQueue) ;
        a.setName("A");

        PrintThread b = new PrintThread(printQueue);
        b.setName("B");

        PrintThread c = new PrintThread(printQueue) ;
        c.setName("C");

        a.start();
        b.start();
        c.start();
    }
}

使用Semaphore控制并发线程数

Semaphore可以用来做流量控制,特别公用资源有限的应用场景,比如数据库连接。假设有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要进行存储到数据库中,而数据库的连接数只有10几个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。

public class SemaphoreTest2 {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);

    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("save data");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

在代码中,虽然有30个线程正在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits)接收一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。

原文地址:https://www.cnblogs.com/boothsun/p/7145262.html