JAVA-AbstractQueuedSynchronizer-AQS

 

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CountDownLatchExample1 {
    private final   static  int threadCount=200;
    public static void main(String[] args) throws  Exception{
        ExecutorService executorService= Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch=new CountDownLatch(threadCount);
        for(int i=0;i<threadCount;i++){
            final int threadNum=i;
            executorService.execute(()->{
                try{
                    test(threadNum);
                }catch (Exception e){
                    log.error("exception",e);
                }finally {
                    countDownLatch.countDown();
                }
            });
        }
        //countDownLatch.await();  //保证前面的线程都执行完
        countDownLatch.await(10, TimeUnit.MILLISECONDS);  //在规定时间内执行     
        log.info("finish");
    }
    private static void test(int threadNum) throws Exception{
        Thread.sleep(100);
        log.info("{}",threadNum);
        Thread.sleep(100);
    }
}

  

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample1 {
    private final   static  int threadCount=20;
    public static void main(String[] args) throws  Exception{
        ExecutorService executorService= Executors.newCachedThreadPool();
        final Semaphore semaphore=new Semaphore(3);
        for(int i=0;i<threadCount;i++){
            final int threadNum=i;
            executorService.execute(()->{
                try{
                    if(semaphore.tryAcquire()){  //可以指定执行的时间
                        test(threadNum);
                        semaphore.release();
                    }
                    //semaphore.acquire(3);//获取多个许可
                    //semaphore.acquire();//获取一个许可
                   // test(threadNum);
                   // semaphore.release();//释放一个许可
                    //semaphore.release(3);//释放多个许可
                }catch (Exception e){
                    log.error("exception",e);
                }
            });
        }

        log.info("finish");
    }
    private static void test(int threadNum) throws Exception{
        log.info("{}",threadNum);
        Thread.sleep(100);
    }
}

  

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;

@Slf4j
public class CyclicBarrierExamle1 {
    //private static CyclicBarrier barrier=new CyclicBarrier(5);
    private static CyclicBarrier barrier=new CyclicBarrier(5,()->{
       log.info("callback is running");
    });
    public static void main(String[] args) throws Exception{
        ExecutorService executor= Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            final int threadNum=i;
            Thread.sleep(1000);
            executor.execute(()->{
                try {
                    race(threadNum);
                }catch (Exception e){
                    log.error("exception",e);
                }
            });
        }
        executor.shutdown();
    }
    private static void race(int threadNum) throws Exception{
        Thread.sleep(100);
        log.info("{} is ready",threadNum);
        //barrier.await();
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        }catch (BrokenBarrierException |TimeoutException e){
            log.warn("BrokenBarrierException",e);
        }

        log.info("{}continue",threadNum);
    }
}

  

在读取中写入的方法:ReentrantReadWriteLock

线程安全的写法。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class LockExample1 {
    //请求总数
    public static  int clientTotal=5000;
    //同时并发执行的线程数
    public static int threadTotal=200;
    public static int count=0;
    private final static Lock lock=new ReentrantLock();

    public static void main(String[] args) throws Exception{
        ExecutorService executorService= Executors.newCachedThreadPool();
        final Semaphore semaphore=new Semaphore(threadTotal);
        final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            final  int count=i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add(count);
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size{}",count);
    }
    private static void add(int i){
         lock.lock();
         try {
             count++;
         }finally {
             lock.unlock();
         }
    }
}

  

import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Slf4j
public class LockExample2 {
    private final Map<String, Data> map=new TreeMap<>();
    private final ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
    private final Lock readLock=lock.readLock();
    private final Lock writeLock=lock.writeLock();
    public Data get(String key){
      readLock.lock();
      try {
          return map.get(key);
      }finally {
          readLock.unlock();
      }
    }
    public Set<String> getAllKeys(){
       readLock.lock();
       try {
           return map.keySet();
       }finally {
           readLock.unlock();
       }
    }
    public Data put(String key, Data value){
        writeLock.lock();
        try {
            return map.put(key, value);
        }finally {
            readLock.unlock();
        }
    }
    class Data{

    }
}

  

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;

@Slf4j
public class LockExample5 {
    //请求总数
    public static  int clientTotal=5000;
    //同时并发执行的线程数
    public static int threadTotal=200;
    public static int count=0;
    private final static StampedLock lock=new StampedLock();

    public static void main(String[] args) throws Exception{
        ExecutorService executorService= Executors.newCachedThreadPool();
        final Semaphore semaphore=new Semaphore(threadTotal);
        final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
        for(int i=0;i<clientTotal;i++){
            final  int count=i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add(count);
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("size{}",count);
    }
    private static void add(int i){
        long stamp= lock.writeLock();
        try {
            count++;
        }finally {
            lock.unlock(stamp);
        }
    }
}

  

import java.util.concurrent.locks.StampedLock;
public class LockExample4 {
    class Point{
        private double x,y;
        private  final StampedLock sl=new StampedLock();
        void move(double deltaX,double deltaY){
            long stamp=sl.writeLock();
            try{
                x+=deltaX;
                y+=deltaY;
            }finally {
                sl.unlockWrite(stamp);
            }
        }

        //乐观锁案例
        double distanceFromOrigin(){
            long stamp=sl.tryOptimisticRead();//获得一个乐观读锁
            double currentX=x,currentY=y;//将两个字段读入本地局部变量
            if(!sl.validate(stamp)){  //检查发出乐观锁后同时是否有其他锁发生
                stamp=sl.readLock();//没有,再次获取一个读悲观锁
                try{
                    currentX=x;  //将两个字段读入本地局部变量
                    currentY=y; //将两个字段读入本地局部变量
                }finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX*currentX+currentY*currentY);
        }

        //悲观锁读写案例
        void moveIfAtOrigin(double newX,double newY){
            long stamp=sl.readLock();
            try {
                while (x==0.0&&y==0.0){ //循环,检查当前状态是否符合
                    long ws=sl.tryConvertToWriteLock(stamp);//将读锁转化为写锁
                    if(ws!=0L){  //确认转为写锁是否成功
                       stamp=ws;  //如果成功,替换票据
                       x=newX;   //进行状态改变
                       y=newY;   //进行状态改变
                        break;
                    }else {
                        sl.unlockRead(stamp);//如果不能成功转化为写锁
                        stamp=sl.writeLock();//显示直接进行写锁,然后通过循环再试
                    }
                }
            }finally {
                sl.unlock(stamp);//释放读锁和写锁
            }
        }
    }
}

  

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class LockExample6 {
    public static void main(String[] args) {
        ReentrantLock reentrantLock=new ReentrantLock();
        Condition condition=reentrantLock.newCondition();
        new Thread(()->{
            try {
                reentrantLock.lock();
                log.info("wait signal");
                condition.await();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            log.info("get signal");
            reentrantLock.unlock();
        }).start();

        new Thread(()->{
            reentrantLock.lock();
            log.info("get lock");
            try {
                Thread.sleep(300);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal~");
            reentrantLock.unlock();
        }).start();
    }
}

  

原文地址:https://www.cnblogs.com/sunliyuan/p/11259994.html