信号量

  信号量就是一个停车场,车位数量的固定的,入场先要取卡获得准入证,出去交钱还证。车位满了后面的车就得排队,等里面有车离场才能进来。可以作为资源池来应用,也可以实现流控。下面举例:

  1、有界列表:

package com.wulinfeng.concurrent;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class BoundedList<T> {

    private final Semaphore sem;
    private List<T> list;

    public BoundedList(int capacity) {
        this.sem = new Semaphore(capacity);

        // 包装一个线程安全的列表
        this.list = Collections.synchronizedList(new LinkedList<T>());
    }

    public boolean add(T t) throws InterruptedException {
        // 可用信号已用光表示到达列表边界,再插入则报错,
        if (sem.availablePermits() < 1) {
            throw new ArrayIndexOutOfBoundsException();
        }

        // 申请信号
        sem.acquire();

        boolean isAdded = false;
        try {
            isAdded = list.add(t);
            return isAdded;
        } finally {
            // 插入失败则释放信号
            if (!isAdded) {
                sem.release();
            }
        }
    }

    public boolean remove(T t) {
        boolean isRemoved = false;

        try {
            isRemoved = list.remove(t);
            return isRemoved;
        } finally {
            // 成功删除则释放信号
            if (isRemoved) {
                sem.release();
            }
        }
    }

    public T get(int index) {
        return list.get(index);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedList<String> bl = new BoundedList<String>(2);

        // 插入两个,ok
        bl.add("hello");
        bl.add("world");
        System.out.printf("%s,%s!
", bl.get(0), bl.get(1));

        // 删掉一个,再来一个,ok
        bl.remove("world");
        bl.add("wlf");
        System.out.printf("%s,%s!
", bl.get(0), bl.get(1));

        // 插入第3个,不ok,越界了
        bl.add("hi");
    }
}

  运行结果:

hello,world!
hello,wlf!
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
    at com.wulinfeng.concurrent.BoundedList.add(BoundedList.java:23)
    at com.wulinfeng.concurrent.BoundedList.main(BoundedList.java:73)

  2、并发流控

package com.wulinfeng.concurrent;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedRequest {
    private final Semaphore sem;
    private AtomicInteger concurrent = new AtomicInteger(0);
    private AtomicInteger count = new AtomicInteger(0);

    public BoundedRequest(int capacity) {
        this.sem = new Semaphore(capacity, true);
    }

    public void service(int request) throws InterruptedException {

        // 业务开始前,申请流量,计算并发数,+1
        sem.acquire();
        concurrent.incrementAndGet();

        // 统计请求数
        count.incrementAndGet();

        try {
            System.out.printf("request is %d, concurrent is %d, total is %d
", request, concurrent.get(), count.get());
        } finally {
            // 业务结束后,释放流量,计算并发数,-1
            sem.release();
            concurrent.decrementAndGet();
        }
    }

    public static void main(String[] args) {
        final ExecutorService es = Executors.newFixedThreadPool(20);
        final BoundedRequest br = new BoundedRequest(10);
        final Random r = new Random();

        try {
            // 并发50个请求调用service,但service只能容许10个并发
            for (int i = 0; i < 50; i++) {
                es.execute(new Runnable() {

                    @Override
                    public void run() {
                        // TODO Auto-generated method stub
                        try {
                            br.service(r.nextInt());
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                });
            }
        } finally {
            es.shutdown();
        }
    }
}

  运行结果:

request is -67903609, concurrent is 1, total is 1
request is -978201954, concurrent is 10, total is 10
request is 439522563, concurrent is 9, total is 11
request is -2146942906, concurrent is 9, total is 9
request is -954896952, concurrent is 8, total is 8
request is 2067503735, concurrent is 10, total is 15
request is 810919054, concurrent is 10, total is 16
request is 1003731274, concurrent is 7, total is 7
request is 2063957886, concurrent is 6, total is 6
request is 644103498, concurrent is 5, total is 5
request is 342441876, concurrent is 10, total is 20
request is 1815314900, concurrent is 4, total is 4
request is 404321393, concurrent is 3, total is 3
request is -1235311585, concurrent is 9, total is 22
request is -1237413462, concurrent is 2, total is 2
request is -127636333, concurrent is 8, total is 23
request is -2097660524, concurrent is 8, total is 24
request is 417352835, concurrent is 8, total is 25
request is 444231559, concurrent is 10, total is 21
request is 1685762597, concurrent is 7, total is 26
request is 1762045311, concurrent is 10, total is 19
request is -1051782985, concurrent is 7, total is 27
request is 1003679426, concurrent is 10, total is 18
request is 1248691488, concurrent is 10, total is 17
request is 2132860492, concurrent is 10, total is 34
request is 1747138502, concurrent is 10, total is 14
request is -810925416, concurrent is 10, total is 14
request is -465530125, concurrent is 8, total is 12
request is 1240058852, concurrent is 7, total is 35
request is 1913104291, concurrent is 10, total is 33
request is -2066217933, concurrent is 6, total is 36
request is 353906021, concurrent is 9, total is 32
request is 950986644, concurrent is 8, total is 31
request is 1714410402, concurrent is 5, total is 38
request is -1877803772, concurrent is 8, total is 30
request is -109502915, concurrent is 10, total is 45
request is -2082474915, concurrent is 10, total is 46
request is -501217703, concurrent is 7, total is 29
request is -951197497, concurrent is 6, total is 28
request is 764730170, concurrent is 10, total is 48
request is -1313695793, concurrent is 9, total is 47
request is 946662034, concurrent is 10, total is 44
request is -1263988957, concurrent is 9, total is 43
request is 1928455344, concurrent is 8, total is 42
request is -1302384905, concurrent is 7, total is 41
request is -828386720, concurrent is 6, total is 40
request is -196657133, concurrent is 5, total is 39
request is -118394562, concurrent is 3, total is 50
request is 801023059, concurrent is 6, total is 37
request is -1885395270, concurrent is 10, total is 49

   3、有界队列

package com.wulinfeng.concurrent;

import java.util.concurrent.Semaphore;

public class BoundedBuffer<E> {
    // 两个信号量加起来等于缓存大小capacity
    private final Semaphore availableItems, availableSpaces;
    private final E[] items;
    private int putPosition = 0, takePosition = 0;

    public BoundedBuffer(int capacity) {
        availableItems = new Semaphore(0); //允许从缓存中删除的元素个数
        availableSpaces = new Semaphore(capacity); // 允许插入到缓存中的元素个数
        items = (E[]) new Object[capacity];
    }

    public boolean isEmpty() {
        return availableItems.availablePermits() == 0;
    }

    public boolean isFull() {
        return availableSpaces.availablePermits() == 0;
    }

    public void put(E x) throws InterruptedException {
        availableSpaces.acquire();
        doInsert(x);
        availableItems.release();
    }

    public E take() throws InterruptedException {
        availableItems.acquire();
        E item = doExtract();
        availableSpaces.release();
        return item;
    }

    private synchronized void doInsert(E x) {
        int i = putPosition;
        items[i] = x;
        putPosition = (++i == items.length) ? 0 : i;
    }

    private synchronized E doExtract() {
        int i = takePosition;
        E x = items[i];
        items[i] = null;
        takePosition = (++i == items.length) ? 0 : i;
        return x;
    }
}

  上面有界队列是通过两个信号量实现的,具体测试类见栅栏http://www.cnblogs.com/wuxun1997/p/6816553.html

原文地址:https://www.cnblogs.com/wuxun1997/p/6822868.html