第八章goroutine和通道

  • goroutine

Go有两种并发编程的风格

goroutine和通道,它们支持通信顺序进程(CSP),CSP是一个并发的模式,在不同的执行体(goroutine)之间传递值,但是变量本身局限于单一的执行体

当一个程序启动时,只有一个goroutine来调用main函数,称它为主goroutine。新的goroutine通过go语句进行创建。语法上,一个go语句是在普通的函数或者方法调用前加上go关键字前缀。go语句使函数在一个新创建的goroutine中调用。go语句本身的执行立即完成

除了从main返回或者退出程序之外,没有程序化的方法让一个goroutine来停止另一个,但是有办法和goroutine通信来要求它自己停止

  • 示例:并发时钟服务器

net包,提供构建客户端和服务器程序的组件,这些程序通过TCP、UDP或者UNIX套接字进行通信

//顺序时钟服务器,以每秒钟一次的频率向客户端发送当前时间

package main

import (
    "io"
    "log"
    "net"
    "time"
)

func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        handleConn(conn)
    }
}

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        _, err := io.WriteString(c, time.Now().Format("15:04:05
"))
        if err != nil {
            return
        }
        time.Sleep(1 * time.Second)
    }
}
  • 示例:并发回声服务器
  • 通道

如果说goroutine是Go程序并发的执行体,通道就是它们之间的连接。通道是可以让一个goroutine发送特定值到另一个goroutine的通信机制。每一个通道是一个具体类型的导管,叫作通道的元素类型。一个有int类型元素的通道写为chan int

使用内置的make函数创建一个通道  ch := make(chan int)

像map一样,通道是一个使用make创建的数据结构的引用。当复制或者作为参数传递到一个函数时,复制的是引用,这样调用者和被调用着都引用同一份数据结构。和其他引用类型一样,通道的零值是nil

同种类型的通道可以使用==符号进行比较。当二者都是同一通道数据的引用时,比较值为true。通道也可以和nil进行比较

通道有两个主要操作:发送send和接收receive,两者统称为通信。send语句从一个goroutine传输一个值到另一个正在执行接收表达式的goroutine。两个操作都使用<-操作符书写。发送语句中,通道和值分别在<-的左右两边。在接收表达式中,<-放在通道操作数前面。在接收表达式中,其结果未被使用也是合法的

ch <- x  //发送语句
x = <- ch  //赋值语句中的接收表达式
<-ch  //接收语句,丢弃结果
//通道支持第三个操作:关闭close,它设置一个标志位来指示值当前已经发送完毕,这个通道后面没有值了;关闭后的发送操作将导致宕机。在一个已经关闭的通道上进行接收操作,将获取所有已经发送的值,直到通道为空;这时任何接收操作会立即完成,同时获取到一个通道元素类型对应的零值

关闭通道
close(ch)

//使用简单的make调用创建的通道叫无缓冲unbuffered通道,但make还可以接受第二个可选参数,一个表示通道容量的整数。如果容量是0,make创建一个无缓冲通道
ch = make(chan int)  //无缓冲通道
ch = make(chan int, 0)  //无缓冲通道
ch = make(chan int, 3)  //容量为3的缓冲通道

无缓冲通道上的发送操作将会阻塞,直到另一个goroutine在对应的通道上执行接收操作,这时值传递完成,两个goroutine都可以继续执行。相反,如果接收操作先执行,接收方routine将阻塞,直到另一个goroutine在同一个通道上发送一个值

//pipeline1
func main() {
    naturals := make(chan int)
    squares := make(chan int)

    //counter
    go func() {
        for x := 0; ; x++{
            naturals <- x
        }
    }()

    //squarer
    go func() {
        for {
            x := <- naturals
            squares <- x * x
        }
    }()

    for {
        fmt.Println(<-squares)
    }
}

 如果发送方知道没有更多的数据要发送,告诉接收者所在goroutine可以停止等待是很有用的。可以通过内置的close函数来关闭通道  close(naturals)

  • channel的关闭和广播

