Golang并发编程-channel实战篇

         Golang并发编程-channel实战篇

                               作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

 

 

 

一.channel概述

  channel是Go语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据进行通讯,这在一定程度上又进一步降低了编程的难度。
  channel是一个数据类型,主要用来解决go程的同步问题以及go程之间数据共享(数据传递)的问题。
  goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
  引⽤类型channel可用于多个goroutine通讯。其内部实现了同步,确保并发安全。

1>.无缓冲的channel

  无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何数据值的通道。

  这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的goroutine阻塞等待。

  这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

  阻塞:
    由于某种原因数据没有到达,当前go程(线程)持续处于等待状态,直到条件满足,才解除阻塞。

  同步:
    在两个或多个go程(线程)间,保持数据内容一致性的机制。

  下图展示两个 goroutine 如何利用无缓冲的通道来共享一个值:
    在第1步,两个goroutine都到达通道,但哪个都没有开始执行发送或者接收。
    在第2步,左侧的goroutine将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个goroutine会在通道中被锁住,直到交换完成。
    在第3步,右侧的goroutine将它的手放入通道,这模拟了从通道里接收数据。这个goroutine一样也会在通道中被锁住,直到交换完成。
    在第4步和第5步,进行交换,并最终在第6步,两个goroutine都将它们的手从通道里拿出来,这模拟了被锁住的goroutine得到释放。两个goroutine现在都可以去做其他事情了。

  无缓冲的channel创建格式:
     make(chan Type)   //等价于"make(chan Type, 0)",如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。

2>.有缓冲的channel

  有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。

  这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也不同。
  只有通道中没有要接收的值时,接收动作才会阻塞。
  只有通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
  这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
  示例图如下:     在第
1步,右侧的 goroutine 正在从通道接收一个值。     在第2步,右侧的这个 goroutine独立完成了接收值的动作,而左侧的 goroutine 正在发送一个新值到通道里。     在第3步,左侧的goroutine 还在向通道发送新值,而右侧的 goroutine 正在从通道接收另外一个值。这个步骤里的两个操作既不是同步的,也不会互相阻塞。     最后,在第4步,所有的发送和接收都完成,而通道里还有几个值,也有一些空间可以存更多的值。   有缓冲的channel创建格式:     make(chan Type, capacity) //如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行。借助函数 len(ch) 求取缓冲区中剩余元素个数, cap(ch) 求取缓冲区元素容量大小。

二.channel的基本使用

1>.定义有缓冲区管道

package main

import "fmt"

func main() {
    /**
    和map类似,channel也一个对应make创建的底层数据结构的引用,创建channel语法格式如下所示:
        make(chan Type,capacity)

    以下是相关参数说明:
        chan:
            是创建channel所需使用的关键字。
        Type:
            是指定channel收发数据的类型。
        capacity:
            是指定channel的大小。
            当参数"capacity = 0"时,channel是无缓冲阻塞读写的,即该channel无容量。
            当参数"capacity > 0"时,channel有缓冲,是非阻塞的,直到写满capacity个元素才阻塞写入。
    */
    s1 := make(chan int, 3) //定义一个有缓冲区的channel

    //向channel写入数据
    s1 <- 110
    s1 <- 119
    s1 <- 120

    /**
    注意哈,由于上面已经写了3个数据了,此时s1这个channel的容量已经达到3个容量上限啦,即该channel已满.
    如果channel已满继续往该channel写入数据则会抛出异常:"fatal error: all goroutines are asleep - deadlock!"
    */
    //s1 <- 114

    //从channel读取数据
    fmt.Println(<-s1)
    fmt.Println(<-s1, <-s1)

    /**
    上面的代码已经对channel进行读写各三次,此时该channel中并没有数据啦。
    如果channel无数据,我们从该channel读取数据时也会抛出异常:"fatal error: all goroutines are asleep - deadlock!"
    */
    //fmt.Println(<-s1)
}

2>.定义无缓冲区管道

package main

import (
    "fmt"
    "time"
)

