Go part 8 并发编程,goroutine, channel

并发

并发是指的多任务,并发编程含义比较广泛,包含多线程、多进程及分布式程序,这里记录的并发是属于多线程编程

Go 从语言层面上支持了并发的特性,通过 goroutine 来完成,goroutine 类似于线程,可以根据需要来创建多个 goroutine 来并发工作

goroutine 是在运行时调度完成,而线程是由操作系统调度完成

Go 还提供 channel 在多个 goroutine 间进行通信,goroutine 和 channel 是 Go 秉承 CSP(Communicating Sequential Process)并发模式的重要实现基础

goroutine(轻量级线程)

使用者分配足够多的任务,系统能自动的把任务分配到 CPU 上,让这些任务尽量并发运作,这种机制在 Go 中被称为 goroutine

goroutine 的概念类似于线程,Go 程序会自动的将 goroutine 的任务合理的分配给每个 CPU

Go 程序从 main 包的 main() 函数开始,在程序启动时,就会为 main() 函数创建一个默认的 goroutine

创建 goroutine

为一个普通函数创建 goroutine 的格式:

被调函数的返回值会被忽略

go 函数名( 参数列表 )

demo:使用 go 关键字为普通函数、匿名函数、闭包函数创建累加器的 goroutine(一个 goroutine 必定对应一个函数)

package main
import (
	"fmt"
	"time"
)

func accumulator(num int){
	for {
		num ++
	    time.Sleep(time.Second)
		fmt.Println(num)
	}
}

func closureAccumulator(num int) func() {
	return func(){
		for {
			num ++
			time.Sleep(time.Second)
			fmt.Printf("闭包函数:%v
", num)
		}
	}
}

func main(){
	//并发
	go accumulator(0)

	//匿名函数实现并发
	go func() {
		var num int
		for {
			num ++
			time.Sleep(time.Second)
			fmt.Printf("匿名函数:%v
", num)
		}
	}()

	//闭包实现并发
	go closureAccumulator(0)()

	//不让 main 包中 goroutine 停止
	for {time.Sleep(time.Second)}
}

运行结果:
1
匿名函数:1
闭包函数:1
闭包函数:2
匿名函数:2
2
闭包函数:3
匿名函数:3
3
...

  

调整并发的运行性能

在 Go 程序运行时(runtime)实现了一个小型的任务调度器,这套调度器的工作原理类似于操作系统调度线程

Go 程序可以高效的将 CPU 资源分配给每一个任务,传统逻辑中,开发者需要维护线程池中线程与CPU核心数量的关系,同样,Go 中也可以通过 runtime.GOMAXPROCS() 函数做到

runtime.GOMAXPROCS(逻辑CPU数量)

这里的逻辑CPU数量可以有如下几种数值:
<1:不修改任何数值
=1:单核心执行
>1:多核并发执行

一般情况下,可以使用 runtime.NumCPU() 查询 CPU 的数量,并使用runtime.GOMAXPROCS() 函数进行设置,例如:

runtime.GOMAXPROCS(runtime.NumCPU())

并行与并发的区别

在说并发概念时,总会涉及另外一个概念并行, 下面解释下并发和并行之间的区别

  • 并发(concurrency):把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行(做一会数学,然后做一会语文,然后都做好了)
  • 并行(parallelism):把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行(眼睛看着屏幕,手指敲键盘,这个过程是并行的)

在 GOMAXPROCS 数量与任务数量相等时,可以做到并行执行,但一般情况下都是并发执行

管道(Chan)

单纯的函数并发执行是没有意义的,函数与函数间需要交换数据才能体现并发执行函数的意义,虽然可以使用共享内存进行数据交换,但当多个 goroutine 共存的情况下容易发生竞态问题,为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题

Go 语言提倡使用通信的方式代替共享内存,这里的通信方法就是使用管道(channel),channel 就是一种队列一样的结构,如下图所示:

管道的特性

goroutine 之间通过管道就可以通信,在任何时候,同时只能有一个 goroutine 访问管道进行发送和获取数据

管道像一个传送带或者队列,遵循先进先出(first in first out)的规则,保证收发数据的顺序

创建管道

ch1 := make(chan int)                 // 创建一个整型类型的通道
ch2 := make(chan interface{})         // 创建一个空接口类型的通道, 可以存放任意格式
type Equip struct{ /* 一些字段 */ }
ch3 := make(chan *Equip)             // 创建Equip指针类型的通道, 可以存放*Equip

  

使用管道发送和接收数据

1)发送数据

// 创建一个空接口通道
ch := make(chan interface{})
// 将0放入通道中
ch <- 0
// 将hello字符串放入通道中
ch <- "hello"

把数据往通道中发送时,如果没有 goroutine 进行接收,那么发送会持续阻塞

Go 程序运行时会智能的发现永远无法发送成功的语句,并做出提示 fatal error: all goroutines are asleep - deadlock!

也就是说所有的 goroutine 中的 channel 并没有形成发送和接收对应的代码

2)接收数据

