golang IO streaming

IO Streaming

Streaming IO in Go,引用此文,略有修改

io.Reader和io.Writer

io.Reader接口定义了从传输缓存读取数据

type Reader interface {
    Read(p []byte) (n int, err error)
}

Read方法接受一个[]byte作为读取数据后的接收者,返回读取的字节数n和错误err,当读尽缓存内容时,err为io.EOF。
Read方法的读取机制

  1. 尽可能读取len(p)字节到p
  2. 调用Read()后,n可能比len(p)小
  3. 如果出现错误,read()仍可能返回缓冲区p中的n个字节。例如,读取突然关闭的TCP套接字。根据您的使用,您可以选择将字节保持在p中或重试。
  4. 当Read()读尽数据后,可能返回非零的n和err=io.EOF。甚至出现返回非零n和err=nil的情况,但是接下来的Read()方法调用一定返回n=0, err=io.EOF
  5. 当n=0,err=nil时,并不意味着读取数据完毕,接下来的Read()可能返回更多的数据。

综上,实现Read()方法还是比较棘手的,幸好标准库有好多实现了io.Reader的接口。

strings.NewReader
// NewReader returns a new Reader reading from s. 
// It is similar to bytes.NewBufferString 
// but more efficient and read-only.
func NewReader(s string) *Reader

func main() {
	reader := strings.NewReader("Clear is better than clever")
	p := make([]byte, 4)
	
	for {
		n, err := reader.Read(p)
		if err != nil{
		    if err == io.EOF {
			fmt.Println(string(p[:n])) //should handle any remainding bytes.
			break
		    }
		    fmt.Println(err)
		    os.Exit(1)
		}
		fmt.Println(string(p[:n]))
	}
}
自定义io.Reader

实现Read()方法,去除非字母

type alphaReader struct {
	src string
	cur int
}

func newAlphaReader(src string) *alphaReader {
	return &alphaReader{src: src}
}

func alpha(r byte) byte {
	if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
		return r
	}
	return 0
}

func (a *alphaReader) Read(p []byte) (int, error) {
	if a.cur >= len(a.src) {
		return 0, io.EOF
	}

	x := len(a.src) - a.cur
	n, bound := 0, 0
	if x >= len(p) {
		bound = len(p)
	} else if x <= len(p) {
		bound = x
	}

	buf := make([]byte, bound)
	for n < bound {
		if char := alpha(a.src[a.cur]); char != 0 {
			buf[n] = char
		}
		n++
		a.cur++
	}
	copy(p, buf) // 使用copy方法,保证切片每次读取指定大小的内容
	return n, nil
}

func main() {
	reader := newAlphaReader("Hello! It's 9am, where is the sun?")
	p := make([]byte, 4)
	for {
		n, err := reader.Read(p)
		if err == io.EOF {
			break
		}
		fmt.Print(string(p[:n]))
	}
	fmt.Println()
}

借助标准库的io.Reader实现来简化代码

type alphaReader struct {
	reader io.Reader
}

func newAlphaReader(reader io.Reader) *alphaReader {
	return &alphaReader{reader: reader}
}

func alpha(r byte) byte {
	if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
		return r
	}
	return 0
}

func (a *alphaReader) Read(p []byte) (int, error) {
	n, err := a.reader.Read(p)
	if err != nil {
		return n, err
	}
	buf := make([]byte, n)
	for i := 0; i < n; i++ {
		if char := alpha(p[i]); char != 0 {
			buf[i] = char
		}
	}

	copy(p, buf)
	return n, nil
}

func main() {
	// use an io.Reader as source for alphaReader
	reader := newAlphaReader(strings.NewReader("Hello! It's 9am, where is the sun?"))
	p := make([]byte, 4)
	for {
		n, err := reader.Read(p)
		if err == io.EOF {
			break
		}
		fmt.Print(string(p[:n]))
	}
	fmt.Println()
}

与读取文件的io.Reader结合

func main() {
	// use an os.File as source for alphaReader
	file, err := os.Open("./alpha_reader3.go")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()
	
	reader := newAlphaReader(file)
	p := make([]byte, 4)
	for {
		n, err := reader.Read(p)
		if err == io.EOF {
			break
		}
		fmt.Print(string(p[:n]))
	}
	fmt.Println()
}

io.Writer

type Writer interface {
    Write(p []byte) (n int, err error)
}

Write方法接受一个[]byte作为输入

func main() {
	proverbs := []string{
		"Channels orchestrate mutexes serialize",
		"Cgo is not Go",
		"Errors are values",
		"Don't panic",
	}
	var writer bytes.Buffer

	for _, p := range proverbs {
		n, err := writer.Write([]byte(p))
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		if n != len(p) {
			fmt.Println("failed to write data")
			os.Exit(1)
		}
	}

	fmt.Println(writer.String())
}
实现自定义io.Writer
type chanWriter struct {
	ch chan byte
}

