Go语言学习之8 goroutine详解、定时器与单元测试

主要内容:

1.Goroutine
2. Chanel
3. 单元测试

1. Goroutine

     Go 协程(Goroutine)(轻量级的线程,开线程没有数量限制)。
   (1)进程和线程
  A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
  B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
  C. 一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

       例如:ngix是多进程的单线程程序

       内核线程、轻量级进程、用户线程三种线程概念详见:https://blog.csdn.net/gatieme/article/details/51481863 

    (2)并发和并行

        A. 并发是指立即处理多个任务的能力。多线程程序在一个核的cpu上运行(线程之间通过CPU轮询来执行),就是并发。go多线程的切换都是在用户态操作的,不像其他语言先切换到内核态,完成线程切换,然后返回用户态继续执行程序。
        B. 并行是指同时处理多个任务。多线程程序在多个核的cpu上运行,就是并行。
        例如:假如有一个 web 浏览器。这个 web 浏览器有各种组件。其中两个分别是 web 页面的渲染区和从网上下载文件的下载器。假设各个组件也都可以相互独立地运行。当浏览器在单核处理器中运行时,处理器会在浏览器的两个组件间进行上下文切换。它可能在一段时间内下载文件,转而又对用户请求的 web 页面进行渲染。这就是并发。并发的进程从不同的时间点开始,分别交替运行。在这里,就是在不同的时间点开始进行下载和渲染,并相互交替运行的。
       如果该浏览器在一个多核处理器上运行,此时下载文件的组件和渲染 HTML 的组件可能会在不同的核上同时运行。这称之为并行

       Go 编程语言原生支持并发。Go 使用 Go 协程(Goroutine) 和信道(Channel)来处理并发。

        注意:并行不一定会加快运行速度,因为并行运行的组件之间可能需要相互通信。在我们浏览器的例子里,当文件下载完成后,应当对用户进行提醒,比如弹出一个窗口。于是,在负责下载的组件和负责渲染用户界面的组件之间,就产生了通信。在并发系统上,这种通信开销很小。但在多核的并行系统上,组件间的通信开销就很高了。所以,并行不一定会加快运行速度!

       补充:用户线程指的是完全建立在用户空间的线程库,用户线程的建立,同步,销毁,调度完全在用户空间完成,不需要内核的帮助。因此这种线程的操作是极其快速的且低消耗的。

    (3)协程和线程
   协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
   线程:一个线程上可以跑多个协程,协程是轻量量级的线程。一个线程可以跑多个Goroutine。

        Go 协程相比于线程的优势:

  • 相比线程而言,Go 协程的成本极低。堆栈大小只有若干 kb,并且可以根据应用的需求进行增减。而线程必须指定堆栈的大小,其堆栈是固定不变的。
  • Go 协程会复用(Multiplex)数量更少的 OS 线程。即使程序有数以千计的 Go 协程,也可能只有一个线程。如果该线程中的某一 Go 协程发生了阻塞(比如说等待用户输入),那么系统会再创建一个 OS 线程,并把其余 Go 协程都移动到这个新的 OS 线程。所有这一切都在运行时进行,作为程序员,我们没有直接面临这些复杂的细节,而是有一个简洁的 API 来处理并发。
  • Go 协程使用信道(Channel)来进行通信。信道用于防止多个协程访问共享内存时发生竞态条件(Race Condition)。信道可以看作是 Go 协程之间通信的管道.

      GO语言Goroutine与线程的区别:https://baijiahao.baidu.com/s?id=1620972759226100794&wfr=spider&for=pc

    (4)goroutine调度模型

        M 代表内核级线程,一个M就是一个线程,goroutine就是跑在M之上的。
        P 全称是Processor,处理器,它的主要用途就是用来执行goroutine的,所以它也维护了一个goroutine队列,里面存储了所有需要它来执行的goroutine。
        G 就是goroutine实现的核心结构了,G维护了goroutine需要的栈、程序计数器以及它所在的M等信息。
        Sched 结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。

       如果有IO操作时,会新起一个线程等待IO操作的Goroutine

 

       

       Go scheduler: https://www.jianshu.com/p/1911b1229a44
       解释Goroutine浅显易懂:https://www.jianshu.com/p/7ebf732b6e1f
       Go语言 Goroutine 浅析:http://baijiahao.baidu.com/s?id=1587634508058779877&wfr=spider&for=pc
       Goroutine并发调度模型深度解析之手撸一个协程池:
       https://www.jianshu.com/p/fa6d82934cb8?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

       启动一个go协程?

 1 package main 
 2 
 3 import "fmt"
 4 
 5 func test_go() {
 6     fmt.Println("hello world")
 7 }
 8 
 9 func main() {
10     go test_go()
11     fmt.Println("main func finished")
12 }
启动一个go协程

      执行结果:

      

     分析:发现起的go协程go test_go()并没有生效,只打印出 hello world,这是由于,启动一个新的协程时,协程的调用会立即返回。与函数不同,程序控制不会去等待 Go 协程执行完毕。下面使用sleep使主线程处于睡眠之中等待go协程执行结束(实际中该方法不靠谱)。后面会介绍靠谱的方法。

 1 package main 
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 func test_go() {
 8     fmt.Println("hello world")
 9 }