管道的收发操作在不同的两个 goroutine 间进行(就像有生产者就必须有消费者一样),每次只能接收一个元素(类似于往队列里面放数据,然后另一方进行消费)

阻塞接收数据

data := <-ch

demo:

func main(){
	var ch chan int = make(chan int)
	go func (){
		ch <- 1
	}()

	data := <- ch
	fmt.Println(data)
}

非阻塞接收数据

data, ok := <-ch

demo:

func main(){
	var ch chan int = make(chan int)
	go func (){
		ch <- 1
	}()

	data, ok := <- ch
	fmt.Println(ok, data)
}

运行结果:
true 1

接收任意数据,忽略接收数据

<-ch

demo:

func main(){
	var ch chan int = make(chan int)
	go func (){
		ch <- 1
	}()

	<- ch
}

循环接收

通过 for range 语句进行多个元素的接收操作:

for data := range ch {
}

demo:

package main
import "fmt"

func creater(ch chan int){
	for i:=0; i<=10; i++ {
		ch <- i
	}
}

func main(){
	var ch chan int = make(chan int)
	go creater(ch)
	for data := range ch{
		fmt.Print(data)
		if data == 10 {
			break
		}

	}
}

运行结果:
012345678910

 

3)并发打印的例子

demo:main 中的 goroutine 往 chan 中放数据,开启另外一个 goroutine 往文件中写数据,文件写入完成之后通知 main 中的 goroutine,最后 main 中的 goroutine 打印 写入完成

package main
import (
	"fmt"
)

func printer(ch chan int){
	for data := range ch{
		if data == 0 {
			break
		}
		fmt.Println("假装写入到文件,数据是:", data)
	}
	//返回数据输入端,打印完了
	fmt.Println("写入完了哈")
	ch <- 1
}

func main(){
    var ch chan int = make(chan  int)
    go printer(ch)
    //输送数据
    for i:=3; i>=0; i-- {
    	ch <- i
	}

    //接收任意一个数据,如果接收到,表示写入完成
	<- ch
    fmt.Println("收到了,write complete")
}

运行结果:
假装写入到文件,数据是: 3
假装写入到文件,数据是: 2
假装写入到文件,数据是: 1
写入完了哈
收到了,write complete

  

管道中的单行道

可以在声明的时候约束其操作方向,如 只生产,只消费,这种被约束方向的通道称为单向通道

单向通道有利于代码接口的严谨性

单向通道的定义:

1)只生产(消费的时候会报错)

func main (){
	var chWriteOnly chan<- string = make(chan<- string)

	go func() {
		chWriteOnly <- "hello world ~"
	}()

	fmt.Println(<- chWriteOnly)
}

运行结果:
invalid operation: <-chWriteOnly (receive from send-only type chan<- string)

  

2)只消费(生产的时候会报错)

func main (){
	var chReadOnly <-chan string = make(<-chan string)

	go func() {
		chReadOnly <- "hello world ~"
	}()

	fmt.Println(<- chReadOnly)
}

