Golang 漫谈之channel妙法

除了《Kubernetes GO》系列之外,对于golang相关知识,同时准备了《Golang 漫谈》以增雅趣,不足之处,万望海涵,在此特别感谢雨痕的Golang 源码剖析

Golang 1.13.1已在9月26日正式发布,主要修复CVE-2019-16276,当然docker等相关组件也同时做了update

channel是Golang提供的goroutine间的通信方式,其为Golang并发模型CSP的关键,Golang鼓励用通讯实现数据共享,如果需要跨进程通信,建议使用分布式方案或者消息队列来解决。该文章主要介绍,以下内容:

  • channel介绍及范例
  • channel用法
  • channel使用场景
  • channel原理赏析

下面在进入正题之前,简要介绍一下CSP模型:

传统并发模型分为Actor模型与CSP模型,其中CSP全称为Communicating Sequential Processess,CSP模型有并发执行体(进程、线程、协程),和消息通道组成,执行体之间通过消息通道进行通讯,CSP模型关注消息发送的载体,即消息管道,而Actor关注的是内部的状态,那么Golang中执行体对应的是goroutine,消息通道对应的是channel。


一、channel介绍及范例

如上所言,channel 提供了一种通信机制,其为gouroutine之间的通信提供了一种可能,执行体拷贝数据,channel负责传递,有以下应用场景:

  • 广播,如消费者/生产者模型

  • 交换数据

  • 并发控制

  • 显示通知等

Golang鼓励使用通讯来实现数据共享,而不是经由内存。

1.1 channel特性

1)线程安全:hchan mutex

2)先进先出:copying into and out of hchan buffer

3)channel的高性能所在:

  • 调用runtime scheduler实现,OS thread不需要阻塞;
  • 跨goroutine栈可以直接进行读写;
1.2 channel类型

channel分为非缓存channel与缓存channel。

  • 无缓存channel

    从无缓存的channel中读取消息会堵塞,直到有goroutine往channel中发送消息;同理,向无缓存的channel中发送消息也会堵塞,直到有goroutine从channel中读取消息。

  • 有缓存channel

    有缓存channel的声明方式为指定make函数的第二个参数,该参数为channel缓存的容量。
    通过内置len函数可获取chan元素个数,通过cap函数可获取chan的缓存长度

单项channel :单向channel为只读/只写channel,单向channel,在编译时,可进行检测。

func testSingal(ch chan<- int) <- chan int {
  // 定义工作逻辑
}

其中 chan<- int表示只写channel, <-chan int表示只读channel,此类函数/方法声明可防止channel滥用,在编译时可以检测出。

1.3 channel创建

channel使用内置的make函数创建,如下,声明类型为int的channel:

// 非缓存channel
ch := make(chan int)
// 缓存channel
bch := make(chan int, 2)

channel和map类似,make创建了底层数据结构的引用,当赋值或参数传递时,只是拷贝了一个channel的引用,其指向同一channel对象,与其引用类型一样,channel的空值也为nil。使用==可以对类型相同的channel进行比较,只有指向相同对象或同为nil时,结果为true。

1.4 channel的读写操作

channel在使用前,需要初始化,否则永远阻塞。

ch := make(chan int)

// 写入channel
ch <- x

// 从channel中读取
y <- ch

// 从channel中读取
z := <- ch
1.5 channel的关闭

golang提供了内置的close函数,对channel进行关闭操作。

// 初始化channel
ch := make(chan int)
// 关闭channel ch
close(ch)

关于channel的关闭,需要注意以下事项:

  • 关闭未初始化的channle(nil)会panic
  • 重复关闭同一channel会panic
  • 向以关闭channel发送消息会panic
  • 从已关闭channel读取数据,不会panic,若存在数据,则可以读出未被读取的消息,若已被读出,则获取的数据为零值,可以通过ok-idiom的方式,判断channel是否关闭
  • channel的关闭操作,会产生广播消息,所有向channel读取消息的goroutine都会接受到消息
package main

import fmt

func main() {
    // 初始化channel
    ch := make(chan int, 3)
    // 发送消息
    ch <- 1
    ch <- 2
    # 关闭channel
    close(ch)
    // 循环读取
    for c := range ch {
      fmt.Println(c)
    }
}
1.6 两类channel
ch := make(chan int, 1)

有缓存的channel使用环形数组实现,当缓存未满时,向channel发送消息不会阻塞,当缓存满时,发送操作会阻塞,直到其他goroutine从channel中读取消息;同理,当channel中消息不为空时,读取消息不会阻塞,当channel为空时,读取操作会阻塞,直至其他goroutine向channel发送消息。


