Java多线程 之 多线程下载文件

*

 文件结构:

代码如下:

1,

package com.m0312.download.impl;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

import com.m0312.download.api.Connection;

public class ConnectionImpl implements Connection{
    URLConnection urlCon;
    URL url;
    static final int BUFFER_SIZE = 1024;
    ConnectionImpl(String _url){
        try {
            url=new URL(_url);
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
    }
    @Override
    public byte[] read(int startPos, int endPos) throws IOException {
        //只能写出第一部分
        byte[] buffer=new byte[endPos-startPos+1];
        HttpURLConnection urlCon2 = (HttpURLConnection)url.openConnection();
        urlCon2.setRequestProperty("Range", "bytes=" + startPos + "-"
                + endPos);
        InputStream is=urlCon2.getInputStream();
        //is.skip(startPos);
        is.read(buffer, 0, endPos-startPos+1);//因为没有+1,一直是只有三分之一部分
        is.close();
        return buffer;
        
        /**
         * 开始读[0,1603]
            开始读[1604,3207]
            is read length: 1024
            is read length: 1024
            baos.size: 1024
            baos.size: 1024
            开始读[3208,4811]
            is read length: 580
            baos.size: 1604    ///size会累积,等于度过的所有buffer size
            is read length: 1024
            baos.size: 1024
            is read length: 580
            baos.size: 1604
            is read length: 580
            baos.size: 1604
         */
    }

    @Override
    public int getContentLength() {
        return urlCon.getContentLength();
    }

    @Override
    public void close() {
        if(urlCon!=null){
            //???
        }
    }
    @Override
    public URLConnection getUrlCon() {
        return urlCon;
    }
    @Override
    public void setUrlCon(URLConnection urlCon) {
        this.urlCon = urlCon;
    }

}

2,

package com.m0312.download;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

import com.m0312.download.api.Connection;

public class DownloadThread extends Thread{

    Connection conn;
    int startPos;
    int endPos;
    String descFilePath;
    private CyclicBarrier cyclicBarrier;
    
    public DownloadThread( Connection conn, int startPos, int endPos){
        
        this.conn = conn;        
        this.startPos = startPos;
        this.endPos = endPos;
    }
    public DownloadThread( Connection conn, int startPos, int endPos,
            String descFilePath,CyclicBarrier cyclicBarrier){
        
        this.conn = conn;        
        this.startPos = startPos;
        this.endPos = endPos;
        this.descFilePath=descFilePath;
        this.cyclicBarrier=cyclicBarrier;
    }
    @Override
    public void run(){    
        try {
            /*byte[] bytes=conn.read(startPos, endPos);
            os=new FileOutputStream(new File(descFilePath)); 
            os.write(bytes, startPos, endPos-startPos+1);
            cyclicBarrier.await();//等待其他线程
            */            
            System.out.println("开始读["+startPos+","+endPos+"]");
            byte[] buffer = conn.read(startPos , endPos);
            RandomAccessFile file = new RandomAccessFile(descFilePath, "rw");
            file.seek(startPos);
            file.write(buffer, 0, buffer.length);
            file.close();
            cyclicBarrier.await();
            
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        //System.out.println("所有线程都下载完成");
        //通知 FileDownloader ,自己已经做完
        
    }
}

3,

package com.m0312.download;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.concurrent.CyclicBarrier;

import com.m0312.download.api.Connection;
import com.m0312.download.api.ConnectionManager;
import com.m0312.download.api.DownloadListener;


public class FileDownloader {
    
    String url;
    
    DownloadListener listener;
    
    ConnectionManager cm;
    private static final int THREAD_NUM = 3;

    //定义几个线程去下载  
    final int DOWN_THREAD_NUM = 3;  
    final String OUT_FILE_NAME = "e:/testfile/down.png";  
    InputStream[] isArr = new InputStream[DOWN_THREAD_NUM];  
    RandomAccessFile[] outArr = new RandomAccessFile[DOWN_THREAD_NUM];  
    
