【并发】4、文件锁,多线程队列拷贝文件,并读取的操作

  最近有这样一个功能点,我实现了一个多线程的队列,生产线程ftp获取文件,然后扫描指定目录,获取文件基础信息,然后put进入队列,然后消费者通过这个信息解析文件进行入库,因为文件会比较多,所以到时候会起多个消费线程去解析数据,并且考虑到复用,不用的文件,消费消除要分化操作(策略模式),但是遇到一个问题,就是生产线程可能重复扫描到一个还未解析完成的文件

  1。改进,新建一个consum目录,扫描到一个文件,就移动到consum目录,这样ftp操作就无脑向生产文件夹拉文件就行

  2.还有个文件,就是如果消费线程如果解析不够快,也有可能导致文件被重复扫描到,那么为了避免,就需要扫到一个文件,就把文件移动到指定的目录下,等待消费

  3.新文件,当生产队列不够快,消费队列空闲比较多了的时候,不同的文件操作,会导致线程锁死,因为生产线程文件还未写入,消费线程就开始读取,这个时候线程卡死,造成死锁

  4.为了解决这个问题,我们再写文件的时候对文件上一个文件锁,读取文件的消费线程,等待生产线程写入完成之后再进行操作,这里有个demo来模拟这个线程锁

package io;

import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

/**
 * @ProjectName: cutter-point
 * @Package: io
 * @ClassName: FileLockTest
 * @Author: xiaof
 * @Description: 测试多线程下的文件锁
 * @Date: 2019/3/7 9:34
 * @Version: 1.0
 */
public class FileLockTest {

    //第一个线程不断吧文件从文件A拷贝到B,然后从B拷贝到A
    //第二个线程不断遍历B文件夹,并遍历B文件夹的数据,输出到控制台
    private final static String baseDir1 = "C:\Users\xiaof\Desktop\1\dir-t1";
    private final static String baseDir2 = "C:\Users\xiaof\Desktop\1\dir-t2";

    @Test
    public void test1() {
//        String fileName = "多线程队列.png";
        FileOutputStream fileOutputStream = null;
        FileInputStream fileInputStream = null;
        //反复吧文件拷贝到两个目录中
        try {
            while(true) {
                System.out.println(Thread.currentThread() + "-休眠1s开始新一轮拷贝");
                Thread.sleep(1000);
                //判断文件目录是否有文件
                File dir1 = new File(baseDir1);
                String souDir = "";
                String desDir = "";
                if(dir1.listFiles().length > 0) {
                    souDir = baseDir1;
                    desDir = baseDir2;
                } else {
                    //吧文件从2拷贝到1
                    souDir = baseDir2;
                    desDir = baseDir1;
                }

                //吧文件从1拷贝到2
                //1.遍历所有文件,依次上锁
                File sourFileDir = new File(souDir);
                for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                    File tmpFile = sourFileDir.listFiles()[i];
                    //输出
                    fileInputStream = new FileInputStream(tmpFile);
                    fileOutputStream = new FileOutputStream(desDir + "/" + tmpFile.getName());
                    //对写的文件进行上锁,读的文件不需要上锁
                    FileChannel fileChannel = fileOutputStream.getChannel();
                    FileLock fileLock = null;
                    //操作之前判断是否已经被锁住
                    while(true) {
                        fileLock = fileChannel.tryLock();
                        if(fileLock != null) {
                            //已上锁
                            break;
                        } else {
                            System.out.println(Thread.currentThread() + "-文件已被锁定,休眠1s");
                            Thread.sleep(1000);
                        }
                    }

                    //拷贝数据
                    byte buf[] = new byte[1024];
                    int len = 0;
                    while((len = fileInputStream.read(buf)) != -1) {
                        fileOutputStream.write(buf, 0, len);
                    }

                    fileOutputStream.flush();

                    //最后删除源文件
                    fileLock.release();//解锁
                    fileChannel.close();
                    fileOutputStream.close();
                    fileInputStream.close();
                    fileInputStream = null;
                    fileOutputStream = null;

                    if(tmpFile.delete()) {
                        System.out.println("删除源文件");
                    }

                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

    @Test
    public void test2() {
        //读取文件并输出
        FileOutputStream fileOutputStream = null;
        FileInputStream fileInputStream = null;
        try{
            //判断文件是否存在
            while (true) {
                Thread.sleep(1000);

                File dir1 = new File(baseDir1);
                String souDir = "";
                String desDir = "";

                souDir = baseDir1;
                desDir = baseDir2;

//                if(dir1.listFiles().length > 0) {
//                    souDir = baseDir1;
//                    desDir = baseDir2;
//                } else {
//                    //吧文件从2拷贝到1
//                    souDir = baseDir2;
//                    desDir = baseDir1;
//                }

                File sourFileDir = new File(souDir);
                for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                    File tmpFile = sourFileDir.listFiles()[i];
                    //输出
                    fileInputStream = new FileInputStream(tmpFile);

                    //拷贝数据
                    byte buf[] = new byte[1024];
                    int len = 0;
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    while((len = fileInputStream.read(buf)) != -1) {
                        byteArrayOutputStream.write(buf, 0, len);
                    }

                    System.out.println("读取目录:" + souDir);
                    System.out.println(byteArrayOutputStream.toString());
                    //最后删除源文件
                    fileInputStream.close();
                    fileInputStream = null;

                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

 

原文地址:https://www.cnblogs.com/cutter-point/p/10488077.html