ch := make(chan int)
// 阻塞,因channel ch为空
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

# 阻塞,因缓存已满
ch <- 4
<- ch

二、channel的用法

2.1 goroutine通信

看下面《effctive go》中的例子:
主goroutine会阻塞,直至执行sort的goroutine完成

// 初始化chan
c := make(chan int)
# 使用goroutine执行list.Sort(),完毕后,发送信号
go func() {
  list.Sort()
  c <- 1
}()
// 处理其他事务
doSomething()
// 读取chan消息
<-c
2.2 range遍历

channel也可以使用range取值,并且会一直从chanel中读取数据,直至goroutine关闭该channel,循环才会结束,如下所示。

// 初始化channel
ch := make(chan int, 5)
go func(){
  for i := 0; i < 5; i ++ {
    ch <- i
  }
}()
for i := range ch {
  fmt.Println(i)
}

等同于

// 初始化channel
ch := make(chan int, 5)
go func(){
  for i := 0; i < 5; i ++ {
    ch <- i
  }
}()
for {
  i, ok := <- ch
  if !ok {
    break
  }
  fmt.Println(i)
}
2.3 配合select使用

select用法类似IO多路复用,可同时监听多个channel的消息,如下所示:

select {
    case <- a;
      fmt.Println("testa")
    case <- b;
      fmt.Println("testb")
    case c <- 3;
      fmt.Println("testc")
    default:
      fmt.Println("testdefault")
}

select有以下特性:

  • select可同时监听多个channel的读/写
  • 执行select时,若只有一个case通过,则执行该case
  • 若有多个,则随机执行一个case
  • 若所有都不满足,则执行default,若无default,则等待
  • 可使用break跳出select

三、channel使用场景

3.1 设置超时时间
// 初始化channel,数据类型为struct{}
ch := make(chan struct{})
// 以goroutine方式处理func
go func(){
 // 处理逻辑
 // 传递ch,控制goroutine
}(ch)

timeout := time.After(1 * time.Sencond)
select {
  case <- ch:
      fmt.Printfln("任务完成.")
  case <- timeout:
      fmt.Printfln("时间已到.")
}
3.2 控制channel

在某些应用场景,工作goroutine一直处理事务,直到收到退出信号

mch := make(chan struct{})
quit := make(chan struct{})
for {
  select {
    case <- mch:
        // 正常工作
        work()
    case <- quit:
        // 退出前,处理收尾工作
        doFinish()
        return
  }
}

四、channel原理赏析

4.1 channel结构体

以下源码基于go 1.13.1,其主要实现在src/runtime/chan.go中,在介绍源码前,需要介绍channel最主要的结构体hchan,其定义如下所示:

type hchan struct {
  qcount   uint            // 当前队列中剩余元素个数,即len
  dataqsiz uint            // 环形队列长度,即可以存放的元素个数,cap
  buf      unsafe.Pointer  // 环形队列指针:队列缓存,头指针,环形数组实现
  elemsize uint16          // 每个元素的大小
  closed   uint32          // 关闭标志位
  elemtype *_type          // 元素类型
  sendx    uint            // 队列下标,指示元素写入时存放到队列中的位置
  recvx    uint            // 队列下标,指示元素从队列的该位置读出
  recvq    waitq           // 等待读消息的goroutine队列
  sendq    waitq           // 等待写消息的goroutine队列

  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock mutex              // 该锁保护hchan所有字段
}
// sending/receiving等待队列的链表实现
type waitq struct {
  first *sudog
  last  *sudog
}

hchan类型

一个channel只能传递一种类型的值,类型信息存储在hchan数据结构体中,_type结构体中包含elemtypeelemsize等。

  • elemetype代表类型,用于数据传递过程中的赋值
  • elemesize代码类型大小,用于在buf中定位元素位置

hchan环形队列
hchan内部实现了一个环形队列作为缓冲区,队列的长度是创建channel时指定的。下图展示了一个可缓存6个元素的channel的示意图:

buf

  • dataqsiz指示队列长度为6,即可缓存6个元素
  • buf指向队列的内存,队列中还剩余两个元素
  • qcount表示队列中还有两个元素
  • sendx指示后续写入的数据存储的位置,取值[0,6)
  • recvx指示从该位置读取数据,取值[0,6)