在通道关闭后,任何后续的发送操作将会导致应用崩溃。当关闭的通道被读完(就是最后一个发送的值被接收)后,所有后续的接收操作顺畅进行,只是获取到的是零值。关闭naturals通道导致计算平方的循环快速运转,并将结果0传递给printer goroutine

没有一个直接的方式来判断是否通道已经关闭,但有接收操作的一个变种,它产生两个结果:接收到的通道元素,以及一个布尔值(通常称为ok),它为true的时候代表接收成功,false表示当前的接收操作在一个关闭的并且读完的通道上。使用这个特性,可以修改squarer的循环,当naturals通道读完以后,关闭squares通道

//squarer
go func() {
    for {
        x, ok := <-naturals
        if !ok {
            break  //通道关闭并且读完
        }
        squares <- x * x
    }
    close(squares)
}()

//Go语言也提供了range循环语法以在通道上迭代。这个语法更方便接收在通道上所有发送的值,接收完最后一个值后关闭循环

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    //counter
    go func() {
        for x := 0; x < 100; x++{
            naturals <- x
        }
        close(naturals)
    }()

    //squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    //printer
    for x := range squares {
        fmt.Println(x)
    }
}

结束时,关闭每一个通道不是必需的。只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道也是可以通过垃圾回收器根据它是否可以访问来决定是否回收它,而不是根据它是否关闭

试图关闭一个已经关闭的通道会导致宕机,就像关闭一个空通道一样

//向关闭的channel发送数据,会导致panic
//v, ok <- ch; ok为bool值,true表示正常接受,false表示通道关闭
//所有的channel接收者都会在channel关闭时,立刻从阻塞等待中返回且上述ok值为false。这个广播机制常被利用,进行向多个订阅者同时发送信号
func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i++ { ch <- i }
close(ch) wg.Done() }() } func dataReceiver(ch chan
int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i++{ if data, ok := <-ch; ok{
             fmt.Println(data)
        }else {
break
} } wg.Done() }() }
//如果从关闭的通道中接收会收到通道中数据类型对应的零值
//向关闭的通道继续发送数据会导致panic

单向通道类型

Go的类型系统提供了单向通道类型,仅仅导出发送或接收操作。类型chan <- int是一个只能发送的通道,允许发送但不允许接收;类型<-chan int是一个只能接收的int类型通道,允许接收但是不能发送

因为close操作说明了通道上没有数据在发送,仅仅在发送方goroutine上才能调用它,所以试图关闭一个仅能接收的通道在编译时会报错

func counter(out chan<- int) {
    for x:=0; x<100;x++{
        out <- x
    }
    close(out)
}

func squarer(out chan<-int, in <-chan int) {
    for v: = range in {
        out <- v * v
    }
    close(out)
}

func printer(in <- chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go counter(naturals)
    go squarer(squarers, naturals)
    printer(squares)
}

//counter(naturals)的调用隐式地将chan int类型转化为参数要求的chan<-int类型。调用printer(squares)做了类似<-chan int的转变。在任何赋值操作中将双向通道转换为单向通道都是允许的,但是反过来是不行的,一旦有一个像chan<-int这样的单向通道,是没有办法通过它获取到引用同一个数据结构的chan int数据类型

缓冲通道

缓冲通道有一个元素队列,队列的最大长度在创建的时候通过make的容量参数来设置,下面的语句创建一个可以容纳三个字符串的缓冲通道

ch=make(chan string, 3)

缓冲通道上的发送操作在队列的尾部插入一个元素,接收操作从队列的头部移除一个元素。如果通道满了,发送操作会阻塞所在的goroutine直到另一个goroutine对它进行接收操作来留出可用的空间。反过来,如果通道是空的,执行接收操作的goroutine阻塞,直到另一个goroutine在通道上发送数据

程序需要直到通道缓冲区的容量,可以通过调用内置的cap函数获取

当使用内置的len函数时,可以获取当前通道内的元素个数