func Read(s chan int) {
    defer fmt.Println("读端结束~~~")
    for index := 0; index < 3; index++ {
        fmt.Printf("读取到channel的数据是: %d
", <-s)
    }
}

func Write(s chan int, value int) {
    defer fmt.Println("写端结束...")
    for index := 100; index < value; index++ {
        s <- index
    }
}

func main() {

    /**
    无缓冲区channel特性:
        只有在写段和读端同时准备就绪的情况下才能运行。
    */
    s1 := make(chan int) //定义一个无缓冲区的channel
    //s1 := make(chan int, 10) //定义一个有缓冲区的channel,其容量为10

    go Read(s1)

    go Write(s1, 110)

    /**
    我们让主Goroutine阻塞,写个死循环即可.
    */
    for {
        time.Sleep(1 * time.Second)
    }
}

3>.关闭channel

package main

import (
    "fmt"
    "runtime"
    "time"
)

func ReadChannel(s chan int) {
    defer fmt.Println("读端结束~~~")
    for index := 0; index < 5; index++ {
        if index == 103 {
            /**
            程序结束的时候,清理掉channel占用的空间,只影响写入,不影响读取。
            */
            close(s)
        }

        /**
        从channel中接收数据,并赋值给value,同时检查通道是否已关闭或者是否为空
        */
        value, ok := <-s
        if !ok { //等效于"ok != true"
            fmt.Println("channel已关闭或者管道中没有数据")
            runtime.Goexit()
        } else {
            fmt.Printf("读取到channel的数据是: %d
", value)
        }
    }
}

func WriteChannel(s chan int, value int) {
    defer fmt.Println("写端结束...")
    for index := 100; index < value; index++ {
        if index == 103 {
            /**
            程序结束的时候,清理掉channel占用的空间,只影响写入,不影响读取。
            */
            close(s)
        }
        s <- index
    }
}

func main() {

    s1 := make(chan int) //定义一个有缓冲区的channel,其容量为10

    go ReadChannel(s1)

    go WriteChannel(s1, 110)

    for {
        time.Sleep(1 * time.Second)
    }
}

三.单向channel 

1>.单向channel概述

  默认情况下,通道channel是双向的,也就是说,既可以往里面发送数据也可以同里面接收数据。

  但是,我们经常见一个通道作为参数进行传递而只希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。

  温馨提示:
    一般情况下,创建管道都是双向的,在向函数传入数据的时候,可以是单向的。
    只读的管道不能传递给只写的管道,同理,只写的管道也不能传递给只读的管道,但是双向的可以传递给任意单向的管道。

2>.单向管道案例

package main

import (
    "fmt"
    "time"
)

/**
s的类型说明:
    "chan<- int"表示传入只写的管道
*/
func Send(s chan<- int, value int) {
    s <- value
}

/**
r的类型说明:
    "<-chan int"表示传入只读的管道
*/
func Receive(r <-chan int) {
    fmt.Printf("管道中的数据为:%d
", <-r)
}

func main() {

    //创建管道
    s1 := make(chan int, 5)

    go Receive(s1)

    go Send(s1, 110)

    for {
        time.Sleep(1 * time.Second)
    }
}

 

四.单向channel应用案例-生产者消费者模型

1>.什么是生产者消费者模型

  单向channel最典型的应用是"生产者消费者模型"。

  生产者消费者模型: 
    某个模块(函数等)负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、go程、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

  单单抽象出生产者和消费者,还够不上是生产者/消费者模型。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

  举一个寄信的例子来辅助理解一下,假设你要寄一封平信,大致过程如下:
    1>.把信写好——相当于生产者制造数据
    2>.把信放入邮筒——相当于生产者把数据放入缓冲区
    3>.邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区
    4>.邮递员把信拿去邮局做相应的处理——相当于消费者处理数据
    那么,这个缓冲区有什么用呢?为什么不让生产者直接调用消费者的某个函数,直接把数据传递过去,而画蛇添足般的设置一个缓冲区呢?缓冲区的好处大概如下:
      解耦:
        假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会直接影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合度也就相应降低了。
        接着上述的例子,如果不使用邮筒(缓冲区),须得把信直接交给邮递员。那你就必须要认识谁是邮递员。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识下一个邮递员(相当于消费者变化导致修改生产者代码)。 而邮筒相对来说比较固定,你依赖它的成本也比较低(相当于和缓冲区之间的弱耦合)。  
      处理并发:
        生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者只能无端浪费时间。
        使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。
        其实最当初这个生产者消费者模式,主要就是用来处理并发问题的。
        从寄信的例子来看。如果没有邮筒,你得拿着信傻站在路口等邮递员过来收(相当于生产者阻塞);又或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
      缓存(异步处理):
        如果生产者制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
        假设邮递员一次只能带走1000封信。万一某次碰上情人节送贺卡,需要寄出去的信超过1000封,这时候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来时再拿走。

2>.生产者消费者模型案例

package main

import (
    "fmt"
    "strconv"
    "time"
)

//定义生产者,假设生产者不消费
func Producer(p chan<- string) {
    defer fmt.Println("生产蛋糕结束")
    for index := 1; index <= 10; index++ {
        p <- "生产了" + strconv.Itoa(index) + "个蛋糕."
    }
}

//定义消费者,假设消费者不生产
func Consumer(c <-chan string) {
    defer fmt.Println("吃饱了")
    for index := 1; index <= 8; index++ {
        fmt.Println(<-c)
    }
}

func main() {
    s1 := make(chan string, 10)

    go Producer(s1)

    go Consumer(s1)

    for {
        time.Sleep(1 * time.Second)
    }
}

五.channel应用案例-定时器

  Go语言自带time包,包里面定义了定时器的结构,代码如下所示:
    type Timer struct {
        C <-chan Time
        r runtimeTimer
    }

  在定时时间到达之前,没有数据会写入到C,如果这个时候读C会一直阻塞,时间到了系统会向time.C中写入当前时间,此时阻塞解除。

  博主推荐阅读:
    https://www.cnblogs.com/yinzhengjie/p/12244385.html
    https://www.cnblogs.com/yinzhengjie/p/12245289.html

 

原文地址:https://www.cnblogs.com/yinzhengjie2020/p/12657206.html