hchan等待队列
从channel读消息,如果channel缓冲区为空或者没有缓存区,当前goroutine会被阻塞。
向channel读消息,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会封装成sudog,加入到channel的等待队列中:

  • 因读消息阻塞的goroutine会被channel向channel写入数据的goroutine唤醒
  • 因写消息阻塞的goroutine会从channel读消息的goroutine唤醒

waitq

一般情况下,recvq和sendq至少一个为空,只有一个例外,即同一个goroutine使用select语句向channel一边写数据,一个读数据。

// sudog将*g封装到等待链表中
//(M)sudogs <-> (N) g
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
  // The following fields are protected by the hchan.lock of the
  // channel this sudog is blocking on. shrinkstack depends on
  // this for sudogs involved in channel ops.

  g *g

  // isSelect indicates g is participating in a select, so
  // g.selectDone must be CAS'd to win the wake-up race.
  isSelect bool
  next     *sudog
  prev     *sudog
  elem     unsafe.Pointer // data element (may point to stack)

  // The following fields are never accessed concurrently.
  // For channels, waitlink is only accessed by g.
  // For semaphores, all fields (including the ones above)
  // are only accessed when holding a semaRoot lock.

  acquiretime int64
  releasetime int64
  ticket      uint32
  parent      *sudog // semaRoot binary tree
  waitlink    *sudog // g.waiting list or semaRoot
  waittail    *sudog // semaRoot
  c           *hchan // channel
}

和其他一样,sudog也实现二级缓存复用结构。

runtime2.go

type p struct {
    // proceresice new(p)时指向sudogbuf
    sudogcache []*sudog
    sudogbuf   [128]*sudog
}
type schedt struct {
     // Central cache of sudog structs.
    sudoglock  mutex
    sudogcache *sudog
}
func acquireSudog() *sudog {
  // 获取当前m
  mp := acquirem()
  pp := mp.p.ptr()
  // 如果当前p为空
  if len(pp.sudogcache) == 0 {
    lock(&sched.sudoglock)
    // First, try to grab a batch from central cache.
    // 从全局转移一批到当前p
    for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
      s := sched.sudogcache
      sched.sudogcache = s.next
      s.next = nil
      pp.sudogcache = append(pp.sudogcache, s)
    }
    unlock(&sched.sudoglock)
    // 如果还为空,则创建
    if len(pp.sudogcache) == 0 {
      pp.sudogcache = append(pp.sudogcache, new(sudog))
    }
  }
  // 从尾部获取,同时调整p的缓存
  n := len(pp.sudogcache)
  s := pp.sudogcache[n-1]
  pp.sudogcache[n-1] = nil
  pp.sudogcache = pp.sudogcache[:n-1]
  if s.elem != nil {
    throw("acquireSudog: found s.elem != nil in cache")
  }
  releasem(mp)
  return s
}

//go:nosplit
func releaseSudog(s *sudog) {
  // 判断结构体是否为空
  if s.elem != nil {
    throw("runtime: sudog with non-nil elem")
  }
  if s.isSelect {
    throw("runtime: sudog with non-false isSelect")
  }
  if s.next != nil {
    throw("runtime: sudog with non-nil next")
  }
  if s.prev != nil {
    throw("runtime: sudog with non-nil prev")
  }
  if s.waitlink != nil {
    throw("runtime: sudog with non-nil waitlink")
  }
  if s.c != nil {
    throw("runtime: sudog with non-nil c")
  }
  gp := getg()
  if gp.param != nil {
    throw("runtime: releaseSudog with non-nil gp.param")
  }
  mp := acquirem() // avoid rescheduling to another P
  pp := mp.p.ptr()
  // 如果p已满,则转移到全局
  if len(pp.sudogcache) == cap(pp.sudogcache) {
    // Transfer half of local cache to the central cache.
    var first, last *sudog
    for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
      n := len(pp.sudogcache)
      p := pp.sudogcache[n-1]
      pp.sudogcache[n-1] = nil
      pp.sudogcache = pp.sudogcache[:n-1]
      if first == nil {
        first = p
      } else {
        last.next = p
      }
      last = p
    }
    lock(&sched.sudoglock)
    last.next = sched.sudogcache
    sched.sudogcache = first
    unlock(&sched.sudoglock)
  }
  pp.sudogcache = append(pp.sudogcache, s)
  releasem(mp)
}

转载: https://juejin.im/entry/5da165c9f265da5b8f107dbf

浪漫家园,没事就来逛逛
原文地址:https://www.cnblogs.com/lovezbs/p/13126703.html