Go之go与channel组合使用

1,等待一个事件
<-ch 将一直阻塞,直到ch被关闭 或者 ch中可以取出值 为止
所以到第17行之后会去执行go后面的func()匿名函数,在里面给ch赋值后(或者close(ch))后,才能继续往后执行
  1. package main
  2. import(
  3. "fmt"
  4. )
  5. func main(){
  6. fmt.Println("Begin doing something!")
  7. ch := make(chan int)
  8. go func(){
  9. fmt.Println("Doing something…")
  10. ch <-22
  11. //close(ch)
  12. }()
  13. <-ch //此处将被阻塞,直到ch被关闭 或 有值可以取出
  14. fmt.Println("Done!")
  15. }
 
2,协同多个Goroutines
同上,close channel还可以用于协同多个Goroutines,比如下面这个例子,我们创建了100个Worker Goroutine,这些Goroutine在被创建出来后都阻塞在"<-start"上,直到我们在main goroutine中给出开工的信号:"close(start)",这些goroutines才开始真正的并发运行起来。
  1. package main
  2. import"fmt"
  3. func worker(start chan bool, index int){
  4. <-start // 从start中取出数据后,调用20行的case语句
  5. fmt.Println("This is Worker:", index)
  6. }
  7. func main(){
  8. start := make(chan bool)
  9. for i :=1; i <=10; i++{
  10. go worker(start, i)
  11. }
  12. //给start赋值10次,让worker方法执行10次
  13. for i :=1; i <=10; i++{
  14. start <-true//给start赋值一次,便执行worker函数一次
  15. }
  16. v :=1
  17. //select 被一直阻塞直到start中数据被取出
  18. select{  //deadlock we expected
  19. case<-start:
  20. fmt.Print(v)
  21. v++
  22. }
  23. }
3,Select
  • select常与for一起使用
  1. for{
  2. select{
  3. case x :=<- somechan:
  4. // … 使用x进行一些操作
  5. case y, ok :=<- someOtherchan:
  6. // … 使用y进行一些操作,
  7. // 检查ok值判断someOtherchan是否已经关闭
  8. case outputChan <- z:
  9. // … z值被成功发送到Channel上时
  10. default:
  11. // … 上面case均无法通信时,执行此分支
  12. }
  13. }
  • 终结woker
 
下面是一个常见的终结sub worker goroutines的方法,
每个worker goroutine通过select监视一个die channel来及时获取main goroutine的退出通知。
  1. package main
  2. import(
  3. "fmt"
  4. "time"
  5. )
  6. func worker(die chan bool, index int){
  7. fmt.Println("Begin: This is Worker:", index)
  8. for{
  9. select{
  10. //case xx:
  11. //做事的分支
  12. case<-die: //到这里就被阻塞,运行main中的close后输出 done
  13. fmt.Println("Done: This is Worker:", index)
  14. return
  15. }
  16. }
  17. }
  18. func main(){
  19. die:= make(chan bool)
  20. for i :=1; i <=10; i++{
  21. go worker(die, i)
  22. }
  23. time.Sleep(time.Second*5)
  24. close(die)
  25. select{}//deadlock we expected
  26. }
 
  • 终结验证
有时候终结一个worker后,main goroutine想确认worker routine是否真正退出了,可采用下面这种方法:
  1. package main
  2. import(
  3. "fmt"
  4. //"time"
  5. )
  6. func worker(die chan bool){
  7. fmt.Println("Begin: This is Worker")
  8. for{
  9. select{
  10. //case xx:
  11. //做事的分支
  12. case<-die://这里等待27行的赋值语句,如果没有赋值,一直阻塞
  13. fmt.Println("Done: This is Worker")
  14. die<-true
  15. return
  16. }
  17. }
  18. }
  19. func main(){
  20. die:= make(chan bool)
  21. go worker(die)
  22. die<-true
  23. istrue :=<-die//这里等待16行的赋值,赋值完毕后程序继续执行
  24. fmt.Println("Worker goroutine has been terminated", istrue)
  25. }
  • 关闭的Channel永远不会阻塞
  1. package main
  2. import"fmt"
  3. func main(){
  4. cb := make(chan bool)
  5. close(cb)//当cb被关闭后,所有的取值操作将不会被阻塞
  6. x :=<-cb
  7. fmt.Printf("%#v ", x)
  8. x, ok :=<-cb
  9. fmt.Printf("%#v %#v ", x, ok)
  10. ci := make(chan int)
  11. close(ci)
  12. y :=<-ci //即使ci被关闭,照样可以从ci中取数据,取得0
  13. fmt.Printf("%#v ", y)
  14. cb <-true
  15. }

false

false false

0

panic: send on closed channel 

19行将报异常

可以看到在一个已经close的unbuffered channel上执行读操作,回返回channel对应类型的零值,比如bool型channel返回false,int型channel返回0。但向close的channel写则会触发panic。不过无论读写都不会导致阻塞。

 

  • (5)关闭带缓存的channel
 
  1. package main
  2. import"fmt"
  3. func main(){
  4. c := make(chan int,3)
  5. c <-15
  6. c <-34
  7. c <-65
  8. close(c)
  9. fmt.Printf("%d ",<-c)//channel被关闭后,照样可以从channel中取出数据,只是不能向其中写数据
  10. fmt.Printf("%d ",<-c)
  11. fmt.Printf("%d ",<-c)
  12. fmt.Printf("%d ",<-c)//当channel中数据全部被取出时,将输出0
  13. c <-1
  14. }
15
34
65
0
panic: runtime error: send on closed channel 
16行将报异常
 