//缓冲通道的一个应用,它并发地向三个镜像地址发请求
func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <- responses   //return the quickest response
}

func request(hostname string) (response string) {/*...*/}

如果使用一个无缓冲通道,两个比较慢的goroutine将被卡住,因为在它们发送响应结果到通道的时候没有goroutine来接收。这个情况叫作goroutine泄露,它属于一个bug。不像回收变量,泄露的goroutine不会自动回收

无缓冲和缓冲通道的选择、缓冲通道容量大小的选择,都会对程序的正确性产生影响。无缓冲通道提供强同步保障,因为每一次发送都需要和一次对应的接收同步;对于缓冲通道,这些操作则是解耦的。如果知道要发送的值数量的上限,通常会创建一个容量是使用上限的缓冲通道,在接收第一个值前就完成所有的发送。在内存无法提供缓冲容量的情况下,可能导致程序死锁

  • 并行循环???????

一些通用的并行模式,来并行执行所有的循环迭代

package thumbnail

//ImageFile从infile中读取一幅图像并把它的缩略图写入同一个目录中
//它返回生成的文件名,比如"foo.thumb.jpg"

func ImageFile(infile string) (string, error)

//makeThumbnails 生成指定文件的缩略图
func makeThumbnails(filenames []string) {
    for _, f := range filenames {
        if _, err := thumbnail.ImageFile(f); err != nil {
            log.Println(err)
        }
    }
}

//第一个并行版本,不正确
func makeThumbnails2(filenames []string) {
    for _, f := range filenames {
        go thumbnail.ImageFile(f)
    }
}

//没有一个直接的访问等待goroutine结束,但是可以修改内层goroutine,通过一个共享的通道发送事件来向外层goroutine报告它的完成
//makeThumbnails3并行生成指定文件的缩略图
func makeThumbnails3(filenames []string) {
    ch := make(chan struct{})
    for _, f := range filenames {
        go func(f string) {
            thumbnail.ImageFile(f)  //此处忽略了可能的错误
            ch <- struct{}{}
        }(f)
    }
    //等待goroutine完成
    for range filenames {
        <-ch
    }  
}
......

//为了知道什么时候最后一个goroutine结束,需要在每一个goroutine启动前递增计数,在每一个goroutine结束时递减计数。这需要一个特殊类型的计数器,它可以被多个goroutine安全地操作,然后有一个方法一直等到它变为0。这个计数器类型是sync.WaitGroup

//makeThumbnails6为从通道接收到的每个文件生成缩略图
//它返回其生成的文件占用的字节数
func makeThumbnails6(filenames <- chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup //工作goroutine的个数
for f := range filenames {
wg.Add(1)
//worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) //可以忽略错误
sizes <- info.Size()
}(f)
}
}
  • 示例:并发的Web爬虫
func crawl(url string) []string {
    fmt.Println(url)
    list, err := links.Extract(url)
    if err != nil {
        log.Print(err)
    }
    return list
}

//main函数,一个任务列表记录需要处理的条目队列,每一个条目是一个待爬取的URL列表,使用通道代替slice来表示队列。每一次对crawl的调用发生在它自己的goroutine中,然后将发现的链接发送回任务列表

