Java 实现多线程生产者-消费者模型

  • 问题背景:做一个爬虫,从网上爬取书籍,爬下来后插入数据库。

  • 问题设计:用多个爬虫线程(生产者)同时从网上爬书,爬下来的书由一个插入线程(消费者)负责写入数据库;这些线程共享同一个书籍队列,并且同时开始运行。

书籍POJO

 1 package thread;
 2 
 3 public class Book {
 4     private Integer bookId;
 5     private String bookName;
 6 
 7     public Integer getBookId() {
 8         return bookId;
 9     }
10 
11     public void setBookId(Integer bookId) {
12         this.bookId = bookId;
13     }
14 
15     public String getBookName() {
16         return bookName;
17     }
18 
19     public void setBookName(String bookName) {
20         this.bookName = bookName;
21     }
22 }

爬虫线程

 1 package thread;
 2 
 3 import java.util.List;
 4 import java.util.Queue;
 5 
 6 public class CrawlerThread extends Thread {
 7     private List<Integer> bookIdList;   //待爬书籍的Id号
 8     private Queue<Book> bookQueue;  //爬完书籍的共享存储队列
 9 
10     public CrawlerThread(List<Integer> bookIdList, Queue<Book> bookQueue) {
11         this.bookIdList = bookIdList;
12         this.bookQueue = bookQueue;
13     }
14 
15     public void run() {
16         for(int i = 0; i < bookIdList.size(); i++) {
17             Book book = new Book(); //把这两步当做爬虫爬书的过程
18             book.setBookId(bookIdList.get(i));
19             book.setBookName("书名" + book.getBookId());
20 
21             try {
22                 sleep((long) (Math.random() * 1000 * 1000));  //随机sleep 100 - 1000 秒,模拟爬虫爬书时的延时过程
23             } catch (InterruptedException e) {
24                 e.printStackTrace();
25             }
26 
27             bookQueue.offer(book);  //爬完的书籍插入共享队列
28         }
29     }
30 }

插入线程

 1 package thread;
 2 
 3 import java.util.Queue;
 4 
 5 public class InsertThread extends Thread {
 6     private Queue<Book> bookQueue;  //爬完书籍的共享存储队列
 7 
 8     public InsertThread(Queue<Book> bookQueue) {
 9         this.bookQueue = bookQueue;
10     }
11 
12     public void run() {
13         int timer = 0; //超时计时器
14 
15         while(timer < 30) { //如果连续30分钟bookQueue均为空,则超时,线程结束
16             if(bookQueue.size() != 0) { //如果队列不为空
17                 Book book;
18                 while((book = bookQueue.poll()) != null) {
19                     System.out.println(book.getBookName()); //把这步当成插入数据库吧
20                 }
21                 timer = 0;  //超时计时器清零
22             } else {
23                 try {
24                     sleep(60 * 1000);   //等待爬虫一分钟
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 }
28                 timer++;    //timer时间+1
29             }
30         }
31     }
32 }

主程序

 1 package thread;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.Queue;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 
 8 public class Start {
 9     public void main(String[] args) {
10         Queue<Book> bookQueue = new ConcurrentLinkedQueue<Book>();  //使用线程安全的队列作为共享书籍队列
11 
12         //构造待爬书籍Id号
13         List<Integer> bookIdList = new ArrayList<Integer>();
14         bookIdList.add(1);
15         bookIdList.add(2);
16         bookIdList.add(3);
17         bookIdList.add(4);
18         bookIdList.add(5);
19 
20         CrawlerThread ct = new CrawlerThread(bookIdList, bookQueue);    //此处你可以将Id列表分为多分,分配给多个线程爬,我就不写了,懒..- -
21         InsertThread it = new InsertThread(bookQueue);  //插入数据库线程
22 
23         ct.start();
24         it.start();
25     }
26 }
原文地址:https://www.cnblogs.com/zemliu/p/2502658.html