Java中的管道流 PipedOutputStream和PipedInputStream

我们在学习IO流的时候可能会学字节流、字符流等,但是关于管道流的相信大部分视频或者教程都是一语带过,第一个是因为这个东西在实际开发中用的也不是很多,但是学习无止境,存在既有理。JDK中既然有个类那说明他并不是一无是处,只是我们目前还没有场景用到它,那说明我们说的还不够,知识点还不足以去驾驭它。

管道流其实是一个很有魅力的流,用法也很独特。他用来连接两个线程之间的通信,比如传输文件等。它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。费话不多说,我们来看一个例子:

public class PipdTest {

	public static void main(String[] args) throws IOException {

		// 创建一个发送者对象
		Sender sender = new Sender();
		// 创建一个接收者对象
		Receiver receiver = new Receiver();
		// 获取输出管道流
		PipedOutputStream outputStream = sender.getOutputStream();
		// 获取输入管道流
		PipedInputStream inputStream = receiver.getInputStream();
		// 链接两个管道,这一步很重要,把输入流和输出流联通起来
		outputStream.connect(inputStream);
		// 启动发送者线程
		sender.start();
		// 启动接收者线程
		receiver.start();
	}
}

/**
 * 发送线程
 * 
 * @author yuxuan
 *
 */
class Sender extends Thread {

	// 声明一个 管道输出流对象 作为发送方
	private PipedOutputStream outputStream = new PipedOutputStream();

	public PipedOutputStream getOutputStream() {
		return outputStream;
	}

	@Override
	public void run() {
		String msg = "Hello World";
		try {
			outputStream.write(msg.getBytes());
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				// 关闭输出流
				outputStream.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

/**
 * 接收线程
 * 
 * @author yuxuan
 *
 */
class Receiver extends Thread {

	// 声明一个 管道输入对象 作为接收方
	private PipedInputStream inputStream = new PipedInputStream();

	public PipedInputStream getInputStream() {
		return inputStream;
	}

	@Override
	public void run() {
		byte[] buf = new byte[1024];
		try {
			// 通过read方法 读取长度
			int len = inputStream.read(buf);
			System.out.println(new String(buf, 0, len));
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				// 关闭输入流
				inputStream.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

上面的代码有几个点需要掌握清楚。

1、第一个就是connect方法,他的源码是这么写的

    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
/*代表连接该管道输入流的输出流PipedOutputStream下一个字节将存储在循环缓冲数组buffer的位置。
当in<0说明缓冲数组是空的;当in==out说明缓冲数组已满。*/
        snk.in = -1;
        //代表该管道输入流下一个要读取的字节在循环缓冲数组中的位置
        snk.out = 0;
         //表示该管道输入流是否与管道输出流建立了连接,true为已连接
        snk.connected = true;
    }

我们可以看到,他是一个线程同步的方法,通过synchronized 关键字修饰。

除了调用connect方法之外,还可以在构造函数中直接传进去,源码如下:

当然管道流也有一些注意事项:

  • 管道流仅用于多个线程之间传递信息,若用在同一个线程中可能会造成死锁;
  • 管道流的输入输出是成对的,一个输出流只能对应一个输入流,使用构造函数或者connect函数进行连接;
  • 一对管道流包含一个缓冲区,其默认值为1024个字节,若要改变缓冲区大小,可以使用带有参数的构造函数;
  • 管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞;
  • 管道依附于线程,因此若线程结束,则虽然管道流对象还在,仍然会报错“read dead end”;
  • 管道流的读取方法与普通流不同,只有输出流正确close时,输出流才能读到-1值。

下面我们来看write方法的源码:

看到这里是不是一目了然了。以下还有一些注意事项,我们来看:

PipedInputStream运用的是一个1024字节固定大小的循环缓冲区。写入PipedOutputStream的数据实际上保存到对应的 PipedInputStream的内部缓冲区。从PipedInputStream执行读操作时,读取的数据实际上来自这个内部缓冲区。如果对应的 PipedInputStream输入缓冲区已满,任何企图写入PipedOutputStream的线程都将被阻塞。而且这个写操作线程将一直阻塞,直至出现读取PipedInputStream的操作从缓冲区删除数据。

这意味着,向PipedOutputStream写数据的线程不应该是负责从对应PipedInputStream读取数据的唯一线程。从图二可以清楚地看出这里的问题所在:假设线程t是负责从PipedInputStream读取数据的唯一线程;另外,假定t企图在一次对 PipedOutputStream的write()方法的调用中向对应的PipedOutputStream写入2000字节的数据。在t线程阻塞之前,它最多能够写入1024字节的数据(PipedInputStream内部缓冲区的大小)。然而,一旦t被阻塞,读取 PipedInputStream的操作就再也不会出现,因为t是唯一读取PipedInputStream的线程。这样,t线程已经完全被阻塞,同时,所有其他试图向PipedOutputStream写入数据的线程也将遇到同样的情形。这并不意味着在一次write()调用中不能写入多于1024字节的数据。但应当保证,在写入数据的同时,有另一个线程从PipedInputStream读取数据。

从PipedInputStream读取数据时,如果符合下面三个条件,就会出现IOException异常:

  1. 试图从PipedInputStream读取数据,
  2. PipedInputStream的缓冲区为“空”(即不存在可读取的数据),
  3. 最后一个向PipedOutputStream写数据的线程不再活动(通过Thread.isAlive()检测)。

这是一个很微妙的时刻,同时也是一个极其重要的时刻。假定有一个线程w向PipedOutputStream写入数据;另一个线程r从对应的 PipedInputStream读取数据。下面一系列的事件将导致r线程在试图读取PipedInputStream时遇到IOException异常:

  1. w向PipedOutputStream写入数据。
  2. w结束(w.isAlive()返回false)。
  3. r从PipedInputStream读取w写入的数据,清空PipedInputStream的缓冲区。
  4. r试图再次从PipedInputStream读取数据。这时PipedInputStream的缓冲区已经为空,而且w已经结束,从而导致在读操作执行时出现IOException异常。

如果一个写操作在PipedOutputStream上执行,同时最近从对应PipedInputStream读取的线程已经不再活动(通过 Thread.isAlive()检测),则写操作将抛出一个IOException异常。假定有两个线程w和r,w向 PipedOutputStream写入数据,而r则从对应的PipedInputStream读取。下面一系列的事件将导致w线程在试图写入 PipedOutputStream时遇到IOException异常:

  1. 写操作线程w已经创建,但r线程还不存在。
  2. w向PipedOutputStream写入数据。
  3. 读线程r被创建,并从PipedInputStream读取数据。
  4. r线程结束。
  5. w企图向PipedOutputStream写入数据,发现r已经结束,抛出IOException异常。

此篇文章主要用于理解运用管道流,如果在实际项目开发中用到的话建议一定要研究透在用,他的坑可不止我上面诺列的这些哦

有问题可以在下面评论,技术问题可以私聊我

原文地址:https://www.cnblogs.com/c1024/p/11012007.html