func main() {
    
    worklist := make(chan []string)

    //从命令行参数开始
    go func() { worklist <- os.Args[1:] }()

    //并发爬取Web
    seen := make(map[string]bool)
    for list := range worklist {
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
} 

可以使用容量为n的缓冲通道来建立一个并发原语,称为计数信号量。概念上,对于缓冲通道中的n个空闲槽,每一个代表一个令牌,持有者可以执行。通过发送一个值到通道中来领取令牌,从通道中接收一个值来释放令牌,创建一个新的空闲槽。这保证了在没有接收操作的时候,最多同时有n个发送

//重写crawl函数,使用令牌的获取和释放操作来包括对links.Extract函数的调用,这样保证最多同时20个调用可以进行

//令牌是一个计数信号量
//确保并发请求限制在20个以内
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
    fmt.Println(url)
    tokens <- struct{}{}  //获取令牌
    list, err := links.Extract(url) <- tokens  //释放令牌
    
    if err != nil {
        log.Print(err)
    }
    return list
}
//第二个问题是这个程序永远不会结束,即使它已经从初始URL发现了所有的可达链接
//为了让程序终止,当任务列表为空且爬去goroutine都结束以后,需要从主循环退出
func main() {
    worklist := make(chan []string)
    var n int  //等待发送到任务列表的数量
    
    //从命令行参数开始
    n++
    go func() {
        worklist <- os.Args[1:]
    }()

    //并发爬取Web
    seen := make(map[string]bool)
    for ; n > 0; n-- {
        list := <- worklist
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                n++
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
}
//在这个版本中,计数器n跟踪发送到任务列表中的任务个数。每次知道一个条目被发送到任务列表时,就递增变量n,第一次递增是在发送初始化命令行参数之前,第二次递增是在每次启动一个新的爬取goroutine的时候。
//一个解决过度并发问题的替代方案,这个版本使用最初的crawl函数,它没有计数信号量,但是通过20个长期存活的爬虫goroutine来调用它,这样确保最多20个HTTP请求并发执行
func main() {
    worklist := make(chan []string)  //可能有重复的URL列表
    unseenLinks := make(chan string)  //去重后的URL列表

    //向任务列表中添加命令行参数
go func() { worklist <- os.Args[1:] }()
//创建20个爬虫goroutine来获取每个不可见链接
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}
//主goroutine对URL列表进行去重
//并把没有爬取过的条目发送给爬虫程序
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link
}
}
}
}
//爬取goroutine使用同一个通道unseenLinks进行接收,主goroutine负责对从任务列表接收到的条目进行去重,然后发送每一个没有爬取过的条目到unseenLinks通道
//然后被爬取goroutine接收
  • 使用select多路复用
//多渠道的选择  case后面跟不同的channel  阻塞
select {
case ret := <-retCh1:
    t.Logf("result %s", ret)
case ret := <-retCh2:
    t.Logf("result %s", ret)
default:
    t.Error("No one returned")
}

//超时控制
select {
case ret :<-retCh:
    t.Logf("result %s", ret)
case <-time.After(time.Second * 1):
    t.Error("time out")
}

time.Tick函数返回一个通道,它定期发送事件

func main() {
    fmt.Println("Commencing countdown.")
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        <-tick
    }
    launch()
}

abort := make(chan struct{})
go func() {
    os.Stdin.Read(make([]byte, 1))  //读取单个字节
    abort <- struct{}{}
}()

//现在每一次倒计时迭代需要等待事件到达两个通道中的一个:计时器通道,前提是一切顺利;或者中止事件前提是有异常anomaly。不能只从一个通道上接收,因为哪一个操作都会在完成前阻塞。所以需要多路复用那些操作过程,为了实现这个目的,需要一个select语句:
select {
case <- ch1:
    //...
case x := <-ch2:
    //...use x...
case ch3 <- y:
    //...
default:
    //...
}
//上面是select语句的通用形式。像switch语句一样,它有一系列的情况和一个可选的默认分支。每一个情况指定一次通信(在一些通道上进行发送或接收操作)和关联的一段代码块
//select一直等待,直到一次通信来告知有一些情况可以执行。然后,它进行这次通信,执行此情况所对应的语句;其他的通信将不会发生。对于没有对应情况的select,select{}将永远等待

func main() {
    //...创建中止通道...
    fmt.Println("Commencing countdown. Press return to abort.")
    select {
    case <- time.After(10 * time.Second):
        //不执行任何操作
    case <- abort:
        fmt.Println("Launch aborted!")
        return
    }
    launch()
}
//通道ch的缓冲区大小为1,它要么是空的,要么是满的
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
    select {
    case x := <-ch:
        fmt.Println(x)
    case ch <- i:
    }
}
//下面的select语句使每一次迭代使用1s来等待中止,但不会更长
func main() {
    //...创建中止通道...
    fmt.Println("Commencing countdown. Press return to abort.")
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        select {
        case <- tick:
            //什么操作也不执行
        case <- abort:
            fmt.Println("Launch aborted!")
            return
        }
    }
    launch()
}

