多线程编程:多线程并发制单的开发记录【一】

最近天气很冷,记得保暖,我在帝都向各位问好。一次舒服的小长假回来,是不是都忘了增删改查怎么敲了,那就赶紧粘贴一会儿吧,别等着公司把你优化掉。

好了,老习惯,首先给各位脑补一下多线程必备的知识。

进程和线程:

下图是在来自知乎用户的解释,个人感觉狠到位

       进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

       线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。线程是程序中一个单一的顺序控制流程。进程内一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指运行中的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。

进程和线程的关系:

线程安全:

       何为线程安全?就是应用中多个线程访问某一个类(对象或方法)时,这个类始终能表现出正确的行为,那么这个类(对象或方法)就是线程安全的。

       线程安全就是多线程访问时,采用了加锁机制(在多线程安全上加锁也是一门技术活,不是说对于共享资源你简单加个同步关键字或定义成同步方法就OK了的,锁的不合理则会大大影响程序的性能,甚至影响到业务,这里本人亲身经历过一个问题,锁粒度大小对程序的影响,参考:http://www.cnblogs.com/1315925303zxz/p/7561236.html),当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据污染。

        线程不安全呢,就是不提供数据访问保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据。这里的加锁机制常见的如:synchronized。

由spring管理的线程池进行并发制单的业务设计:

业务需求:多个通道进行数据采集,数据采集就是拿着VIN码去抓取数据,但是当VIN码很多时,采集的速度就很慢,所以实施多线程多并发进行采集。

1、spring管理线程池的配置,这里需要说明的一点就是核心线程数和最大线程数的配置一定要按照自己业务的并发量来设定,否则不仅不会提升并发效率,反而会出现各种数据污染的情况。

 1    <!-- 异步线程池 -->
 2     <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
 3         <!-- 核心线程数,最小线程数 默认为1 -->
 4         <property name="corePoolSize" value="4" />
 5         <!-- 最大线程数,默认为Integer.MAX_VALUE -->
 6         <property name="maxPoolSize" value="8" />
 7         <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
 8         <property name="queueCapacity" value="2000" /> 
 9         <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
10         <property name="keepAliveSeconds" value="300" />
11         <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
12         <property name="rejectedExecutionHandler">
13             <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
14             <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
15             <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
16             <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
17             <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
18         </property>
19     </bean>

     <!-- 一汽大众和奥迪多并发制单 -->
      <bean id="multiThreadTask_VW_AD" class="cn.base.thread.MultiThreadTask_VW_AD"/>

2、线程制单业务(这里是伪代码),将符合条件的VIN码进行数据采集,否则记录到指定容器中统一处理(退单)。

初始化任务(VIN码)队列:

 1 public 
 2 class InitQueue {
 3     
 4     private static final Logger logger = LoggerFactory.getLogger(MultiThreadTask_VW_AD.class);
 5     
 6     /**
 7      *    并发队列
 8      */
 9     private static Queue<String> queue = null;
10     
11     //初始化并发队列
12     private static void initQueue(){
13         logger.info("初始化并发队列:{}");
14         if(queue == null){
15             queue = new ConcurrentLinkedQueue<String>();    //并发队列    
16         }
17         String tasklist = "JF1GH78F18G03614,JF1SH95F6AG110830,JF1SJ94D7DG010387333,JF1SH92F9CG26924,JF1SH92F5BG215090,JF1SH92F5BG222556,JF1SH92F4CG279994,JF1BR96D7CG114298,JF1BR96D0BG078632,JF1SH95F9AG094011,JF1SH98FXAG186997,JF1BM92D8BG022510,JF1BM92DXAG013855,JF1BM94D8EG0366";
18         String[] split = tasklist.split(",");
19         List<String> task = Arrays.asList(split);    //数组转集合
20         queue.addAll(task);        //按照集合中元素的顺序将集合中全部元素放进队列
21     }
22     
23     //对外提供一个任务队列
24     public static Queue<String> getQueue(){
25         initQueue();
26         return queue;
27     }
28 
29 }
View Code

制单:

 1 public 
class MultiThreadTask_VW_AD implements Runnable,Serializable{ 4 private static final Logger logger = LoggerFactory.getLogger(MultiThreadTask_VW_AD.class); 5 private static final long serialVersionUID = 0; 6 7 private static final Object lock = new Object(); //静态锁,任务队列在某一时刻只能由一个线程操作 8 private static List<String> errorVinList = new ArrayList<>(); //存放不合理VIN码的集合 9 10 private static Queue<String> taskQueue = InitQueue.getQueue(); //任务队列 11 12 //数据采集 13 @Override 14 public void run() { 15 synchronized (lock) { 16 int queueSize = taskQueue.size(); 17 if(queueSize > 0){ 18 logger.info("任务队列目前有{}个任务",queueSize); 19 String thisVin = taskQueue.poll(); 20 logger.info("当前任务VIN码:{}",thisVin); 21 if(thisVin.length() == 17){ 22 System.out.println("采集任务已完成【"+Thread.currentThread().getName()+"】"); 23 }else{ 24 System.out.println("不进行采集【"+Thread.currentThread().getName()+"】"); 25 errorVinList.add(thisVin); 26 System.err.println("未采集的VIN码有:"+errorVinList); 27 } 28 }else{ 29 logger.info("任务队列目前无任务"); 30 } 31 } 32 } 33 34 }

       再次强调:加锁的时候一定要掌握好锁的粒度,不然会影响并发效率。

3、模拟多通道进行数据采集。

 1 public 
 2 class ThreadPoolController {
 3     
 4     private static final Logger logger = LoggerFactory.getLogger(ThreadPoolController.class);
 5     
 6     /**
 7      * 一汽大众、奥迪多并发进行数据采集:
 8      */
 9     @Test
10     public void vwadTest(){
11         //加载线程池配置文件
12         ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:springs/applicationContext-threadpool.xml");
13         //通过applicationContext获取异步线程池对象threadPoolTaskExecutor
14         ThreadPoolTaskExecutor threadPool = (ThreadPoolTaskExecutor) ctx.getBean("threadPoolTaskExecutor");
15         //获取品牌制单业务对象(由spring管理的)
16         MultiThreadTask_VW_AD multiThreadTask_VW_AD = (MultiThreadTask_VW_AD) ctx.getBean("multiThreadTask_VW_AD");
17         for(int i=1; i<=15; i++){
18             threadPool.execute(multiThreadTask_VW_AD);
19         }
20         
21     }

后期继续更新多线程并发的文章,因为被这家伙给看上了,不得不深入剖析了,一直在学习的路上。

原文地址:https://www.cnblogs.com/1315925303zxz/p/7649823.html