func newChanWriter() *chanWriter {
	return &chanWriter{make(chan byte, 1024)}
}

func (w *chanWriter) Chan() <-chan byte {
	return w.ch
}

func (w *chanWriter) Write(p []byte) (int, error) {
	n := 0
	for _, b := range p {
		w.ch <- b
		n++
	}
	return n, nil
}

func (w *chanWriter) Close() error {
	close(w.ch)
	return nil
}

func main() {
	writer := newChanWriter()
	go func() {
		defer writer.Close()
		writer.Write([]byte("Stream "))
		writer.Write([]byte("me!"))
	}()
	for c := range writer.Chan() {
		fmt.Printf("%c", c)
	}
	fmt.Println()
}

chanWriter也实现了io.Closer, 调用writer.Close()关闭channel,避免chanWriter.Write
方法一直等待channel。

os.File

亦实现了io.Writer和io.Reader接口,因此向文件写文件可以调用writer.Write([]byte), os.Create返回*os.File

func main() {
	proverbs := []string{
		"Channels orchestrate mutexes serialize
",
		"Cgo is not Go
",
		"Errors are values
",
		"Don't panic
",
	}
	// func Create(name string) (*File, error)
	file, err := os.Create("./proverbs.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()

	for _, p := range proverbs {
		n, err := file.Write([]byte(p))
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		if n != len(p) {
			fmt.Println("failed to write data")
			os.Exit(1)
		}
	}
	fmt.Println("file write done")
}

os.Open返回的*os.File是只读的

func main() {
    // 如果需要读写文件,使用os.OpenFile
    // 	file, err := os.OpenFile("./proverbs.txt", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
	file, err := os.Open("./proverbs.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()

	p := make([]byte, 4)
	for {
		n, err := file.Read(p)
		if err == io.EOF {
			break
		}
		fmt.Print(string(p[:n]))
	}
}

os.Stdout, os.Stdin, os.Stderr 其实也是*os.File

func main() {
	proverbs := []string{
		"Channels orchestrate mutexes serialize
",
		"Cgo is not Go
",
		"Errors are values
",
		"Don't panic
",
	}

	for _, p := range proverbs {
		n, err := os.Stdout.Write([]byte(p))
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		if n != len(p) {
			fmt.Println("failed to write data")
			os.Exit(1)
		}
	}
}

io.Copy 方便的将流数据拷贝到目标reader

func main() {
	proverbs := new(bytes.Buffer)
	proverbs.WriteString("Channels orchestrate mutexes serialize
")
	proverbs.WriteString("Cgo is not Go
")
	proverbs.WriteString("Errors are values
")
	proverbs.WriteString("Don't panic
")

	file, err := os.Create("./proverbs.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()

	// copy from reader data into writer file
	if _, err := io.Copy(file, proverbs); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("file created")
}
// 或者
func main() {
	file, err := os.Open("./proverbs.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()

	if _, err := io.Copy(os.Stdout, file); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

io.WriteString()

可以方便地将字符串写入到writer

func main() {
	file, err := os.Create("./magic_msg.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()
	if _, err := io.WriteString(file, "Go is fun!"); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

Pipe writers and readers

输入io.PipeWriter和io.PipeReader模型IO操作,如在内存管道中。数据被写入管道的写入端,并使用单独的GO例程在管道的读取器端读取数据。

func main() {
	proverbs := new(bytes.Buffer)
	proverbs.WriteString("Channels orchestrate mutexes serialize
")
	proverbs.WriteString("Cgo is not Go
")
	proverbs.WriteString("Errors are values
")
	proverbs.WriteString("Don't panic
")

	piper, pipew := io.Pipe()

	// write in writer end of pipe
	go func() {
		defer pipew.Close()
		io.Copy(pipew, proverbs)
	}()

	// read from reader end of pipe.
	io.Copy(os.Stdout, piper)
	piper.Close()
}

Buffered IO

GO通过包bufio支持缓冲IO,这使得使用文本内容更加容易。例如,下面的程序读取以值‘ ’分隔的逐行文件的内容。

func main() {
	file, err := os.Open("./planets.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer file.Close()
	reader := bufio.NewReader(file)

	for {
		line, err := reader.ReadString('
')
		if err != nil {
			if err == io.EOF {
				break
			} else {
				fmt.Println(err)
				os.Exit(1)
			}
		}
		fmt.Print(line)
	}

}

Util package

包ioutil是io的一个子包,它为IO提供了一些方便的功能。例如,以下代码使用函数ReadFile将文件的内容加载到[]byte中

package main

import (
  "io/ioutil"
   ...
)

func main() {
	bytes, err := ioutil.ReadFile("./planets.txt")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Printf("%s", bytes)
}

还有其他IO Stream 操作

  • file IO
  • buffered IO
  • network IO
  • formatted IO
原文地址:https://www.cnblogs.com/linyihai/p/10721193.html