可以看出带缓冲的channel略有不同。尽管已经close了,但我们依旧可以从中读出关闭前写入的3个值。第四次读取时,则会返回该channel类型的零值。向这类channel写入操作也会触发panic。
 
  • range
Golang中的range常常和channel并肩作战,它被用来从channel中读取所有值。下面是一个简单的实例:
  1. package main
  2. import"fmt"
  3. func generator(strings chan string){
  4. strings <-"Five hour's New York jet lag"
  5. strings <-"and Cayce Pollard wakes in Camden Town"
  6. strings <-"to the dire and ever-decreasing circles"
  7. strings <-"of disrupted circadian rhythm."
  8. close(strings)
  9. }
  10. func main(){
  11. strings := make(chan string)
  12. go generator(strings)
  13. for s := range strings {
  14. fmt.Printf("%s ", s)//这里的s 相当于 <- strings,只有赋值之后才能读取到,否则一直阻塞
  15. }
  16. fmt.Printf(" ")
  17. }
4. 隐藏状态
没有缓存 的chan默认为1个单位的缓存
  1. package main
  2. import"fmt"
  3. func newUniqueIDService()<-chan string{
  4. id := make(chan string)
  5. go func(){
  6. var counter int64 =0
  7. for{
  8. id <- fmt.Sprintf("%x", counter)
  9. counter +=1
  10. }
  11. }()
  12. return id
  13. }
  14. func main(){
  15. id := newUniqueIDService()
  16. for i :=0; i <10; i++{
  17. fmt.Println(<-id) //被阻塞,直到id被赋值
  18. }
  19. }
 
newUniqueIDService通过一个channel与main goroutine关联,main goroutine无需知道uniqueid实现的细节以及当前状态,只需通过channel获得最新id即可。
 
 
5,select的default分支的实践用法
  •      读取时为空
  1. idle := make(chan []byte,5)//用一个带缓冲的channel构造一个简单的队列
  2. select{
  3. case<-idle:
  4. //尝试从idle队列中读取
  5. fmt.Println("读取")
  6. default://队列空,分配一个新的buffer
  7. fmt.Println("写入")
  8. }
  •     写入时已满
  1. package main
  2. import(
  3. "fmt"
  4. )
  5. func main(){
  6. idle := make(chan []int,10)//用一个带缓冲的channel构造一个简单的队列
  7. var b =[]int{2,1}
  8. select{
  9. case idle <- b://尝试向队列中插入一个buffer
  10. fmt.Println(idle)
  11. default://队列满?
  12. println("队列满")
  13. }
  14. }
6,Nil Channels
 
  • nil channels阻塞 : 对一个没有初始化的channel进行读写操作都将发生阻塞
 
  1. package main
  2. func main(){
  3. var c chan int
  4. c <-1
  5. }
将发生阻塞
 
  • 循环输出
  1. package main
  2. import"fmt"
  3. import"time"
  4. func main(){
  5. var c1, c2 chan int= make(chan int), make(chan int)
  6. go func(){
  7. time.Sleep(time.Second*5)
  8. c1 <-5
  9. close(c1)
  10. }()
  11. go func(){
  12. time.Sleep(time.Second*7)
  13. c2 <-7
  14. close(c2)
  15. }()
  16. for{
  17. select{
  18. case x :=<-c1://在等待5s后,把值取出,如果close(c1)那么,将一直输出0
  19. fmt.Println(x)
  20. case x :=<-c2://在等待7s后,输出c2的值并退出
  21. fmt.Println(x)
  22. return
  23. }
  24. }
  25. fmt.Println("over")
  26. }
输出的结果是
5
0
0
0
...
7
 
改为交替输出
  1. package main
  2. import"fmt"
  3. import"time"
  4. func main(){
  5. var c1, c2 chan int= make(chan int), make(chan int)
  6. go func(){
  7. time.Sleep(time.Second*5)
  8. c1 <-5
  9. close(c1)
  10. }()
  11. go func(){
  12. time.Sleep(time.Second*7)
  13. c2 <-7
  14. close(c2)
  15. }()
  16. for{
  17. select{
  18. case x, ok :=<-c1://如果c1未被关闭,则输出x,如果x关闭,c1=nil
  19. if!ok {
  20. c1 =nil
  21. }else{
  22. fmt.Println(x)
  23. }
  24. case x, ok :=<-c2://如果c2未被关闭,则输出x,如果x关闭,c2=nil
  25. if!ok {
  26. c2 =nil
  27. }else{
  28. fmt.Println(x)
  29. }
  30. }
  31. if c1 ==nil&& c2 ==nil{
  32. break//如果=nil那么推出
  33. }
  34. }
  35. fmt.Println("over")
  36. }
5
7
over
 
 
7.Timers
  • 超时机制Timeout
带超时机制的select是常规的tip,下面是示例代码,实现30s的超时select:
 
  1. func worker(start chan bool){
  2. timeout := time.After(30* time.Second)
  3. for{
  4. select{
  5. // … do some stuff
  6. case<- timeout:
  7. return
  8. }
  9. }
  10. }
 
  • 心跳HeartBeart
与timeout实现类似,下面是一个简单的心跳select实现:
  1. func worker(start chan bool){
  2. heartbeat := time.Tick(30* time.Second)
  3. for{
  4. select{
  5. // … do some stuff
  6. case<- heartbeat:
  7. //… do heartbeat stuff
  8. }
  9. }
  10. }
 





原文地址:https://www.cnblogs.com/anbylau2130/p/4243735.html