10 
11 func main() {
12     go test_go()
13     time.Sleep(time.Second)
14     fmt.Println("main func finished")
15 }
sleep阻塞主线程等待go协程执行结束

      执行结果:

      

      启动多个go协程?

 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func numbers() {  
 9     for i := 1; i <= 5; i++ {
10         time.Sleep(250 * time.Millisecond)
11         fmt.Printf("%d ", i)
12     }
13 }
14 func alphabets() {  
15     for i := 'a'; i <= 'e'; i++ {
16         time.Sleep(400 * time.Millisecond)
17         fmt.Printf("%c ", i)
18     }
19 }
20 func main() {  
21     go numbers()
22     go alphabets()
23     time.Sleep(3000 * time.Millisecond)
24     fmt.Printf("
main terminated")
25 }
启动多个go协程

      读者可以自行分析该程序的时间片打印输出。

    (5)如何设置golang运行的cpu核数
         1.5之前go需要手动设置程序执行的内核数,1.5之后go自动设置

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6 )
 7 
 8 func main() {
 9     num := runtime.NumCPU()  //查看有几个内核
10     fmt.Printf("cpu num:%d
", num) 
11     runtime.GOMAXPROCS(1)    //设置有程序用几个内核执行
12 }
获取CPU核数并设置执行程序的核数

    (6)不同goroutine之间进行通讯

        A:全局变量和锁同步

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6     "time"
 7 )
 8 
 9 var (
10     m    = make(map[int]uint64)
11     lock sync.Mutex
12 )
13 
14 type task struct {
15     n int
16 }
17 
18 func calc(t *task) {
19     var sum uint64
20     sum = 1
21     for i := 1; i < t.n; i++ {
22         sum *= uint64(i)
23     }
24 
25     fmt.Println(t.n, sum)
26     lock.Lock()  //加锁,不然多个协程修改全局变量会存在竞争
27     m[t.n] = sum
28     lock.Unlock()
29 }
30 
31 func main() {
32     for i := 0; i < 16; i++ {
33         t := &task{n: i}
34         go calc(t)
35     }
36 
37     time.Sleep(10 * time.Second)
38     lock.Lock()
39     for k, v := range m {
40         fmt.Printf("%d! = %v
", k, v)
41     }
42     lock.Unlock()
43 }
全局变量和锁同步

       B:Channel

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func write(ch chan int) {
 9     for i := 0; i < 100; i++ {
10         ch <- i
11         fmt.Println("put data:", i)
12     }
13 }
14 
15 func read(ch chan int) {
16     for {
17         var b int
18         b = <-ch
19         fmt.Println(b)
20         time.Sleep(time.Second)
21     }
22 }
23 
24 func main() {
25     intChan := make(chan int, 10)
26     go write(intChan)
27     go read(intChan)
28 
29     time.Sleep(10 * time.Second)
30 }
channel write and read

    (7)goroutine中使用recover

        如果某个goroutine出现panic,为了不使程序崩溃挂掉,可以在该goroutine中使用recover(类似于python中的try……except)捕获该panic。

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func test() {
 9     defer func() { //defer必须放置在最前面,才能捕获后面所有的panic,程序退出时执行defer
10         err := recover() //捕获goroutine错误
11         if err != nil {
12             fmt.Println(err)
13         }
14     }()
15 
16     var p *int
17     *p = 20 //panic
18 }
19 
20 func main() {
21     go test()
22     time.Sleep(time.Second)
23     fmt.Println("main progress exit")
24 }
example
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6     "time"
 7 )
 8 
 9 func test() {
10 
11     defer func() {
12         if err := recover(); err != nil {  //处理panic,calc依然可以正常执行
13             fmt.Println("panic:", err)
14         }
15     }()
16 
17     var m map[string]int  //panic: assignment to entry in nil map
18     m["stu"] = 100
19 }
20 
21 func calc() {
22     for {
23         fmt.Println("i'm calc")
24         time.Sleep(time.Second)
25     }
26 }
27 
28 func main() {
29     num := runtime.NumCPU()
30     runtime.GOMAXPROCS(num - 1)
31     go test()
32     for i := 0; i < 2; i++ {
33         go calc()
34     }
35 
36     time.Sleep(time.Second * 10000)
37 }
recover示例2