//time.Tick函数的行为很像创建一个goroutine在循环里面调用time.sleep,然后在它每次醒来时发送事件。当上面的倒计时函数返回时,它停止从tick通道中接收事件,但是计时器goroutine还在运行,徒劳地向一个没有goroutine在接收的通道不断发送-->会发生goroutine泄露
//Tick函数很方便使用,但是它仅仅在应用的整个生命周期中都需要时才合适。否则,我们需要使用这个模式:
ticker := time.NewTicker(1 * time.Second)

<-ticker.C  //从ticker的通道接收

ticker.Stop()  //造成ticker的goroutine终止

//有时候我们试图在一个通道上发送或接收,但是不想在通道没有准备好的情况下被阻塞--非阻塞通信。这使用select语句也可以做到。select可以有一个默认情况,它用来指定在没有其他的通信发生时可以立即执行的动作

//下面的select语句尝试从abort通道中接收一个值,如果没有值,它什么也不做。这是一个非阻塞的接收操作;重复这个动作称为对通道轮询:
select {
case <- abort:
    fmt.Printf("Launch aborted!
")
    return
default:
    //不执行任何操作
}

//通道的零值是nil
//nil通道有时候很有用。因为在nil通道上发送和接收将永远阻塞,对于select语句中的情况,如果其通道是nil,它将永远不会被选择
  • 示例:并发目录遍历
  • 取消

有时候我们需要让一个goroutine停止它当前的任务。一个goroutine无法直接终止另一个,因为这样会让所有的共享变量状态处于不确定状态

对于取消操作,需要一个可靠的机制在一个通道上广播一个事件,这样很多goroutine可以认为它发生了,然后可以看到它已经发生

//加入取消机制,第一步,创建一个取消通道,在它上面不发送任何值,但是它的关闭表明程序需要停止它正在做的事情。也定义了一个工具函数cancelled,在它被调用的时候检测或轮询取消状态
var done = make(chan struct{})  //done是一个通道

func cancelled() bool {
    select {
    case <- done:
        return true
    default:
        return false
    }
}

//接下来创建一个读取标准输入的goroutine,它通常连接到终端。一旦开始读取任何输入(例如用户按回车键)时,这个goroutine通过关闭done通道来广播取消事件
//当检测到输入时取消遍历
go func() {
    os.Stdin.Read(make([]byte, 1))  //读一个字节
    close(done)
}()

//现在需要让goroutine来响应取消操作。在主goroutine中,添加第三个情况到select语句中,它尝试从done通道接收。如果选择这个情况,函数将返回,但是在返回之前它必须耗尽fileSizes通道,丢弃它所有的值,直到通道关闭。这样是为了保证所有的walkDir调用可以执行完,不会卡在向fileSizes通道发送消息上
for {
    select {
    case <- done:
        //耗尽fileSizes以允许已有的goroutine结束
        for range fileSizes{
            //不执行任何操作
        }
        return
        case size, ok := <- fileSizes:
            //......
    }
}

//walkDir goroutine在开始的时候轮询取消状态,如果设置状态,什么都不做立即返回。它让在取消后创建的goroutine什么都不做:
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan <- int64) {
    defer n.Done()
    if cancelled() {
        return
    }
    for _, entry := range dirents(dir) {
        //......
    }
}

 Context与任务取消

根Context:通过context.Background()创建

子Context:context.WithCancel(parentContext)创建

    ctx, cancel := context.WithCancel(context.Background())

当前Context被取消时,基于他的子context都会被取消

接收取消通知<-ctx.Done()

  • 示例:聊天服务器
原文地址:https://www.cnblogs.com/liushoudong/p/13067556.html