线程池基础二

我们来看程序的代码:

package com.bjsxt.height.concurrent019;

import java.util.concurrent.CountDownLatch;

public class UseCountDownLatch {

    public static void main(String[] args) {
        
        final CountDownLatch countDown = new CountDownLatch(2);
        
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("进入线程t1" + "等待其他线程处理完成...");
                    countDown.await();
                    System.out.println("t1线程继续执行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1");
        
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2线程进行初始化操作...");
                    Thread.sleep(3000);
                    System.out.println("t2线程初始化完毕,通知t1线程继续...");
                    countDown.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t3线程进行初始化操作...");
                    Thread.sleep(4000);
                    System.out.println("t3线程初始化完毕,通知t1线程继续...");
                    countDown.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        t1.start();
        t2.start();
        t3.start();
        
        
        
    }
}

程序运行的结果是:

t3线程进行初始化操作...
t2线程进行初始化操作...
t2线程初始化完毕,通知t1线程继续...
t3线程初始化完毕,通知t1线程继续...
t1线程继续执行...

t1线程只有等t2和t3线程初始化完成之后才能执行.....

package com.bjsxt.height.concurrent019;
import java.io.IOException;  
import java.util.Random;  
import java.util.concurrent.BrokenBarrierException;  
import java.util.concurrent.CyclicBarrier;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors; 
public class UseCyclicBarrier {

    static class Runner implements Runnable {  
        private CyclicBarrier barrier;  
        private String name;  
        
        public Runner(CyclicBarrier barrier, String name) {  
            this.barrier = barrier;  
            this.name = name;  
        }  
        @Override  
        public void run() {  
            try {  
                Thread.sleep(1000 * (new Random()).nextInt(5));  
                System.out.println(name + " 准备OK.");  
                barrier.await();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (BrokenBarrierException e) {  
                e.printStackTrace();  
            }  
            System.out.println(name + " Go!!");  
        }  
    } 
    
    public static void main(String[] args) throws IOException, InterruptedException {  
        CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
        ExecutorService executor = Executors.newFixedThreadPool(3);  
        
        executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
        executor.submit(new Thread(new Runner(barrier, "lisi")));  
        executor.submit(new Thread(new Runner(barrier, "wangwu")));  
  
        executor.shutdown();  
    }  
  
}  

程序的运行结果是:

lisi 准备OK.
zhangsan 准备OK.
wangwu 准备OK.
wangwu Go!!
lisi Go!!
zhangsan Go!!

future模式看我们之前的多线程基础一future模式,jdk给我们提供了封装好的工具类

package com.bjsxt.height.concurrent019;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class UseFuture implements Callable<String>{
    private String para;
    
    public UseFuture(String para){
        this.para = para;
    }
    
    /**
     * 这里是真实的业务逻辑,其执行可能很慢
     */
    @Override
    public String call() throws Exception {
        //模拟执行耗时
        Thread.sleep(5000);
        String result = this.para + "处理完成";
        return result;
    }
    
    //主控制函数
    public static void main(String[] args) throws Exception {
        String queryStr = "query";
        //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
        FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
        
        FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
        //创建一个固定线程的线程池且线程数为1,
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //这里提交任务future,则开启线程执行RealData的call()方法执行
        //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
        
        Future f1 = executor.submit(future);        //单独启动一个线程去执行的
        Future f2 = executor.submit(future2);
        System.out.println("请求完毕");
        
        try {
            //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
            System.out.println("处理实际的业务逻辑...");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
        System.out.println("数据:" + future.get());
        System.out.println("数据:" + future2.get());
        
        executor.shutdown();
    }

}

程序的运行结果是:

请求完毕
处理实际的业务逻辑...
数据:query处理完成
数据:query处理完成

 

package com.bjsxt.height.concurrent019;

import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Semaphore;  
  
public class UseSemaphore {  
  
    public static void main(String[] args) {  
        // 线程池  
        ExecutorService exec = Executors.newCachedThreadPool();  
        // 只能5个线程同时访问  
        final Semaphore semp = new Semaphore(5);  
        // 模拟20个客户端访问  
        for (int index = 0; index < 20; index++) {  
            final int NO = index;  
            Runnable run = new Runnable() {  
                public void run() {  
                    try {  
                        // 获取许可  
                        semp.acquire();  
                        System.out.println("Accessing: " + NO);  
                        //模拟实际业务逻辑
                        Thread.sleep((long) (Math.random() * 10000));  
                        // 访问完后,释放  
                        semp.release();  
                    } catch (InterruptedException e) {  
                    }  
                }  
            };  
            exec.execute(run);  
        } 
        
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        //System.out.println(semp.getQueueLength());
        
        
        
        // 退出线程池  
        exec.shutdown();  
    }  
  
}  

每次只能运行5个线程同时执行某个方法

程序运行的结果是:

Accessing: 0
Accessing: 3
Accessing: 2
Accessing: 1
Accessing: 4
Accessing: 5
Accessing: 6
Accessing: 7
Accessing: 8
Accessing: 9
Accessing: 10
Accessing: 11
Accessing: 12
Accessing: 13
Accessing: 14
Accessing: 15
Accessing: 16
Accessing: 17
Accessing: 18
Accessing: 19

使用信号量实现流量的控制,达到限流的功能,减少业务访问的压力

原文地址:https://www.cnblogs.com/kebibuluan/p/7641348.html