    public FileDownloader(String _url) {
        this.url = _url;
        
    }
    private void createPlaceHolderFile(String fileName, int contentLen) throws IOException{
        
        RandomAccessFile file = new RandomAccessFile(fileName,"rw");
        
        for(int i=0; i<contentLen ;i++){
            file.write(0);
        }
        
        file.close();
    }
    public void execute(){
        // 在这里实现你的代码, 注意: 需要用多线程实现下载
        // 这个类依赖于其他几个接口, 你需要写这几个接口的实现代码
        // (1) ConnectionManager , 可以打开一个连接,通过Connection可以读取其中的一段(用startPos, endPos来指定)
        // (2) DownloadListener, 由于是多线程下载, 调用这个类的客户端不知道什么时候结束,所以你需要实现当所有
        //     线程都执行完以后, 调用listener的notifiedFinished方法, 这样客户端就能收到通知。
        // 具体的实现思路:
        // 1. 需要调用ConnectionManager的open方法打开连接, 然后通过Connection.getContentLength方法获得文件的长度
        // 2. 至少启动3个线程下载,  注意每个线程需要先调用ConnectionManager的open方法
        // 然后调用read方法, read方法中有读取文件的开始位置和结束位置的参数, 返回值是byte[]数组
        // 3. 把byte数组写入到文件中
        // 4. 所有的线程都下载完成以后, 需要调用listener的notifiedFinished方法
        
        // 下面的代码是示例代码, 也就是说只有一个线程, 你需要改造成多线程的。
        Connection conn = null;
        try {
            
            conn = cm.open(this.url);
            
            int length = conn.getContentLength();
            File desc=new File(OUT_FILE_NAME);
            if(desc.exists()){
                desc.delete();
            }            
            createPlaceHolderFile(OUT_FILE_NAME, length);            
            CyclicBarrier barrier=new CyclicBarrier(THREAD_NUM,new Runnable() {
                @Override
                public void run() {
                    listener.notifyFinished();
                }
            });
            
            isArr[0] = conn.getUrlCon().getInputStream();
            int fileLen = conn.getContentLength();  
            System.out.println("网络资源的大小" + fileLen);  
              
            // 每线程应该下载的字节数  
            long numPerThred = fileLen / DOWN_THREAD_NUM;  
            // 整个下载资源整除后剩下的余数取模  
            long left = fileLen % DOWN_THREAD_NUM;  
            int start=0;
            int end=0;
            for (int i = 0; i < DOWN_THREAD_NUM; i++) {  
                
                // 为每个线程打开一个输入流、一个RandomAccessFile对象,  
                // 让每个线程分别负责下载资源的不同部分。  
                //isArr[0]和outArr[0]已经使用,从不为0开始  
                
                // 分别启动多个线程来下载网络资源  
                if (i == DOWN_THREAD_NUM - 1) {  
                    // 最后一个线程下载指定numPerThred+left个字节  
                    start=(int) (i * numPerThred);
                    end=(int) ((i + 1) * numPerThred  
                            + left-1);
                } else {  
                    // 每个线程负责下载一定的numPerThred个字节  
                    start=(int) (i * numPerThred);
                    end=(int) ((i + 1) * numPerThred)-1;
                   
                } 
                new DownloadThread(conn, start, end,OUT_FILE_NAME,barrier).start();
                //Thread.sleep(1000);
            }  
            
        } catch (Exception e) {            
            e.printStackTrace();
        }finally{
            conn.close();
        }
    }
    
    public void setListener(DownloadListener listener) {
        this.listener = listener;
    }

    
    
    public void setConnectionManager(ConnectionManager ucm){
        this.cm = ucm;
    }
    
    public DownloadListener getListener(){
        return this.listener;
    }
    
}

4,

package com.m0312.download;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.m0312.download.api.ConnectionManager;
import com.m0312.download.api.DownloadListener;
import com.m0312.download.impl.ConnectionManagerImpl;

public class FileDownloaderTest {
    boolean downloadFinished = false;
    @Before
    public void setUp() throws Exception {
    }

    @After
    public void tearDown() throws Exception {
    }

    @Test
    public void testDownload() {
        
        String url = "http://127.0.0.3:8082/applogo.png";
        
        FileDownloader downloader = new FileDownloader(url);

    
        ConnectionManager cm = new ConnectionManagerImpl();
        downloader.setConnectionManager(cm);
        
        downloader.setListener(new DownloadListener() {
            @Override
            public void notifyFinished() {
                downloadFinished = true;
            }
        });
        downloader.execute();
        
        // 等待多线程下载程序执行完毕
        while (!downloadFinished) {
            try {
                System.out.println("还没有下载完成,休眠五秒");
                //休眠5秒
                Thread.sleep(5000);
            } catch (InterruptedException e) {                
                e.printStackTrace();
            }
        }
        System.out.println("下载完成!");
        /**
         * 网络资源的大小4812
            开始读[0,1603]
            开始读[1604,3207]
            开始读[3208,4811]
            下载完成!
         */
        

    }

}

*

有问题在公众号【清汤袭人】找我,时常冒出各种傻问题,然一通百通,其乐无穷,一起探讨


原文地址:https://www.cnblogs.com/qingmaple/p/6650086.html