Java.Util.Concurrent包中的一些接口和类

1.Callable<V> :接口,多线程的一种实现方式,实现类重写方法,重写的call()方法有返回值或者抛出异常,需要配合着FutureTask类(实现了Runnable接口)使用:

 1 public class CallableTest {
 2     public static void main(String[] args) {
 3         Test t=new Test();
 4         FutureTask<Integer> f=new FutureTask<>(t);  //获取结果,与callable配合着使用  
//FutureTask类的构造函数必须使用Runnable,或者Callable接口的实例
5 new Thread(f).start(); //FutureTask也实现了Runnable接口 6 Integer i= null; 7 try { 8 i = f.get(); //FutureTask的方法,获得个、返回的结果 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } catch (ExecutionException e) { 12 e.printStackTrace(); 13 } 14 System.out.println(i); 15 16 } 17 } 18 class Test implements Callable<Integer>{ //实现Runnable接口,重写call()方法,有返回值 19 int sum=0; 20 @Override 21 public Integer call() throws Exception { 22 for (int i = 0; i <=100; i++) { 23 sum=sum+i; 24 } 25 return sum; 26 } 27 }

2.Semaphore类:信号量,直接new对象,semaphore.acquire():获取信号量,如果获取失败如阻塞,semaphore.relase():释放信号量,semaphore.availablePermits():获取信号量的编号。

 Semaphore s2=new Semaphore(5,true);
 Semaphore s1=new Semaphore(5);  
//两种构造方法,(int,boolean) int:资源的访问量 boolean:是否是公平竞争 //公平竞争:等的越久越先执行
try {
s1.acquire();//请求资源,如果资源不可用该线程阻塞,直到有可用资源为止;
} catch (InterruptedException e) {
e.printStackTrace();
}
s1.release();//释放资源
 

 一个信号量的停车场案例:

public class Parking2 implements Runnable{
Semaphore semaphore;
int id;

public Parking2(Semaphore semaphore, int id) {
this.semaphore = semaphore;
this.id = id;
}

public static void main(String[] args) {
Semaphore semaphore=new Semaphore(4); //4个车位
ExecutorService executorService= Executors.newCachedThreadPool();//一个线程池
for (int i = 1; i <= 25; i++) {
executorService.execute(new Parking2(semaphore,i));
}
executorService.shutdown();

}

@Override
public void run() {
try {
push();
Thread.sleep(1000);
pop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停
public void push() throws InterruptedException {
if(semaphore.availablePermits()>0){
System.out.println(this.id+"可以停车!");
}else{
System.out.println(this.id+"请等待,没有车位!");
}
semaphore.acquire();
System.out.println(this.id+"停车成功!");
}
//取
public void pop(){
semaphore.release();
System.out.println(this.id+"取车成功!");
}
}

3.ReentrantLock和Condition:

ReentrantLock:可重入排他锁,乐观锁,直接new对象,需要手动的加和释放锁,但比较灵活。

public class ReentrantTest implements Runnable{

Test t;
int id;

public ReentrantTest(Test t, int id) {
this.t = t;
this.id = id;
}

@Override
public void run() {
try {
t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ExecutorService executorService= Executors.newCachedThreadPool();//线程池
Test t=new Test();
//保证每一个线程访问同一个线程池,和同一把锁
for (int i = 1; i <= 30; i++) {
executorService.execute(new ReentrantTest(t,i));
}
executorService.shutdown();//关闭
}
}
class Test{
ReentrantLock reentrantLock=new ReentrantLock();//new一个锁

public void get() throws InterruptedException {
reentrantLock.lock(); //上锁
System.out.println(Thread.currentThread().getName()+"_启动");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"_结束");
//释放锁
reentrantLock.unlock();
}
}

Condition:是一个接口,创建Condition对象需要ReentrantLock.newCondition();主要有await():导致当前线程等待,singal():唤醒一个等待线程。如下代码实例证明:1.await(),signal()只能在当前对象获得锁(ReentrantLock)是被调用;2.当对象调用await()方法后,会交出锁,让其他线程对象争用;。在调用signal()后,锁对象又回到await()调用处,继续执行。

public class ConditionTest {
    static ReentrantLock lock=new ReentrantLock();
    static Condition condition=lock.newCondition();  //condition是一个接口,有await(),singal()方法
    static int count=1;

    public static void main(String[] args) throws InterruptedException {
        Runnable r1= () -> {
            lock.lock();
            System.out.println("r1带锁启动。。。");
            try {
                System.out.println("r1即将被锁住");
                condition.await();   //等待,并且交出锁
                System.out.println("r1又获新生!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("r1执行完毕");
                lock.unlock();
            }
        };

        Runnable r2= () -> {
            lock.lock();
            System.out.println("r2带锁启动。。。");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("r1即将被唤醒!");
            condition.signal();  //唤醒r1
            System.out.println("r2执行完毕");
            lock.unlock();

        };
        new Thread(r1).start();  //让r1先执行,但条件不足,只能先等待

        new Thread(r2).start();
    }
}

4.BlockingQueue:是一个阻塞队列的超类接口,此接口的实现类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每个类的具体使用可以参考API。以下的实例使用LinkedBlockingQueue做的。

 1 //设计一个阻塞队列
 2 //每个多线程对象都会往queue(为对象共享static)中放元素会涉及:
 3 //      --->如果队列已满就阻塞等待,--->如果队列空就阻塞等待
 4 //本例中一共有三个执行:  Main主线程, 往blockingqueue放元素的线程 从blockingqueue中取元素的线程
 5          //三者一起执行
 6 public class TestBlockingQueue implements Runnable{
 7     //属性:
 8     int id;  //每个对象编号
 9     static BlockingQueue<String> queue=new LinkedBlockingQueue<>(3);
10 
11     //构造:
12     public TestBlockingQueue(int id) {
13         this.id = id;
14     }
15 
16     @Override
17     public void run() {
18         //每个线程元素都会把自己的ID放到队列中
19         try {
20             queue.put(String.valueOf(this.id));  //put():会抛异常
21             System.out.println(this.id+"已被放进队列");
22         } catch (InterruptedException e) {
23             e.printStackTrace();
24         }
25     }
26 
27     public static void main(String[] args) {
28         //定义一个线程池:
29         ExecutorService executorService= Executors.newCachedThreadPool();
30         for (int i = 0; i < 15; i++) {
31             //线程池执行线程对象
32             executorService.execute(new TestBlockingQueue(i));
33         }
34         Thread t=new Thread(()->{
35            //run()的方法体
36             while (true){
37                 try {
38                     Thread.sleep(100);
39                     if(queue.isEmpty()){//静态属性可以类名直接调
40                         break;
41                     }
42                     //拿出队列中的元素
43                     String string=TestBlockingQueue.queue.take();//拿出并删除队列头的元素
44                     System.out.println(string+"被拿出");
45                 } catch (InterruptedException e) {
46                     e.printStackTrace();
47                 }
48             }
49         });
50 
51         //让线程池去执行线程对象t
52         executorService.execute(t);
53 
54         //关闭线程池
55         executorService.shutdown();
56     }
57 }

5.CompletionService:是一个接口,创建实现类对象,是将生产新的异步任务和使用已完成任务结果分离开来的服务。有实现类:ExecutorCompletionService,方法:submit(),take(),具体用法如下实例:

 1 //测试: CompletionService = new ExecutorCompletionService(Executor实现类)--->接口,实现类,让线程池来执行任务
 2 //方法: 1.completionService.submit(Callable<V>的实现类)-->提交执行线程对象
 3    //   2.completionService.take().get()-->检索并删除下一个已完成的任务,get()获得该线程的执行结果
 4 //会阻塞等待
 5 public class CompletionServiceTest implements Callable<String> {
 6    int id;
 7 
 8     public CompletionServiceTest(int id) {
 9         this.id = id;
10     }
11 
12     @Override
13     public String call() throws Exception {
14         //每个线程打印,并返回自己的编号
15         System.out.println(this.id+"_已启动");
16         Thread.sleep(100);
17         System.out.println(this.id+"_结束");
18         return this.id+"_thread,call()方法的返回值";
19     }
20 
21     public static void main(String[] args) {
22         //线程池
23         ExecutorService executorService= Executors.newCachedThreadPool();
24                                                                               //放的是executor的实现类(线程池对象)
25         CompletionService <String>completionService= new ExecutorCompletionService<String>(executorService);
26                     //使用ExecutorCompletionService作为执行对象,LinkedBlockingQueue:作为完成队列
27 
28         for (int i = 0; i < 15; i++) {
29             completionService.submit(new CompletionServiceTest(i));
30              //提交执行
31         }
32         for (int i = 0; i < 15; i++) {
33             try {
34                 try {
35                     System.out.println(completionService.take().get());
36                        //并打印返回值
37                       //take():获取已完成的任务的结果,并从完成队列中删除该任务
38                 } catch (ExecutionException e) {
39                     e.printStackTrace();
40                 }
41             } catch (InterruptedException e) {
42                 e.printStackTrace();
43             }
44         }
45         //线程池关闭
46         executorService.shutdown();
47     }
48 }

6.CountDonwLatch:是继承自Object的类,允许一个或一组线程等待其他线程完成的辅助类

经典案例:10个人一起赛跑,得同时出发。

 1 //CountDownLatch是继承自Object的类
 2 //1.构造方法,直接new: countdownlatck=new CountDownLatch(int)
 3 //2.主要方法:countdownlatch.await():等待计数到达0,后开始线程执行,没有就阻塞等待
 4 //           countdownlatch.countDown():计数减一
 5 //一个10一起赛跑的案例: 要求:10个线程一起开始运行,等10个线程都运行结束后输出比赛结束的信息
 6 
 7 import java.util.concurrent.CountDownLatch;
 8 import java.util.concurrent.ExecutorService;
 9 import java.util.concurrent.Executors;
10 
11 public class CountDownLatchTest {
12     //比赛开始倒计时:
13     static CountDownLatch start=new CountDownLatch(1);
14 
15     //比赛结束倒计时:
16     static CountDownLatch end=new CountDownLatch(10);//10名选手,每来一个就减少一个
17 
18     public static void main(String[] args) {
19         //线程池
20         ExecutorService pool= Executors.newFixedThreadPool(10);
21         System.out.println("这是一场10人制比赛~~~");
22         for (int i = 0; i < 10; i++) {  //创建10个线程
23             int id=i+1;
24             Runnable runner= () -> {
25                 System.out.println("运动员:"+id+"准备就绪!");
26                 try {
27                     start.await();  //等待开始的指令
28 
29                     Thread.sleep(1000); //跑步中
30 
31                     System.out.println("运动员"+id+"已到达!");
32                 } catch (InterruptedException e) {
33                     e.printStackTrace();
34                 }finally {
35                     end.countDown();  //每跑完一个人就让计数减一
36                 }
37             };
38             pool.execute(runner);  //执行10个线程,都阻塞在开始指令之前
39         }
40 
41         start.countDown();
42 
43         try {
44             end.await();  //等待比赛结束
45         } catch (InterruptedException e) {
46             e.printStackTrace();
47         }
48         System.out.println("全场比赛结束!");
49         //关闭线程池
50         pool.shutdown();
51     }
52 }

7.CyclicBarrier:是一个类,作用是:允许一组线程互相等待到达一个屏障点前的同步辅助,这个屏障在所有等待的线程被释放后可以重新使用,这是该屏障称为循环(CountDownLatch不同之处)。

 1 //一个旅游团的例子
 2 //构造方法:cyclicBarrier=new CyclicBarrier(int)-->int:要等待的线程数
 3 //重要方法:await()-->当所有线程都执行了该方法,就一起继续向前执行,否则就等待
 4 public class CyclicBarrierTest {
 5     //属性: 三种策略到达5个城市的时间数组: 西安 北京 重庆 成都 杭州
 6     static int [] bycar={1,2,3,4,5};
 7     static int [] bybus={2,3,4,5,6};
 8     static int [] onfoot={3,4,5,6,7};
 9 
10     static String getNow(){
11         //获取现在的时间
12         SimpleDateFormat sdf=new SimpleDateFormat("hh:mm:ss");
13         return sdf.format(new Date());
14     }
15     static class Tour implements Runnable{
16         CyclicBarrier cyclicBarrier=new CyclicBarrier(3); //三家旅行公司,采用三种celue
17         String tourName; //旅行店的名字
18         int [] time;
19 
20         public Tour(CyclicBarrier cyclicBarrier, String tourName,int [] time) {
21             this.cyclicBarrier = cyclicBarrier;
22             this.tourName = tourName;
23             this.time=time;
24         }
25 
26         @Override
27         public void run() {
28             try {
29                 Thread.sleep(time[0]*1000);
30                 System.out.println(getNow()+":"+this.tourName+"到达-->西安");
31                 try {
32                     cyclicBarrier.await();   //等待其他线程执行到此处
33 
34                     //继续执行:
35                     Thread.sleep(time[1]*1000);
36                     System.out.println(getNow()+":"+this.tourName+"到达-->北京");
37                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
38 
39                     Thread.sleep(time[2]*1000);
40                     System.out.println(getNow()+":"+this.tourName+"到达-->重庆");
41                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
42 
43                     Thread.sleep(time[3]*1000);
44                     System.out.println(getNow()+":"+this.tourName+"到达-->成都");
45                     cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
46 
47                     Thread.sleep(time[4]*1000);
48                     System.out.println(getNow()+":"+this.tourName+"到达-->杭州");
49                     //cyclicBarrier.await();  //等待其他两个团到达北京,循环使用
50                     
51                 } catch (BrokenBarrierException e) {
52                     e.printStackTrace();
53                 }
54             } catch (InterruptedException e) {
55                 e.printStackTrace();
56             }
57         }
58     }
59     public static void main(String[] args) {
60         //定义线程池
61         ExecutorService pool= Executors.newFixedThreadPool(3);
62 
63         //定义CyclicBarrier
64         CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
65 
66         //线程执行线程,分别执行
67         pool.execute(new Tour(cyclicBarrier,"T_Walk",onfoot));
68         pool.execute(new Tour(cyclicBarrier,"T_Car",bycar));
69         pool.execute(new Tour(cyclicBarrier,"T_Bus",bybus));
70 
71         //关闭线程池
72         pool.shutdown();
73     }
74 }

8.Future<V>:是一个接口,实现类:FutureTask,CompletableFuture等,get()方法将返回V类型的数据。提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get进行检索,如有必要,阻塞,直到准备就绪。 取消由cancel方法执行。 提供其他方法来确定任务是否正常完成或被取消。 计算完成后,不能取消计算。 如果您想使用Future ,以便不可撤销,但不提供可用的结果,则可以声明Future<?>表格的类型,并返回null作为基础任务的结果。

9.ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool),作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。

 1 //ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool)
 2 //作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。
 3 // 方法1:pool.ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6);
 4 //参数含义:command - 要执行的任务
 5 //         initialDelay - 延迟第一次执行的时间
 6 //         period - 连续执行之间的时期
 7 //         unit - initialDelay和period参数的时间单位
 8 
 9 //方法2:ScheduledFuture<?> schedule(Runnable command,
10 //                            long delay,
11 //                            TimeUnit unit)创建并执行在给定延迟后启用的单次操作。
12 //参数
13 //command - 要执行的任务
14 //delay - 从现在开始延迟执行的时间
15 //unit - 延时参数的时间单位
16 public class ScheduledExecutorServiceTest {
17     public static void main(String[] args) {
18         //创建线程池
19         final ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
20                        //创建一个可以在给定延迟后运行,或者定期执行的线程池
21         final Runnable runnable=new Runnable() {
22             int count=0;
23             @Override
24             public void run() {
25                 System.out.println(new Date()+"runnable"+(++count));
26             }
27         };
28 
29         //1秒后运行,每隔2秒运行一次
30         ScheduledFuture<?> scheduledFuture = pool.scheduleAtFixedRate(runnable,1,2,SECONDS);
31         //2秒后运行,往后每隔5秒运行一次
32         ScheduledFuture<?> scheduledFuture1 = pool.scheduleAtFixedRate(runnable, 2, 5, SECONDS);
33         //30秒后结束关闭任务,并关闭Scheduled
34         pool.schedule(new Runnable() {
35             @Override
36             public void run() {
37                 //关闭操作
38                 scheduledFuture.cancel(true);
39                 scheduledFuture1.cancel(true);
40                 //关闭线程池
41                 pool.shutdown();
42             }
43         },30,SECONDS);
44 
45     }
46 }

以上内容参考来自Java ApI文档;

以及推荐博客

 
原文地址:https://www.cnblogs.com/xbfchder/p/10975146.html