2. 信道(Channel

     Channel 可以想像成 Go 协程之间通信的管道。如同管道中的水会从一端流到另一端,通过使用信道,数据也可以从一端发送,在另一端接收。

    (1)channel概念

  • 类似unix中管道(pipe)
  • 先进先出
  • 线程安全,多个goroutine同时访问,不需要加锁
  • channel是有类型的, 一个整数的channel只能存放整数

    (2) channel声明 

var 变量名 chan 类型,例如:
var test chan int 
var test chan string 
var test chan map[string]string 
var test chan stu   //stu是一个结构体

     注意:所有信道都关联了一个类型。信道只能运输这种类型的数据,而运输其他类型的数据都是非法的。

 1 package main 
 2 
 3 import "fmt"
 4 
 5 func main() {
 6     var ch chan int
 7     // ch = make(chan int, 1)
 8     ch<-1
 9     var num int
10     num = <-ch
11     fmt.Println(num)
12 }
声明未初始化信道

     运行结果:出现死锁

     

    (3)channel初始化

使用make进行初始化,例如: 
var test chan int
test = make(chan int, 10) 
var test chan string
test = make(chan string, 10)

     上面的程序声明了信道但是未初始化,去掉上面程序的注释初始化信道,执行结果:输出1
     注意:chan T 表示 T 类型的信道。
                信道的零值为 nil。信道的零值没有什么用,应该像对 map 和切片所做的那样,用 make 来定义信道。
     例如:

 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {  
 6     var ch chan int
 7     if ch == nil {
 8         fmt.Println("channel a is nil, going to define it")
 9         ch = make(chan int)
10         fmt.Printf("Type of a is %T", ch)
11     }
12 }
信道的声明

     快速声明一个信道:

ch := make(chan int)

    (4)channel基本操作

       信道旁的箭头方向指定了是发送数据还是接收数据

  • 从channel读取数据:

          var testChan chan int
          testChan = make(chan int, 10)
          var a int
          a = <- testChan  //箭头对于 testChan 来说是向外指的,因此我们读取了信道 testChan 的值,并把该值存储到变量 a 中。

  • 从channel写 入数据:

          var testChan chan int
          testChan = make(chan int, 10)
          var a int = 10
          testChan <- a   //箭头指向了 testChan,因此我们在把数据写入信道 testChan。
    (5)带缓冲区的channel

        对于无缓冲信道的发送和接收过程是阻塞的。而对于有缓冲信道,只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

ch := make(chan type, capacity)
有缓冲信道:capacity 应该大于 0
无缓冲信道:capacity为0,或者不设置capacity则容量默认也为 0
  • testChan只能放 一个元素:

   var testChan chan int
   testChan = make(chan int)
   var a int
   a = <- testChan

  • testChan是带缓冲区的chan, 一次可以放10个元素:

     var testChan chan int
     testChan = make(chan int, 10)
     var a int = 10
     testChan <- a

 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func main() {  
 8     chStr := make(chan string, 2)
 9     chStr <- "zhangsan"
10     chStr <- "lisi"
11     fmt.Println(<-chStr)
12     fmt.Println(<-chStr)
13 
14     chInt := make(chan int, 2)
15     chInt <- 10
16     chInt <- 20
17     fmt.Println(<-chInt)
18     fmt.Println(<-chInt)
19 }
带缓冲区的channel
 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func write(ch chan int) {  
 9     for i := 0; i < 5; i++ {
10         ch <- i //当存入第三个数时会阻塞住,直到信道ch里面有数据被取走
11         fmt.Printf("Write %d to ch
", i)
12     }
13     close(ch) //关闭信道ch
14 }
15 func main() {  
16     ch := make(chan int, 2) //信道一次最多存入两个数
17     go write(ch)  
18     time.Sleep(2 * time.Second) //等待
19     for v := range ch {
20         fmt.Printf("read value %d from ch
", v)
21         time.Sleep(time.Second)
22 
23     }
24 }
25 
26 // 执行结果:
27 // Write 0 to ch
28 // Write 1 to ch  //先往信道里面写入两个数,阻塞
29 // read value 0 from ch //取走一个
30 // Write 2 to ch  //立即往信道写入一个数
31 // read value 1 from ch
32 // Write 3 to ch
33 // read value 2 from ch
34 // Write 4 to ch
35 // read value 3 from ch
36 // read value 4 from ch
带缓冲区的chennel2
 1 package main
 2 
 3 import "fmt"
 4 
 5 type student struct {
 6     name string
 7 }
 8 
 9 func main() {
10 
11     var stuChan chan interface{}
12     stuChan = make(chan interface{}, 10)
13 
14     stu := student{name: "stu01"}
15 
16     stuChan <- &stu
17 
18     var stu01 interface{}
19     stu01 = <-stuChan
20 
21     var stu02 *student
22     stu02, ok := stu01.(*student) //stu01转为*student类型
23     if !ok {
24         fmt.Println("can not convert")
25         return
26     }
27 
28     fmt.Println(stu02)
29 }
example
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6     "time"
 7 )
 8 
 9 var wg sync.WaitGroup
10 
11 func consumer(goods chan string) {
12     for i := 0; i < 10; i++ {
13         g, ok := <-goods
14         if !ok {
15             fmt.Println("produce done ", g)
16         }
17         fmt.Println("consumer ", g)
18         time.Sleep(20*time.Millisecond)
19     }
20 
21     wg.Done()
22 }
23 
24 func produce(goods chan string) {
25     for i := 0; i < 10; i++ {
26         g := fmt.Sprintf("baozi%d", i)
27         goods <- g
28         fmt.Println("produce ", g)
29         time.Sleep(10*time.Millisecond)
30     }
31     close(goods) //生产完毕
32 
33     wg.Done()
34 }
35 
36 func main() {
37     var goods chan string
38     goods = make(chan string, 10)
39     
40     wg.Add(2)
41     go produce(goods)
42     go consumer(goods)
43 
44     wg.Wait()
45 }
生产者消费者模型
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func write(ch chan int) {
 9     for i := 0; i < 100; i++ {
10         ch <- i
11         fmt.Println("put data:", i)
12     }
13 }
14 
15 func read(ch chan int) {
16     for {
17         var b int
18         b = <-ch
19         fmt.Println(b)
20         time.Sleep(time.Second)
21     }
22 }
23 
24 func main() {
25     intChan := make(chan int, 10)
26     go write(intChan)
27     go read(intChan)
28 
29     time.Sleep(10 * time.Second)
30 }
read and write channel

      死锁问题:

      从带缓冲区的chennel2例子中我们可以看出,起初向新道里面写入两个数(已满),再写入时只有当信道里面有数据被取走才可以继续写入数据,如果没有数据被取走,则无法向信道里面写入数据,会出现死锁,使程序panic掉,例如

 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func main() {  
 8     ch := make(chan string, 2)
 9     ch <- "zhansan"
10     ch <- "lisi"
11     ch <- "wangwu"
12     fmt.Println(<-ch)
13     fmt.Println(<-ch)
14 }
带缓冲区死锁问题

     长度和容量:

 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func main() {  
 8     ch := make(chan string, 3)
 9     ch <- "zhangsan"
10     ch <- "lisi"
11     fmt.Println("capacity is", cap(ch))  //3
12     fmt.Println("length is", len(ch))  //2
13     fmt.Println("read value", <-ch)  //zhangsan
14     fmt.Println("new length is", len(ch))  //new length is 1
15 }
带缓冲区信道长度和容量测试

    从程序结果可以看出,带缓冲区信道容量是make出来可以存入信道的最大数据量,而信道长度是指信道的实际存入数据量,当从信道中取走数据,则信道长度会减小。

    (6)channel阻塞

        信道的发送与接收默认是阻塞的。
        当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它 Go 协程从信道读取到数据,才会解除阻塞。与此类似,当读取信道的数据时,如果没有其它的协程把数据写入到这个信道,那么读取过程就会一直阻塞着。
        这样的作用?信道的这种特性能够帮助 Go 协程之间进行高效的通信,不需要用到其他编程语言常见的显式锁或条件变量。

 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func go_test(done chan bool) {  
 8     fmt.Println("Hello world")
 9     done <- true //将true写入信道done
10 }
11 func main() {  
12     done := make(chan bool) //定义一个信道,里面只能存档放bool型
13     go go_test(done)
14     <-done  //从信道done中读取并丢弃数据true
15     fmt.Println("main function")
16 }
信道的读取与写入
 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func go_test(done chan bool) {  
 9     fmt.Println("go_test go routine is going to sleep")
10     time.Sleep(4 * time.Second)
11     fmt.Println("go_test go routine awake and going to write to done")
12     done <- true
13 }
14 func main() {  
15     done := make(chan bool)
16     fmt.Println("Main going to call go_test go goroutine")
17     go go_test(done)
18     <-done //此时会阻塞主程序main,等待信道done中有数据写入,如果没有就会出现死锁,程序panic掉
19     fmt.Println("Main received data")
20 }
信道阻塞示例

        执行结果:

        

        计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加?

        实现方法:在一个协程中计算该数的每位数平方和,在另一个协程中计算每位数立方之和,在主程序中等待上面两个协程的计算结果并计算最终结果。

 1 package main 
 2 
 3 import (
 4     "fmt"
 5 )
 6 
 7 //计算num每位平方之和
 8 func calcSquare(num int, squareCh chan int) {
 9     sum := 0
10     for num != 0 {
11         digit := num%10
12         num /= 10
13         sum += digit * digit
14     }
15     fmt.Printf("Sum of squares is %d
", sum)
16     squareCh <- sum
17 }
18 
19 //计算num每位立方之和
20 func calcCube(num int, cubeCh chan int) {
21     sum := 0
22     for num != 0 {
23         digit := num%10
24         num /= 10
25         sum += digit * digit * digit
26     }
27     fmt.Printf("Sum of cubes is %d
", sum)
28     cubeCh <- sum
29 }
30 
31 func main() {
32     var finalSum int = 0
33     squCh := make(chan int, 1)
34     cuCh := make(chan int, 1)
35 
36     num := 123
37 
38     go calcSquare(num, squCh)
39     go calcCube(num, cuCh)
40 
41     squareSum, cubeSum := <-squCh, <-cuCh  //阻塞并获取结果
42     finalSum = squareSum + cubeSum
43     fmt.Printf("result is %d
", finalSum)
44 }
获取某数每位的平方和与立方和之和

    (7)死锁

  • 当 Go 协程给一个信道发送数据时,如果没有其他 Go 协程来接收该信道里面的数据。则程序就会在运行时触发 panic,形成死锁。
  • 同理,当有 Go 协程等着从一个信道接收数据时,如果其他 Go 协程没有向该信道写入数据,则程序就会触发 panic,形成死锁。
1 package main
2 
3 func main() {  
4     ch := make(chan int)
5     ch <- 10
6 }
写入管道没有被读取形成死锁
1 package main
2 
3 func main() {  
4     ch := make(chan int)
5     rch := <-ch
6 }
没有数据写入管道但是去读取形成的死锁

    (8)单向信道

     Go 的信道可以在声明时约束其操作方向,如只发送或是只接收。这种被约束方向的信道被称做单向信道。

单向信道的声明格式
只能发送的信道类型为chan<-,只能接收的信道类型为<-chan,格式如下:
var 信道实例 chan<- 元素类型 // 只能发送信道
var 信道实例 <-chan 元素类型 // 只能接收信道

元素类型:信道包含的元素类型。
信道实例:声明的信道变量。
 1 package main
 2 
 3 import "fmt"
 4 
 5 func sendData(sendch chan<- int) {  
 6     sendch <- 10
 7 }
 8 
 9 func main() {  
10     sendch := make(chan<- int)
11     go sendData(sendch)
12     receiveData := <-sendch //error 企图去接受信道sendch里面的数据
13     fmt.Println(receiveData)
14 }
只能发送的信道
1 package main
2 
3 func main() {
4     ch := make(<-chan int)
5     ch <- 10  //error
6 }
只能接收信道

    (9)信道的转换

        单向信道的作用? 对于信道转换(Channel Conversion),把一个双向信道转换成只能发送信道(send-only)或者接收(Receive Only)信道都是行得通的,但是反过来就不行。

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 //send-only
 9 func sendData(sendch chan<- int) {  
10     sendch <- 10
11 }
12 
13 //Receive Only
14 func receiveData(receivech <-chan int) {  
15     rec := <-receivech
16     fmt.Println(rec)
17 }
18 
19 func main() {  
20     ch := make(chan int) //双向信道
21     go sendData(ch)
22     go receiveData(ch)
23     time.Sleep(time.Second)
24 }
信道转换

    (10)关闭信道和使用 for range 遍历信道

  • 数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。
  • 当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭。
v, ok := <- ch

      注意:上面的语句里,如果成功接收信道所发送的数据,那么 ok 等于 true。而如果 ok 等于 false,说明我们试图读取一个关闭的通道。从关闭的信道读取到的值会是该信道类型的零值。例如,当信道是一个 int 类型的信道时,那么从关闭的信道读取的值将会是 0。

 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func producer(ch chan int) {  
 8     for i := 0; i < 6; i++ {
 9         ch <- i
10     }
11     close(ch)
12 }
13 
14 func main() {  
15     ch := make(chan int)
16     go producer(ch)
17     for {
18         v, ok := <-ch
19         if ok == false {
20             fmt.Println("chnnel closed ", v, ok)  //chnnel closed  0 false
21             break
22         }
23         fmt.Println("Received ", v, ok)
24     }
25 }
close 信道
 1 package main
 2 
 3 import (  
 4     "fmt"
 5 )
 6 
 7 func producer(ch chan int) {  
 8     for i := 0; i < 6; i++ {
 9         ch <- i
10     }
11     close(ch)
12 }
13 
14 func main() {  
15     ch := make(chan int)
16     go producer(ch)
17 
18     for v := range ch {
19         fmt.Println("Received ", v)
20     }
21 }
for range遍历信道

     注意:Go 语言中 range 关键字用于 for 循环中迭代数组(array)、切片(slice)、通道(channel)或集合(map)的元素。在数组和切片中它返回元素的索引和索引对应的值,在集合中返回 key-value 对的 key 值。

     重新实现计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加?

 1 package main 
 2 
 3 import (
 4     "fmt"
 5 )
 6 
 7 func calcDigit(num int, digCh chan int) {
 8     for num != 0 {
 9         digit := num%10
10         digCh <- digit
11         num /= 10
12     }
13     close(digCh)
14 }
15 
16 //计算num每位平方之和
17 func calcSquare(num int, squareCh chan int) {
18     dch := make(chan int, 1)
19     sum := 0
20     go calcDigit(num, dch)
21     for v := range dch {
22         sum += v * v
23     }
24     fmt.Printf("Sum of squares is %d
", sum)
25     squareCh <- sum
26 }
27 
28 //计算num每位立方之和
29 func calcCube(num int, cubeCh chan int) {
30     dch := make(chan int, 1)
31     sum := 0
32     go calcDigit(num, dch)
33     for v := range dch {
34         sum += v * v * v
35     }
36     fmt.Printf("Sum of cubes is %d
", sum)
37     cubeCh <- sum
38 }
39 
40 func main() {
41     var finalSum int = 0
42     squCh := make(chan int, 1)
43     cuCh := make(chan int, 1)
44 
45     num := 123
46 
47     go calcSquare(num, squCh)
48     go calcCube(num, cuCh)
49 
50     squareSum, cubeSum := <-squCh, <-cuCh  //阻塞并获取结果
51     finalSum = squareSum + cubeSum
52     fmt.Printf("result is %d
", finalSum)
53 }
Code

     (11)WaitGroup

          sync 包里的WaitGroup主要用于线程的同步。WaitGroup在WaitGroup这个结构体所实现的方法中,常用的是Add,Done,Wait解释如下:

//A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.
//Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have
//finished.
//A WaitGroup must not be copied after first use. // Add adds delta, which may be negative, to the WaitGroup counter. // If the counter becomes zero, all goroutines blocked on Wait are released. // If the counter goes negative, Add panics. func (wg *WaitGroup) Add(delta int) // Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) } // Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait()

       WaitGroup 使用计数器来工作(WaitGoup.Add(i))。当我们调用 WaitGroup 的 Add 并传递一个 int 时,WaitGroup 的计数器会加上 Add 的传参。要减少计数器,可以调用 WaitGroup 的 Done() 方法(WaitGroup.Done())。Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。

 1 package main 
 2 
 3 import (
 4     "fmt"
 5     "time"
 6     "sync"
 7 )
 8 
 9 func go_test(num int, wg *sync.WaitGroup) {
10     fmt.Printf("Start goroutine %d
", num)
11     time.Sleep(time.Second)
12     fmt.Printf("end goroutine %d
", num)
13     wg.Done()
14 }
15 
16 func main() {
17     var wg sync.WaitGroup
18     for i := 0; i < 10; i++ {
19         wg.Add(1)
20         go go_test(i, &wg)
21     }
22     wg.Wait() //等待所有协程执行结束
23 
24     fmt.Println("main finished")
25 }
WaitGroup示例

     (12)工作池的实现

           缓冲信道的重要应用之一就是实现工作池。
           一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
           下面看两个例子(重点):
            例1: 计算在10000个数里面有多少个素数?
            思路:1)设置一个输入型缓冲信道 intChan 存放这10000个数,每一个数相当于一个任务,10000个数就有一万个任务;
                       2)起多个协程来计算,比如100个,则每个协程相当于处理10000/100=100个任务,并设置一个信道 exitChan用来标记哪个协程处理结束;
                       3)设置一个输出型缓冲信道 resultChan 将 2)的计算结果存入到该信道中。

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func calc(taskChan chan int, resChan chan int, exitChan chan bool) {
 9     for v := range taskChan {
10         flag := true
11         for i := 2; i < v; i++ {
12             if v%i == 0 {
13                 flag = false
14                 break
15             }
16         }
17 
18         if flag {
19             resChan <- v
20         }
21     }
22 
23     fmt.Println("exit")
24     exitChan <- true
25 }
26 
27 func main() {
28     startTime := time.Now()  //开始时间
29 
30     intChan := make(chan int, 10000)
31     resultChan := make(chan int, 10000)
32     exitChan := make(chan bool, 10)
33 
34     go func() {
35         for i := 0; i < 10000; i++ {
36             intChan <- i
37         }
38 
39         close(intChan)
40     }()
41 
42     for i := 0; i < 10; i++ {
43         go calc(intChan, resultChan, exitChan)
44     }
45 
46     //等待所有计算的goroutine全部退出
47     go func() {
48         for i := 0; i < 10; i++ {
49             <-exitChan
50             fmt.Println("wait goroutine ", i, " exited")
51         }
52         close(resultChan)
53     }()
54 
55     for v := range resultChan {
56         fmt.Println(v)
57     }
58 
59     endTime := time.Now() //结束时间
60     diff := endTime.Sub(startTime)  //耗时
61     fmt.Println("total time taken ", diff.Seconds(), "seconds")
62 }
计算素数示例

           例2:计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)
           思路如下:
           1)创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道;
           2)将作业添加到该输入型缓冲信道中;
           3)作业完成后,再将结果写入一个输出型缓冲信道;
           4)从输出型缓冲信道读取并打印结果。

 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "math/rand"
 6     "sync"
 7     "time"
 8 )
 9 