运行结果:
invalid operation: chReadOnly <- "hello world ~" (send to receive-only type <-chan string)

定义一个不能生产,只能消费的 chan 是毫无意义的

3)time包中的单向通道

time 包中的计时器会返回一个 timer 实例,代码如下:

timer := time.NewTimer(time.Second)

timer 的 Timer 类型定义如下:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

C 通道的类型就是一种只能接收的单向通道。如果此处不进行通道方向约束,一旦外部向通道发送数据,将会造成其他使用到计时器的地方逻辑产生混乱

因此,单向通道有利于代码接口的严谨性

带缓冲的管道

带缓冲管道和无缓冲管道在特性上是相同的,无缓冲管道可以看作是长度为 0 的缓冲管道

为管道增加一个有限大小的存储空间形成带缓冲的管道,在写入时无需等待获取方接收即可完成发送过程,并不会阻塞,只有当存储空间满时才会阻塞;同理,如果管道中有数据,接收时将不会发生阻塞,直到通道中没有数据时,通道才会阻塞

无缓冲管道是 保证收发过程同步,类似于快递员给你电话让你下楼取快递,整个递交快递的过程是同步发生的,你和快递员不见不散,但这样做快递员就必须等待所有人下楼取快递才能完成所有投递工作;

带缓冲的管道,异步收发过程,类似于快递员将快递放入快递柜,通知用户来取,效率可以有明显的提升

1)创建带缓冲的管道(类似于定义队列的长度

func main(){
	var ch chan string = make(chan string, 3)
	ch <- "hello"
	ch <- "how are you"
	ch <- "how do you do"
	//打印管道的长度
	fmt.Println(len(ch))
}

运行结果:
3

  

2)阻塞条件

  • 被生产填满时,尝试再次生产数据会发生阻塞
  • 管道为空时,尝试消费数据会发生阻塞

为什么要限制管道的长度,而不提供无限长度的管道?

channel 是在两个 goroutine 间的通信,使用 goroutine 的代码必然有一方生产数据,一方消费数据。当生产数据一方的数据供给速度大于消费方的数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃,因此生产者和消费者需要达到一个平衡

管道的多路复用(同时生产和消费多个管道的数据)

多路复用是通信和网络中的专业术语,通常表示在一个信道上传输多路信号或数据流的过程和技术

比如电话就是一种多路复用的设备,可以在说话的同时听到对方讲话,一条信道上可以同时接收和发送数据,同样的,网线、光纤也都是基于多路复用模式来设计的,网线、光纤不仅支持同时收发数据,还支持多个人同时收发数据

使用管道时,想同时接收多个管道的数据是一件困难的事情,管道在接收数据时,如果没有数据消费就会发生阻塞,虽然可以使用轮询的方式来处理,但运行性能会非常差

for{
    // 尝试接收ch1通道
    data, ok := <-ch1
    // 尝试接收ch2通道
    data, ok := <-ch2
    // 接收后续通道
    …
}

  

Go 中提供了 select 关键字(类似于 nginx 中事件通知的机制),可以同时响应多个管道的操作,select 的每个 case 都对应一个管道的收发过程,当收发完成时,就会触发 case 中响应的语句,多次收发操作在 select 中挑选一个进行响应

select 多路复用中可以接收的样式
操   作语句示例
接收任意数据 case <- ch;
接收变量 case d :=  <- ch;
发送数据 case ch <- 100;

 demo:这里还有点疑问?

package main
import "fmt"

func main() {

	var ch1 chan int = make(chan int, 6)
	var ch2 chan string = make(chan string, 6)
	ch1 <- 100
	ch1 <- 200
	ch2 <- "hello world"

	select {
	case ch1 <- 100:
		fmt.Println("111")

	case strData := <-ch2:
		fmt.Println(strData)

	default:
		fmt.Println("do nothing ")
	}
}

运行结果:
111  或  hello world

  

end ~

每天都要遇到更好的自己.
原文地址:https://www.cnblogs.com/kaichenkai/p/11075722.html