The implementation of channels in Go

Go's channel implementation lives in src/runtime/chan.go. A channel is represented by the following structure:

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //  
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

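Note: raceenabled in the source indicates whether the race detector is enabled, so if you only want to understand how channels work, you can ignore the code related to raceenabled. When raceenabled is false, the related functions are in runtime/race0.go; when raceenabled is true, they are in runtime/race.go. For what the race detector does, see: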

https://blog.golang.org/race-detector

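The fields of this structure are as follows. qcount is the number of elements currently in the buffer, all of which a receive (x = <-c) can read immediately. dataqsiz is the maximum number of elements buf can hold; buf is used as a circular queue, and dataqsiz is the size passed to make when the channel is created. buf points to the circular queue; if dataqsiz is 0 or the element size is 0, buf only points to a placeholder address. elemsize is the size of a single element. sendx is the position in buf that the next send (c <- x) writes to, and recvx is the position in buf that the next receive reads from. recvq holds the goroutines waiting to receive, and sendq holds the goroutines waiting to send. lock is the mutex that protects the fields of the structure.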
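
To make the roles of qcount, dataqsiz, sendx and recvx concrete, here is a minimal user-space sketch of a circular queue. The ring type and its push and pop methods are hypothetical names invented for illustration; they are not part of the runtime, and the sketch leaves out locking and the wait queues entirely.

```go
package main

import "fmt"

// ring is a hypothetical, simplified stand-in for the buffer part of hchan.
type ring struct {
	buf      []int // plays the role of hchan.buf
	qcount   int   // number of elements currently stored
	dataqsiz int   // capacity of the circular queue
	sendx    int   // next slot to write
	recvx    int   // next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// push stores v and reports false if the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop removes the oldest element and reports false if the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the buffer is full
	v, _ := r.pop()
	fmt.Println(v) // 1: the oldest element comes out first
	r.push(3)      // reuses slot 0 after wrapping around
	fmt.Println(r.sendx, r.recvx, r.qcount)
}
```

The wrap-around of sendx and recvx is the same pattern that appears in the chansend and chanrecv excerpts later in this post.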

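Below are the definitions of the waitq structure and of its member type sudog. The structure g (type g struct) holds the information about a single goroutine and is used for goroutine management: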

type waitq struct { 
    first *sudog 
    last  *sudog 
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//  
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    ...

    c           *hchan // channel
}

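Before walking through the functions, here are a few constants:

  1. maxAlign is used for alignment. A structure aligned to a suitable byte boundary in memory can be accessed faster; a common requirement is 8-byte alignment.
  2. hchanSize is the size of hchan once it is rounded up to 8-byte alignment. In the source: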
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

This is equivalent to:

const hchanSize = (unsafe.Sizeof(hchan{}) + (maxAlign-1)) - (unsafe.Sizeof(hchan{})+(maxAlign-1))%maxAlign

As a simple example: if unsafe.Sizeof(hchan{}) is 5, hchanSize is 8; if unsafe.Sizeof(hchan{}) is 8, hchanSize is 8; if unsafe.Sizeof(hchan{}) is 10, hchanSize is 16. In short, the size is rounded up to a multiple of 8.
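
The equivalence of the two formulas can be checked with a small program. This is only an illustrative sketch: roundUpMask and roundUpMod are hypothetical helper names, and maxAlign is assumed to be 8 as in the examples above.

```go
package main

import "fmt"

// maxAlign is assumed to be 8 for this illustration.
const maxAlign = 8

// roundUpMask mirrors the bit trick used for hchanSize:
// n + (-n & (maxAlign-1)).
func roundUpMask(n uintptr) uintptr {
	return n + ((-n) & (maxAlign - 1))
}

// roundUpMod is the arithmetic form using the remainder operator.
func roundUpMod(n uintptr) uintptr {
	return (n + (maxAlign - 1)) - (n+(maxAlign-1))%maxAlign
}

func main() {
	for _, n := range []uintptr{5, 8, 10} {
		fmt.Println(n, roundUpMask(n), roundUpMod(n))
	}
}
```

The mask form works because maxAlign is a power of two: -n & (maxAlign-1) is exactly the number of padding bytes needed to reach the next multiple.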

The function that creates a channel is shown below:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // check size and type information 
  ...

  switch {
  // special cases, e.g. a buffer size of 0 or an element size of 0
  ...
  default:
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
  }

  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

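This function is fairly simple: it just initializes the fields of hchan. The source handles a buffer size of 0, an element size of 0, and element types that contain no pointers as special cases; you can set those details aside for now if the goal is to understand how channels work.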
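
From user code, the size argument that ends up in dataqsiz is visible through cap, and qcount through len. The following small sketch shows this; describe is a hypothetical helper written for this post, not a runtime function.

```go
package main

import "fmt"

// describe returns the capacity and current length of a channel,
// which correspond to dataqsiz and qcount in hchan.
func describe(c chan int) (capacity, length int) {
	return cap(c), len(c)
}

func main() {
	unbuffered := make(chan int)  // size 0: no buffer at all
	buffered := make(chan int, 3) // size 3: circular queue of 3 slots
	buffered <- 10
	buffered <- 20

	fmt.Println(describe(unbuffered)) // 0 0
	fmt.Println(describe(buffered))   // 3 2
}
```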

First, here are the park and wake-up functions used by send and recv:

// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
    ...
}

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

Below is the implementation of chansend (c <- x):

// block indicates whether the call may block; ep is the element to send (element pointer)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
    throw("unreachable")
  }
  ...
  // Fast path: check for failed non-blocking operation without acquiring the lock.
  if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
  }
  ...
  lock(&c.lock)

  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

  if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }

  if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    ...
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

  if !block {
    unlock(&c.lock)
    return false
  }

  // Block on the channel. Some receiver will complete our operation for us.
  mysg := acquireSudog()
  ...
  c.sendq.enqueue(mysg)
  goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

  // someone woke us up
  ...
  releaseSudog(mysg)
  return true
}

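As the code above shows, a send proceeds in the following order:

(1) If the channel is nil: when block is false, return false; otherwise call gopark, and the goroutine stays parked.

(2) Non-blocking fast path: when block is false, the channel is not closed, and either the channel is unbuffered with no goroutine waiting to receive or the buffer is full, return false without taking the lock. Otherwise continue with the steps below.

(3) If a goroutine is waiting to receive (which means buf holds no value to receive), take the first goroutine from the list of waiting receivers and pass ep directly to it. This list is a first-in, first-out queue.

(4) If buf has free space (which means no goroutine is waiting to receive), write ep into buf. Here too, what is written first is read first.

(5) Otherwise, when block is false, return false; when block is true, wait until a receiving goroutine shows up.

The steps above do not cover a closed channel: sending on a closed channel panics. This check happens right after the lock is taken, before step (3).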
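
The observable behaviour of these steps can be sketched from user code. trySend and sendPanics below are hypothetical helpers written for this post. trySend uses select with a default case, which corresponds to calling chansend with block set to false (see the selectnbsend mapping later in this post).

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a send on c panics.
// It must only be called with a channel on which the send cannot block.
func sendPanics(c chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	c <- 1
	return false
}

func main() {
	var nilChan chan int
	fmt.Println(trySend(nilChan, 1)) // false: step (1), nil channel and block == false

	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: step (4), the buffer has free space
	fmt.Println(trySend(c, 2)) // false: step (2), the buffer is full

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendPanics(closed)) // true: send on a closed channel panics
}
```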

Below is the implementation of the send function, which gives a rough idea of how a goroutine is woken up:

// in the call above, unlockf is: func() { unlock(&c.lock) }
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ...
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  ...
  goready(gp, skip+1)
}

The goready call above wakes up the goroutine. gp can be read as "g pointer": a pointer to the g structure that describes a goroutine.

Below are the definitions of the recv functions:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
    throw("unreachable")
  }

  // Fast path: check for failed non-blocking operation without acquiring the lock.
  if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
  }
  ...
  lock(&c.lock)

  if c.closed != 0 && c.qcount == 0 {
    ...
    unlock(&c.lock)
    if ep != nil {
      // memory clear
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }

  if sg := c.sendq.dequeue(); sg != nil {
    // Found a waiting sender. If buffer is zero, receive value
    // directly from sender. Otherwise, receive from head of queue
    // and add sender's value to the tail of the queue (both map to
    // the same buffer slot because the queue is full).
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

  if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
      ...
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

  if !block {
    unlock(&c.lock)
    return false, false
  }

  // no sender available: block on this channel.
  mysg := acquireSudog()
  ...
  c.recvq.enqueue(mysg)
  goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

  // someone woke us up
  ...
  releaseSudog(mysg)
  return true, !closed
}                                        

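As the code above shows, a receive proceeds in the following order:

(1) If the channel is nil: when block is false, return (false, false); otherwise call gopark.

(2) Non-blocking fast path: when block is false, the channel is not closed, and either the channel is unbuffered with no goroutine waiting to send or buf is empty, return (false, false) without taking the lock. Otherwise continue with the steps below.

(3) If the channel is closed and buf holds no values, clear ep and return (true, false).

(4) If a goroutine is waiting to send, there are two cases:

  1) The channel is unbuffered: the value from the sending goroutine is copied directly to the goroutine that called recv.

  2) Otherwise buf is full: the value written to buf first is handed to the receiving goroutine, and then the value of the goroutine at the head of the send queue is written into buf.

(5) If no goroutine is waiting to send and buf holds values, the value written first is handed to the receiving goroutine.

(6) If buf holds no values: when block is false, return (false, false); otherwise wait until a sending goroutine shows up.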
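
The return values described in these steps can be observed from user code with a non-blocking receive. tryRecv is a hypothetical helper written for this post; its selected result plays the role of the first return value of chanrecv, and ok plays the role of received.

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive.
// selected reports whether the receive case ran; ok is false when the
// value comes from a closed, drained channel.
func tryRecv(c chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	c := make(chan int, 2)
	fmt.Println(tryRecv(c)) // 0 false false: buf is empty, channel is open

	c <- 7
	c <- 8
	fmt.Println(tryRecv(c)) // 7 true true: oldest buffered value

	close(c)
	fmt.Println(tryRecv(c)) // 8 true true: buffered values survive close
	fmt.Println(tryRecv(c)) // 0 false true: closed and drained
}
```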

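From the implementations of chansend and chanrecv above, we can conclude the following:

(1) Among goroutines blocked in chansend, the one that called it first is woken up first.

(2) Among goroutines blocked in chanrecv, the one that called it first is woken up first.

(3) The value that was written into the channel by the earliest chansend call is the first one taken out of buf.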

Below is the implementation of close:

func closechan(c *hchan) {
  ...
  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }

  c.closed = 1
  var glist *g

  // release all readers
  for {
    sg := c.recvq.dequeue()
    ...
    gp.schedlink.set(glist)
    glist = gp
  }

  // release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    ...
    gp.schedlink.set(glist)
    glist = gp
  }
  unlock(&c.lock)

  // Ready all Gs now that we've dropped the channel lock
  for glist != nil {
    gp := glist
    glist = glist.schedlink.ptr()
    gp.schedlink = 0
    goready(gp, 3)
  }
}

As the implementation shows, closechan wakes up every goroutine waiting to send on or receive from the channel, but it does not clear the contents of buf.
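
Both effects can be observed from user code. The sketch below uses two hypothetical helpers, wakeAll and drainAfterClose. Note that wakeAll cannot guarantee every receiver is already parked when close runs: a receiver that arrives late simply sees the closed channel right away. Either way, each receiver returns with ok == false.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll starts n receivers on an unbuffered channel, closes it, and
// returns how many receivers returned with ok == false.
func wakeAll(n int) int {
	c := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-c; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(c) // releases every receiver, parked or not
	wg.Wait()
	return woken
}

// drainAfterClose shows that values buffered before close remain readable.
func drainAfterClose() []int {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	close(c)
	var got []int
	for v := range c { // range stops once the channel is closed and empty
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(wakeAll(4))        // 4
	fmt.Println(drainAfterClose()) // [1 2]
}
```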

Below is the mapping the compiler sets up between select statements and runtime functions:

// compiler implements
//
//  select {
//  case c <- v:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if selectnbsend(c, v) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//  select {
//  case v = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if selectnbrecv(&v, c) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)
    return
}

// compiler implements
//
//  select {
//  case v, ok = <-c:
//      ... foo
//  default:
//      ... bar
//  }
//
// as
//
//  if c != nil && selectnbrecv2(&v, &ok, c) {
//      ... foo
//  } else {
//      ... bar
//  }
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
    // TODO(khr): just return 2 values from this function, now that it is in Go.
    selected, *received = chanrecv(c, elem, false)
    return
}

Below are the functions that operate on the recvq and sendq queues; they are simply a first-in, first-out queue implemented as a linked list:

func (q *waitq) enqueue(sgp *sudog) {
    sgp.next = nil
    x := q.last
    if x == nil {
        sgp.prev = nil
        q.first = sgp
        q.last = sgp
        return
    }
    sgp.prev = x
    x.next = sgp
    q.last = sgp
}

func (q *waitq) dequeue() *sudog {
    for {
        sgp := q.first
        if sgp == nil {
            return nil
        }
        y := sgp.next
        if y == nil {
            q.first = nil
            q.last = nil
        } else {
            y.prev = nil
            q.first = y
            sgp.next = nil // mark as removed (see dequeueSudog)
        }

        ...

        return sgp
    }
}
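
The same FIFO linked list can be tried out in user space. In this sketch, waiter and queue are hypothetical stand-ins for sudog and waitq, and the part of dequeue that the excerpt above elides is left out.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for sudog: only the links and a name.
type waiter struct {
	name       string
	next, prev *waiter
}

// queue is a hypothetical stand-in for waitq.
type queue struct {
	first, last *waiter
}

// enqueue appends w at the tail of the list.
func (q *queue) enqueue(w *waiter) {
	w.next = nil
	x := q.last
	if x == nil {
		w.prev = nil
		q.first = w
		q.last = w
		return
	}
	w.prev = x
	x.next = w
	q.last = w
}

// dequeue removes and returns the head of the list, or nil if it is empty.
func (q *queue) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	y := w.next
	if y == nil {
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		w.next = nil
	}
	return w
}

func main() {
	var q queue
	q.enqueue(&waiter{name: "g1"})
	q.enqueue(&waiter{name: "g2"})
	q.enqueue(&waiter{name: "g3"})
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		fmt.Println(w.name) // g1, g2, g3 in that order
	}
}
```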

That concludes this brief walkthrough of the channel implementation. Suggestions and comments are welcome.

Original post: https://www.cnblogs.com/albizzia/p/10867724.html