10 //任务
11 type Job struct {  
12     id       int
13     randomno int
14 }
15 
16 //结果
17 type Result struct {  
18     job         Job
19     sumofdigits int
20 }
21 
22 var jobs = make(chan Job, 10)  //输入型缓冲信道
23 var results = make(chan Result, 10)  //输出型缓冲信道
24 
25 //计算随机数每位之和
26 func digits(number int) int {  
27     sum := 0
28     no := number
29     for no != 0 {
30         digit := no % 10
31         sum += digit
32         no /= 10
33     }
34     time.Sleep(2 * time.Second)
35     return sum
36 }
37 
38 //工作者
39 func worker(wg *sync.WaitGroup) {  
40     for job := range jobs {
41         output := Result{job, digits(job.randomno)}
42         results <- output
43     }
44     wg.Done()
45 }
46 
47 //创建工作池
48 func createWorkerPool(noOfWorkers int) {  
49     var wg sync.WaitGroup
50     for i := 0; i < noOfWorkers; i++ {
51         wg.Add(1)
52         go worker(&wg)
53     }
54     wg.Wait()  //等待上面起的所有协程计算结束
55     close(results) //计算结束,关闭信道results
56 }
57 
58 //分配任务
59 func allocate(noOfJobs int) {  
60     for i := 0; i < noOfJobs; i++ {
61         randomno := rand.Intn(999)
62         job := Job{i, randomno}
63         jobs <- job
64     }
65     close(jobs) //分配完成,关闭信道
66 }
67 
68 //遍历结果
69 func result(done chan bool) {  
70     for result := range results {
71         fmt.Printf("Job id %d, input random no %d , sum of digits %d
", result.job.id, result.job.randomno, result.sumofdigits)
72     }
73     done <- true
74 }
75 
76 func main() {  
77     startTime := time.Now()
78 
79     noOfJobs := 100
80     go allocate(noOfJobs)
81 
82     done := make(chan bool)
83     go result(done)
84     noOfWorkers := 10
85     createWorkerPool(noOfWorkers)
86     <-done //等待取到所有结果完成
87     
88     endTime := time.Now()
89     diff := endTime.Sub(startTime) //总共耗时
90     fmt.Println("total time taken ", diff.Seconds(), "seconds")
91 }
计算随机数各位之和

     (13)select

           select 语句用于在多个发送/接收信道操作中进行选择。select 语句会一直阻塞,直到发送/接收操作准备就绪。如果有多个信道操作准备完毕,select 会随机地选取其中之一执行。该语法与 switch 类似,所不同的是,这里的每个 case 语句都是信道操作。

           以下描述了 select 语句的语法:

  • 每个case都必须是一个信道
  • 所有channel表达式都会被求值
  • 所有被发送的表达式都会被求值
  • 如果任意某个通信可以进行,它就执行;其他被忽略。
  • 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。否则: 如果有default子句,则执行该语句。如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func server1(ch chan string) {  
 9     time.Sleep(6 * time.Second)
10     ch <- "from server1"
11 }
12 
13 func server2(ch chan string) {  
14     time.Sleep(3 * time.Second)
15     ch <- "from server2"
16 }
17 
18 func main() {  
19     output1 := make(chan string)
20     output2 := make(chan string)
21     go server1(output1)
22     go server2(output2)
23     select {
24     case s1 := <-output1:
25         fmt.Println(s1)
26     case s2 := <-output2:
27         fmt.Println(s2)
28     }
29 }
select example1

       select语法的最后一条作用:在没有 case 准备就绪时,可以执行 select 语句中的默认情况(Default Case)。这通常用于防止 select 语句一直阻塞。

 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func process(ch chan string) {  
 9     time.Sleep(10500 * time.Millisecond)
10     ch <- "process successful"
11 }
12 
13 func main() {  
14     ch := make(chan string)
15     go process(ch)
16     for {
17         time.Sleep(1000 * time.Millisecond)
18         select {
19         case v := <-ch:
20             fmt.Println("received value: ", v)
21             return
22         default:
23             fmt.Println("no value received")
24         }
25     }
26 
27 }
select default

      死锁:

      当select没有匹配到任何一个读取信道的结果,此时要是没有default,则select 语句会一直阻塞,从而发生死锁,导致程序panic掉。

1 package main
2 
3 func main() {  
4     ch := make(chan string)
5     select {
6     case <-ch:
7     }
8 }
缺少default 导致死锁问题

      对于上面问题,如果加上默认情况,则不会出现死锁,会去执行default。select 只含有值为 nil 的信道(比如声明未初始化的信道),也同样会执行默认情况。

 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {  
 6     ch := make(chan string)
 7     select {
 8     case <-ch:
 9     default:
10         fmt.Println("excute default")
11     }
12 }
执行default
 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {  
 6     var ch chan string
 7     select {
 8     case v := <-ch:
 9         fmt.Println("received value", v)
10     default:
11         fmt.Println("default case executed")
12 
13     }
14 }
nil default

      如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。

 1 package main
 2 
 3 import (  
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func server1(ch chan string) {  
 9     ch <- "from server1"
10 }
11 
12 func server2(ch chan string) {  
13     ch <- "from server2"
14 }
15 
16 func main() {  
17     output1 := make(chan string)
18     output2 := make(chan string)
19     go server1(output1)
20     go server2(output2)
21     time.Sleep(time.Second) //等待上面两个协程执行结束,下面随机执行
22     select {
23     case s1 := <-output1:
24         fmt.Println(s1)
25     case s2 := <-output2:
26         fmt.Println(s2)
27     }
28 }
随机执行select case

      空select:

      执行空select会出现死锁。

1 package main
2 
3 func main() {  
4     select {}
5 }
空 select

      (14)定时器

  •  定时器的使用
func NewTicker(d Duration) *Ticker
官网对NewTicker的解释:
NewTicker returns a new Ticker containing a channel that will send the time with a period specified by the duration argument. 
It adjusts the intervals or drops ticks to make up for slow receivers. The duration d must be greater than zero; if not, NewTicker
will panic. Stop the ticker to release associated resources.
 1 package main
 2 import (
 3     "fmt"
 4     "time"
 5 )
 6 
 7 func main() {
 8     t := time.NewTicker(time.Second)  //定时器设置为 1s
 9     for v := range t.C {
10         fmt.Println("hello, ", v)
11     }
12 }
example1
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     ticker := time.NewTicker(time.Second)
10     defer ticker.Stop() //程序结束关闭定时器
11     done := make(chan bool)
12     go func() {
13         time.Sleep(10 * time.Second)
14         done <- true
15     }()
16     for {
17         select {
18         case <-done:
19             fmt.Println("Done!")
20             return
21         case t := <-ticker.C:
22             fmt.Println("Current time: ", t)
23         }
24     }
25 }
官网例子
func (t *Ticker) Stop()
官网解释:
Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine 
reading
from the channel from seeing an erroneous "tick".
  • 一次定时器
func (t Time) After(u Time) bool
After reports whether the time instant t is after u.
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 
 9 func main() {
10     select {
11     case <- time.After(2*time.Second):
12         fmt.Println("after")
13     }
14 }
example
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     year2000 := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
10     fmt.Println(year2000)
11 
12     year3000 := time.Date(3000, 1, 1, 0, 0, 0, 0, time.UTC)
13     fmt.Println(year3000)
14 
15     isYear3000AfterYear2000 := year3000.After(year2000) // True
16     isYear2000AfterYear3000 := year2000.After(year3000) // False
17 
18     fmt.Printf("year3000.After(year2000) = %v
", isYear3000AfterYear2000)
19     fmt.Printf("year2000.After(year3000) = %v
", isYear2000AfterYear3000)
20 
21 }
官网例子
  • 超时控制

       例子是模拟一个数据库操作,如果在1s内未返回结果,则超时退出。

 1 package main
 2 import (
 3     "fmt"
 4     "time"
 5 )
 6 func queryDb(ch chan int) {
 7     time.Sleep(time.Millisecond) //模拟数据库操作耗时
 8     ch <- 100
 9 }
10 func main() {
11     ch := make(chan int)
12 
13     go queryDb(ch)
14     t := time.NewTicker(time.Second)  //设置超时时间,如果1s内未返回结果则超时退出
15     defer t.Stop() //程序结束关闭定时器
16 
17     select {
18         case v := <-ch:
19         fmt.Println("result", v)
20         case <-t.C:
21         fmt.Println("timeout")
22     }
23 }
超时控制
  • 信号处理
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "os"
 6     "os/signal"
 7     "sync"
 8     "syscall"
 9 )
10 
11 var waitGroup sync.WaitGroup
12 
13 func produce(ch chan<- string, exitChan chan bool) {
14 
15     var i int
16     var exit bool
17     for {
18         str := fmt.Sprintf("hello %d", i)
19         select { //select检测哪个管道可写或者可读
20         case ch <- str:
21         case exit = <-exitChan:
22         }
23         if exit {
24             fmt.Printf("user notify produce exited
")
25             break
26         }
27     }
28     close(ch)
29     waitGroup.Done()
30 }
31 
32 func consume(ch <-chan string) {
33 
34     for {
35         str, ok := <-ch
36         if !ok {
37             fmt.Printf("ch is closed")
38             break
39         }
40         fmt.Printf("value:%s
", str)
41     }
42     waitGroup.Done()
43 }
44 
45 func main() {
46     // 在shell终端输入 kill -SIGUSR2 ID   给程序输入终止信号
47     var ch chan string = make(chan string)
48     var exitChan chan bool = make(chan bool, 1)
49     var sinalChan chan os.Signal = make(chan os.Signal, 1)
50     waitGroup.Add(2)
51     signal.Notify(sinalChan, syscall.SIGUSR2)
52     go produce(ch, exitChan)
53     go consume(ch)
54 
55     <-sinalChan //读取然丢弃
56     exitChan <- true
57     waitGroup.Wait()
58 
59 }
信号处理

3. 单元测试

      go test命令是一个按照一定的约定和组织的测试代码的驱动程序。 在包目录内, 所有以_test.go为后缀名的源文件并不是go build构建包的一部分, 它们是go test测试的一部分。
      在*_test.go文件中, 有三种类型的函数: 测试函数、 基准测试函数、 示例函数。 一个测试函数是以Test为函数名前缀的函数, 用于测试程序的一些逻辑行为是否正确; go test命令会调用这些测试函数并报告测试结果是PASS或FAIL。本次只了解测试函数,后面会专门写博客学习其他类型的函数。
      go test命令会遍历所有的*_test.go文件中符合上述命名规则的函数, 然后生成一个临时的main包用于调用相应的测试函数, 然后构建并运行、 报告测试结果, 最后清理测试中生成的临时文件。

     1)文件名必须以_test.go结尾
     2)使用go test -v执行单元测试

1 package main
2 
3 func add(a, b int) int {
4     return a + b
5 }
6 
7 func sub(a, b int) int {
8     return a - b
9 }
calc.go
 1 package main
 2 
 3 import (
 4     "testing"
 5 )
 6 
 7 func TestAdd(t *testing.T) {
 8     r := add(2, 4)
 9     if r != 6 {
10         t.Fatalf("add(2, 4) error, expect:%d, actual:%d", 6, r)
11     }
12     t.Logf("test add succ")
13 }
14 
15 func TestSub(t *testing.T) {
16     r := sub(2, 4)
17     if r != -2 {
18         t.Fatalf("sub(2, 4) error, expect:%d, actual:%d", -2, r)
19     }
20     t.Logf("test sub succ")
21 }
calc_test.go
 1 package main
 2 
 3 import (
 4     "encoding/json"
 5     "io/ioutil"
 6 )
 7 
 8 type student struct {
 9     Name string
10     Sex  string
11     Age  int
12 }
13 
14 func (p *student) Save() (err error) {
15     data, err := json.Marshal(p)
16     if err != nil {
17         return
18     }
19 
20     err = ioutil.WriteFile("C:/stu.dat", data, 0755)
21     return
22 }
23 
24 func (p *student) Load() (err error) {
25 
26     data, err := ioutil.ReadFile("C:/stu.dat")
27     if err != nil {
28         return
29     }
30 
31     err = json.Unmarshal(data, p)
32     return
33 }
student.go
 1 package main
 2 
 3 import "testing"
 4 import "time"
 5 
 6 func TestSave(t *testing.T) {
 7     stu := &student{
 8         Name: "stu01",
 9         Sex:  "man",
10         Age:  10,
11     }
12 
13     err := stu.Save()
14     if err != nil {
15         t.Fatalf("save student failed, err:%v", err)
16     }
17 
18 }
19 
20 func TestLoad(t *testing.T) {
21 
22     stu := &student{
23         Name: "stu01",
24         Sex:  "man",
25         Age:  10,
26     }
27     err := stu.Save()
28     if err != nil {
29         t.Fatalf("save student failed, err:%v", err)
30     }
31     stu2 := &student{}
32     time.Sleep(10 * time.Second)
33     err = stu2.Load()
34     if err != nil {
35         t.Fatalf("load student failed, err:%v", err)
36     }
37     if stu.Name != stu2.Name {
38         t.Fatalf("load student failed, name not equal")
39     }
40     if stu.Sex != stu2.Sex {
41         t.Fatalf("load student failed, Sex not equal")
42     }
43     if stu.Age != stu2.Age {
44         t.Fatalf("load student failed, Age not equal")
45     }
46 }
student_test.go

    将四个文件放到同一个目录下,执行go test -v,执行结果如下:

参考文献:

  • https://golang.org/pkg/time/
  • https://studygolang.com/articles/12522 (select)
  • https://studygolang.com/articles/12402 (channel)
  • https://studygolang.com/articles/12342 (Go 协程)
  • https://studygolang.com/articles/12341 (并发入门)
  • https://www.cnblogs.com/domestique/p/8410313.html
原文地址:https://www.cnblogs.com/xuejiale/p/10424131.html