使用线程Callable实现分段获取一个url连接的资源数据

一.简介

     使用Callable的线程实现方式,返回每此请求区间的数据,当线程执行完毕以后把每个线程对应返回的数据段组成完整的数据。

 关于从网上下载一个文件分多个线程同时下载。

    主要使用到HttpURLConnection对象的setRequestProperty(String key,String value);方法

     简单说一下如何使用,setRequestProperty()方法严格上讲是HttpURLConnection的父类---URLConnection的方法,而URL.openConnection()返回的是一个URLConnection对象,而一般我们都用他的子类HttpURLConnection去做链接和网络传输工作。

二.解决思路

  1. 把指定文件(通过HttpURLConnection.getContentLength();获得文件大小),分成指定线程数下载,或者指定每个线程承担下载任务的大小
  2. 计算所需线程数
  3. 把线程获取的多段数据重组为完整的数据ByteArrayOutputStream
  4. 或者将获得的数据保存本地(使用RandomAccessFile流存储)

三.代码

  1 package demo.buffer;
  2 
  3 import java.io.BufferedInputStream;
  4 import java.io.ByteArrayOutputStream;
  5 import java.io.File;
  6 import java.io.FileNotFoundException;
  7 import java.io.IOException;
  8 import java.io.InputStream;
  9 import java.net.HttpURLConnection;
 10 import java.net.MalformedURLException;
 11 import java.net.URL;
 12 import java.util.HashMap;
 13 import java.util.Map;
 14 import java.util.concurrent.Callable;
 15 import java.util.concurrent.ExecutionException;
 16 import java.util.concurrent.ExecutorService;
 17 import java.util.concurrent.Executors;
 18 import java.util.concurrent.Future;
 19 import javax.imageio.stream.FileImageOutputStream;
 20 
 21 /**
 22  * 非直接缓冲区  读写操作
 23  * @author Administrator
 24  *
 25  */
 26 public class Test003 {
 27     public final static String sate = "http://www.baidu.com/img/bd_logo1.png";//本题所下载的文件是一个百度图标
 28     public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
 29         try {
 30             //创建一个URL对象
 31             URL url = new URL(sate);
 32             //建立一个链接
 33             HttpURLConnection conn = (HttpURLConnection) url.openConnection();
 34             //制定相应方式
 35             conn.setRequestMethod("GET");
 36             //进行连接
 37             conn.connect();
 38             //获得响应码
 39             int code = conn.getResponseCode();
 40             System.out.println("服务器响应码:"+code);
 41             //如果响应码是200 则链接下载。。。
 42             if(code == HttpURLConnection.HTTP_OK){
 43                 //获得文件长度
 44                 int contentLength = conn.getContentLength();
 45                 System.out.println("文件总长度:"+contentLength);
 46                 //假定每个线程负责下载的长度为2048k
 47                 int blockSize = 1024*2;
 48                 //如果能除尽指定线程数  否则线程数+1
 49                 int size = contentLength%blockSize==0 ? contentLength/blockSize : (contentLength/blockSize)+1;
 50 
 51                 //拼接保存多次请求的内容
 52                 ByteArrayOutputStream os = new ByteArrayOutputStream();
 53                 //键值保存每个线程的结果数据
 54                 Map<Integer, byte[]> map=new HashMap<Integer, byte[]>();
 55                 /**
 56                  * 创建固定线程数量的线程池 
 57                  * 通过Executor来设计应用程序可以简化开发过程,提高开发效率,并有助于实现并发,在开发中如果需要创建线程可优先考虑使用Executor 
 58                  */
 59                 ExecutorService pool = Executors.newFixedThreadPool(size);
 60                 for(int i = 0;i<size;i++){
 61                     //创建有返回值的任务
 62                     Callable call;
 63                     if(i==(size-1)){
 64                         call = new DownLoadThreads(i, (contentLength-(blockSize*i)),contentLength);
 65                         System.out.println("启动了第:"+i+"  线程---------------------"+(contentLength-(blockSize*i)));
 66                         Future<byte[]> fu = pool.submit(call);
 67                         map.put(i,fu.get());
 68                         break;
 69                     }
 70                     call = new DownLoadThreads(i, blockSize, (i+1)*blockSize);
 71                     System.out.println("启动了第:"+i+"  线程---------------------"+blockSize);
 72                     //执行任务并获取Future对象   
 73                     Future<byte[]> fu = pool.submit(call); 
 74                     map.put(i,fu.get());
 75                 }
 76                 //关闭线程池   
 77                 pool.shutdown();  
 78                 System.out.println("==============size"+map.size());
 79                 for(int i = 0;i<size;i++){
 80                     byte b[]=map.get(i);
 81                     os.write(b, 0, b.length-1);//文件下标是从1开始的,所以长度要从0下标开始,长度减1结束
 82                 }
 83                 byte[] b=os.toByteArray();
 84                 File file = new File("D:\tmp\02.png");
 85                 FileImageOutputStream imageOutput = new FileImageOutputStream(file);
 86                 imageOutput.write(b, 0, b.length);
 87                 imageOutput.close();
 88                 System.err.println(b.length);
 89             }
 90         } catch (MalformedURLException e) {
 91             e.printStackTrace();
 92         } catch (IOException e) {
 93             e.printStackTrace();
 94         }
 95     }
 96     
 97     //缓冲输入流转字节数组缓冲区
 98     public static byte[] read(InputStream in) throws IOException {
 99         BufferedInputStream bis = new BufferedInputStream(in);
100         ByteArrayOutputStream baos = new ByteArrayOutputStream();
101         int c = bis.read();
102         while((c!=-1)){
103             baos.write(c);
104             c = bis.read();
105         }
106         bis.close();
107         return baos.toByteArray();
108     }
109 }
110 
111 /**
112  * java1.5进行了优化,就出现了callable,就有了返回值和抛异常
113    callable和runnable都可以应用于executors。而thread类只支持runnable
114  * @author Administrator
115  *
116  */
117 class DownLoadThreads implements Callable{
118     private int fileCount; 
119     private int blockSize; 
120     private int TotalLength; 
121     private byte[] b;
122     public int getFileCount() {
123         return fileCount;
124     }
125     public void setFileCount(int fileCount) {
126         this.fileCount = fileCount;
127     }
128     public byte[] getB() {
129         return b;
130     }
131     public void setB(byte[] b) {
132         this.b = b;
133     }
134     public DownLoadThreads(int fileCount,int blockSize,int TotalLength){
135         this.fileCount = fileCount;
136         this.blockSize = blockSize;
137         this.TotalLength= TotalLength;
138     }
139     public Object call() throws Exception {
140         try {
141             URL url = new URL(Test003.sate);
142             HttpURLConnection conn = (HttpURLConnection) url.openConnection();
143             conn.setRequestMethod("GET");
144             ////设置请求数据的区间
145             conn.setRequestProperty("Range", "bytes="+(TotalLength-blockSize)+"-"+(TotalLength));
146             conn.connect();
147             System.out.println(fileCount+"开始:"+(TotalLength-blockSize)+",结束:"+TotalLength);
148 
149             int code = conn.getResponseCode();
150             System.out.println(fileCount+">>>Code>>>"+code);
151             if(code == HttpURLConnection.HTTP_OK || code == 206){
152                 byte []b=Test003.read(conn.getInputStream());
153                 System.out.println(fileCount+"下载长度:"+b.length);
154                 System.out.println("第"+fileCount+"段下载完毕!!!");
155                 return b;
156             }
157             System.out.println("第"+fileCount+"段下载失败!!!");
158         } catch (FileNotFoundException e) {
159             e.printStackTrace();
160         } catch (IOException e) {
161             e.printStackTrace();
162         }
163 
164         return null;
165     }
166 }

三.重组后的数据:

原文地址:https://www.cnblogs.com/KdeS/p/13468230.html