Go语言中的并发操作

并发与并行

并发:同一时间段内,执行多个任务(你在用微信和两个女朋友聊天)

并行:同一时刻,执行多个任务(你和你朋友都在用微信和女朋友聊天)

Go语言中的并发通过goroutine实现。goroutine类似于线程,属于用户态线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutinechannel是Go语言秉承的CSP(Communicating Sequential Process)并发模式的重要实现基础。

Goroutine

goroutine的概念类似于线程,但goroutine是由Go运行时(runtime)调度和管理的。Go程序会智能的将goroutine中的任务合理地分配给每个CPU。

在Go语言编程中,不需要自己去写进程,线程,协程,当需要让某个任务并发执行的时候,只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就行。

使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

启动单个goroutine

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

func hello(){
    fmt.Println("hello Goroutine")
}

func main(){
    hello()
    fmt.Println("this is a main goroutine")
}

这个示例中hello()函数和下面的语句是串行的,执行的结果是打印完hello Goroutine后打印this is a main goroutine

接着我们在调用hello()函数前面加上关键字go,也就是启动一个goroutine去执行hello()函数。

func hello(){
    fmt.Println("hello Goroutine")
}

func main(){
    go hello()
    fmt.Println("this is a main goroutine")
}

这一次的执行结果只打印了this is a main goroutine,并没有打印hello Goroutine。为什么呢?

在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束。

所以我们要想办法让main()函数等一等hello()函数,最简单粗暴的方式就是time.Sleep了。

示例1:

func hello(){
    fmt.Println("hello Goroutine")
}

func main(){
    go hello()
    fmt.Println("this is a main goroutine")
    time.Sleep(time.Second)
}

示例2:

func newTask(){
    for{
	fmt.Println("我是子goroutine")
	time.Sleep(time.Second)
    }
}

func main() {
	go newTask()

	// 主goroutine赖着不死。如果主goroutine死了,子goroutine也得陪葬
	for{
	    fmt.Println("this is a main goroutine")
	    time.Sleep(time.Second)
	}
}
出让资源

通过runtime.Gosched()出让资源,让其他goroutine优先执行。

func main() {
	go func(){
		for i := 0; i < 5; i++{
			fmt.Println("go")
		}
	}()
	for i:=0;i<2;i++{
        // 让出时间片,让别的goroutine先执行
		runtime.Gosched()
		fmt.Println("hello")
	}
}

通过执行上面的代码,我们会发现先会打印5次go,接着打印两次hello

自杀

通过runtime.Goexit()实现自杀,自杀前会执行提前定义的defer语句,同时调用它的goroutine也会跟着自杀。

func test(){
	// 遗嘱:临终前说的话
	defer fmt.Println("这是test的遗嘱")
	// 自杀,触发提前执行遗嘱,暴毙,后面的日志不好过了,调用它的goroutine也暴毙
	runtime.Goexit()
	// 自杀了,后面的日子不好过了
	fmt.Println("生活承诺的很多美好事情。。。(不会打印)")
}

func wildMan(){
	for i:=0;i<6;i++{
		fmt.Println("我是野人,我不喜欢约束,我讨厌制约我的主goroutine")
		time.Sleep(time.Second)
	}
}

func main() {
	// 一个会暴毙的goroutine
	go func(){
		fmt.Println("这里包含一个会暴毙的goroutine")
		test()  // runtime.Goexit()
		fmt.Println("这句应该不能出现")
	}()

	// 一个讨厌主goroutine的野人goroutine,主goroutine结束后,会把它一起带走
	go wildMan()
	for i:=0;i<=3;i++{
		time.Sleep(time.Second)
	}
}

执行结果:

这里包含一个会暴毙的goroutine
我是野人,我不喜欢约束,我讨厌制约我的主goroutine
这是test的遗嘱
我是野人,我不喜欢约束,我讨厌制约我的主goroutine
我是野人,我不喜欢约束,我讨厌制约我的主goroutine
我是野人,我不喜欢约束,我讨厌制约我的主goroutine

我们发现,主goroutine结束后,会带走未结束的子goroutine

同时如果主goroutine暴毙,会令所有的子goroutine失去牵制,等所有的子goroutine都结束后,程序会崩溃:fatal error: no goroutines (main called runtime.Goexit) - deadlock!

启动多个Goroutine

上面的代码中有演示主goroutine退出,子goroutine也会退出的场景,那么有没有什么办法让所有的goroutine都执行呢?

这里我们使用sync.WaitGroup来实现goroutine的同步

示例一:

var wg sync.WaitGroup

func hello(i int){
	defer wg.Done()  // goroutine结束就登记-1
	fmt.Println("hello Goroutine", i)
}

func main() {
	for i:=0;i<10;i++{
		wg.Add(1)  // 启动一个goroutine就登记加1
		go hello(i)
	}
	wg.Wait()  // 等待所有登记的goroutine都结束
}

多次执行上面代码,会发现每次打印的数字顺序都不一致,这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。

示例二:

var wg sync.WaitGroup

func test(){
	defer wg.Done()
	// 遗嘱:临终前说的话
	defer fmt.Println("这是test的遗嘱")
	// 自杀,触发提前执行遗嘱,暴毙,后面的日志不好过了,调用它的goroutine也暴毙
	runtime.Goexit()
	// 自杀了,后面的日子不好过了
	fmt.Println("生活承诺的很多美好事情。。。(不会打印)")
}

func wildMan(){
	defer wg.Done()
	for i:=0;i<6;i++{
		fmt.Println("我是野人,我不喜欢约束,我讨厌制约我的主goroutine")
		time.Sleep(time.Second)
	}
}

func main() {
	wg.Add(2)
	// 一个会暴毙的goroutine
	go func(){
		fmt.Println("这里包含一个会暴毙的goroutine")
		test()  // runtime.Goexit()
		fmt.Println("这句应该不能出现")
	}()

	// 一个讨厌主goroutine的野人goroutine,主goroutine结束后,会把它一起带走
	go wildMan()
	for i:=0;i<=3;i++{
		time.Sleep(time.Second)
	}
	//runtime.Goexit()
	fmt.Println("主goroutine正常退出,会带走所有的子goroutine")
	wg.Wait()  // 等待所有登记的goroutine都结束
}

Goroutine与线程

可增长的栈

os线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可达到1GB,虽然极少会用到这么大,所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

G:就是goroutine,里面除了存放本goroutine信息外,还有与所在P的绑定信息。

p:管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址以及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等),当自己的队列消费完了就去全局队列里取,如果全局队列也消费完了,会去其他P的队列里抢任务。

M:machine是Go运行时(runtime)对操作系统内核线程的虚拟,M与内核线程一般都是一一映射关系,一个goroutine最终是要放到M上执行的。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

但从线程调度讲,Go语言相比其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的。这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。其一大特点是goroutine的调度在用户态下完成的。不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池,不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多,另一方面充分利用了多核硬件资源,近似的把若干goroutine均分在物理线程上,再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行,Go1.5版本之后,默认使用全部的CPU逻辑核心数。

示例1:

func a(){
	for i:=1;i<10;i++{
		fmt.Println("A:", i)
	}
}

func b(){
	for i:=1;i<10;i++{
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(1)
	go a()
	go b()
	time.Sleep(time.Second)
}

两个任务只有一个逻辑核心,此时完全是做完一个任务后再做另外一个任务。

示例2:

func a(){
	for i:=1;i<10;i++{
		fmt.Println("A:", i)
	}
}

func b(){
	for i:=1;i<10;i++{
		fmt.Println("B:", i)
	}
}

func main() {
	runtime.GOMAXPROCS(2)
	go a()
	go b()
	time.Sleep(time.Second)
}

将逻辑核心数设为2,此时两个任务并发执行。(可用内核越多,并发质量越高)

func main() {
	// 把可用的最大逻辑CPU核心数设为1,返回先前的设置
	previousMaxProcs := runtime.GOMAXPROCS(1)

	// 获得逻辑CPU核心数
	cpu_num := runtime.NumCPU()
	fmt.Println("cpu_num = ", cpu_num)  // 4
	fmt.Println("previousMaxProcs = ", previousMaxProcs)  // 4

	//for{
	//	// 主goroutine打0,子goroutine打1
	//	go fmt.Println(1)
	//	fmt.Println(0)
	//}
}

Go语言中的操作系统线程和goroutine的关系:

* 一个操作系统线程对应用户态多个`goroutine`
* Go程序可以同时使用多个操作系统线程
* `goroutine`和OS线程是多对多的关系,即m:n

channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竟态问题,为了保证数据交换的正确性,必须使用胡吃两堆内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型师CSP(Communicating Sequential Processes),提倡通过通信共享内存,而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channe1是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go语言中的通道(channel)是一种特殊类型,通道像一个传送带或者队列,总是遵循先进先出的规则,保证数据的收发顺序,每一个通道都是一个具体类型的导管,也即是声明channel时候需要为其制定元素类型。

channel类型

channel是一种类型,一种应用类型,声明通道类型的格式如下:

var 变量 chan 元素类型

示例:

var ch1 chan int  // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

创建channel

通道是引用类型,通道类型的控制是nil

var ch chan int
fmt.Println(ch)  // <nil>

声明的通道需要使用make函数初始化后才能使用

创建channel的格式如下:

make(chan 元素类型,[缓冲大小])

channel操作

通道有读、写和关闭三种操作。

读和写都是用<-符号。

// 初始化一个channel
ch := make(chan int)

// write to channel
ch <- 123

// read from channel
x := <- ch
<- ch  // 忽略结果

// close channel
chose(ch)

关于channel的关闭,需要注意:

  • 关闭一个未初始化(nil)的channel会产生panic
  • 重复关闭同一个channel会产生panic
  • 向一个已关闭的channel中发送信息会产生panic
  • 从已关闭的channel读取消息不会产生panic,且能读出channel中还未被读取的消息,若消息均已读出,则会读到类型的零值
  • 从一个已关闭的channel中读取消息永远不会阻塞,并且会返回一个为falseok-idiom,可以用来判断channel是否关闭
  • 关闭channel会产生一个广播机制,所有向channal读取消息的goroutine都会受到消息

channel类型

channel分不带缓冲区的channel和带缓冲区的channel

无缓冲区

无缓冲channel从无缓冲的channel中读取消息会则色,直到有goroutine向该channel中发送消息;同理,向无缓冲区的channel中发送消息也会阻塞,直到有goroutinechannel中读取消息。

使用无缓冲通道进行通道将导致发送和接收的goroutine同步化,因此无缓冲通道也被称为同步通道

func recv(c chan int){
	ret := <-c
	fmt.Println("接收成功",ret)
}

func main() {
	ch := make(chan int)
	go recv(ch)  // 启动goroutine从通道接收值
	ch <- 10  // 发送值
	fmt.Println("发送成功")
}
有缓存的通道

有缓存的channel的声明方式为指定make函数的第二个参数,该参数为channel缓存的容量。

有缓存的channel类似于一个阻塞队列(采用环形数组实现)。当缓存未满时,向channel中发送消息不会阻塞,当缓存满时,发送操作将会阻塞,直到有其他goroutine从中读取消息;相应的,当channel中消息不为空是,读取消息不会出现阻塞,当channel为空时,读取操作会发生阻塞,直到有goroutinechannel中写入消息。

我们可以通过使用内置的len()函数获取通道内元素的数量,使用cap()函数获取通道的容量。

func main() {
	ch := make(chan int, 1) // 创建一个容量为1的有缓存区通道

	ch <- 10
	fmt.Println("len(ch) = ",len(ch))  // len(ch) =  1

	fmt.Println("cap(ch) = ",cap(ch))  // cap(ch) =  1

	fmt.Println("发送成功")
}

for range 从通道循环取值

当通道被关闭时,再往该通道发送值就会引发panic,从该通道取值的操作会先取完同道中的值,然后去到的值一致都是对应类型的零值。如果判断一个通道是否被关闭了呢?

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// 开启goroutine将0-100的数发送到ch1
	go func() {
		for i:=0;i<100;i++{
			ch1 <- i
		}
		close(ch1)
	}()
	// 开启goroutine从ch1中接收值,并将该值的平法发送到ch2中
	go func(){
		for{
			i,ok:=<-ch1  //通道关闭再取值ok=false
			if !ok{
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()

	// 在主goroutine中从ch2中接收值
	for i := range ch2{ // 通道关闭后会推出for range循环
		fmt.Println(i)
	}
}

单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种情况

func counter(out chan<- int){
	for i:=0;i<100;i++{
		out <- i
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int){
	for i := range in{
		out <- i * i
	}
	close(out)
}

func printer(in <-chan int){
	for i := range in{
		fmt.Println(i)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go counter(ch1)
	go squarer(ch2,ch1)
	printer(ch2)
}

其种,chan <- int是一个只写单向通道(只能对其写入int类型值),可以对其执行发送操作,但是不能执行接收操作。<-chan int是一个只读单向通道(只能从其读取int类型值),可以对其执行接收操作但是不能执行发送操作。

在函数传参以及任何赋值操作中可以将双向通道转换为单向通道,但反过来不可以。

channel创建异常总结

channel nil 非空 未满
接收 阻塞 接收值 阻塞 接收值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读取数据后返回零值 关闭成功,返回零值 关闭成功,读完数据后返回零值 关闭成功,读完数据后返回零值

关闭已经关闭的channel也会引发panic

worker pool(goroutine池)

在工作中我们通常会使用可以指定启动的gotoutine数量,worker pool模式,控制goroutine的数量,防止goroutine泄露和暴涨。

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 开启三个goroutine
	for i:=1;i<=3;i++{
		go worker(i,jobs,results)
	}

	// 5个任务
	for i := 1; i <= 5; i ++{
		jobs <- i
	}
	close(jobs)

	// 输出结果
	for a:=1;a<=5;a++{
		<-results
	}
}

select多路复用

在某些场景下我们需要同时从多个通道接收数据,通道在接收数据时,如果没有数据可以接收将会发生阻塞。

for{
    // 尝试从ch1接收值
    data,ok := <- ch1
    // 尝试从ch2接收值
    data,ok := <- ch2
    ...
}

上面的方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多,为了应对这种场景,Go内置了select关键字,可以同时响应多个通道的操作。

select的使用类似于switch语句,它有一些列case分支和一个默认的分支,每个case会对应一个通道的通信(接收或发送)。select会一致等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。

select{
    case <- ch1:
    	...
    case data:=<-ch2:
    	...
	case ch3<-data:
    	...
    default:
    	默认操作
}

示例:

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		}
	}
}

使用select语句能提高代码的可读性。

可处理一个或多个channel的发送/接收操作。

如果多个case同时满足,select会随机选择一个

对于没有case的select{}会一致等待,可用于阻塞main函数

死锁问题

同一gotoutine中,使用同一个channel读写

func main(){
    ch := make(chan int) // 这就是在main里面发生死锁情况
    ch <- 6  // 这里发生一直阻塞的情况,执行不到下一句
    <- 6
}

2个以上的goroutine,使用同一个channel通信,读写channel先与goroutine创建

func main(){
    ch := make(chan int)
    ch <- 666  // 这里会一直阻塞,运行不到下面
    go func(){  // 这里创建了子goroutine,但是上面会一直阻塞,运行不到下面
        <- ch 
    }()
}

如果想不成为死锁,那匿名函数子goroutine就要放在ch<-666这条语句的前面。

2个以上的goroutine,使用多个channel通信

A goroutine获取channel 1的同时,尝试使用channel 2,同一时刻,B goroutine获取channel 2同时,尝试使用channel 1

func main(){
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func(){
        for {
            select {  // 这里互相等待造成死锁
                case <-ch1: // 这里ch1有数据读出来才会执行下一句
                	ch2<- 777 
                
            }
        }
    }()
    for{
        select {
            case <-ch2: // 这里ch2有数据读出来才会执行下一句
            	ch1 <- 999
        }
    }
}

读写模式的锁定不要互相阻塞

隐形死锁:系统的两个或多个任务之间互相阻塞对方,形成事实上的死锁局面,然而只要有可运行的协程,编译器就不会显式地报死锁错误.

开发中真正可怕的不是显式死锁,而是隐形死锁

func main(){
    var rwm sync.RWMutex
    ch := make(chan int,0)
    // 子协程负责写入
    go func(){
        // 连锁都抢不到
        rwm.Lock()
        ch <- 123
        rwm.Unlock()
    }()
    go func(){
        // 本协程负责读出
        rwm.RLock()
        // 只要读不到内容就永远阻塞
        x := <- ch
        fmt.Println("读到:",x)
        rwm.RUnlock()
    }()
    for {
        // 通知垃圾回收来清理垃圾(即使不叫也会定时清理)
        runtime.GC()
    }
}

定时器

定时器概述

Go为我们提供了两种方式的计时器:

定时执行任务的计时器和周期性执行任务的计时器

固定时间定时器

func main() {
	// 创建两秒的定时器
	timer := time.NewTimer(2 * time.Second)
	fmt.Println("当前时间:", time.Now())
	//当前时间: 2020-05-13 09:12:41.0018223 +0800 CST m=+0.006835901

	//两秒后,从单向时间管道中读取内容(当前时间)
	// timer.C是一个单向的时间管道
	t := <- timer.C
	fmt.Println("t = ",t)
	// t =  2020-05-13 09:12:43.0313903 +0800 CST m=+2.006835901
}

上面的示例演示了如何使用定时器延时两秒执行一项任务。上面的示例也可以写成下面的形式。

func main() {
	fmt.Println("开始计时")
	// 创建2秒的定时器,得到其单向输出时间管道,阻塞两秒后读出数据
	<- time.After(2 * time.Second)
	fmt.Println("时间到")
}
提前终止计时器

计时器被中途stop掉了,被延时的goroutine将永远得不到执行,

func main() {
	// 创建3秒的定时器
	timer := time.NewTimer(3*time.Second)
	// 3秒后从定时器时间管道中读取时间
	go func(){
		<- timer.C
		fmt.Println("子goroutine可以打印了,因为定时器的时间到了")
	}()
	
	// 停止定时器,停止状态下,计时器失效,被timer.C锁阻塞的子goroutine永远读不出数据
	timer.Stop()

	// 主goroutien为子goroutine留出足够的时间
	time.Sleep(6*time.Second)
	fmt.Println("Game Over")
}

中途重置定时器

下面的例子中,timer在配置为延时10秒执行后,又被重置为1秒,所以其时间延时为一秒。

需要注意的是:如果在reset的一刹那,定时器已经到时或者已被stop掉,则reset是无效的。

func main() {
	// 创建10秒的定时器
	timer := time.NewTimer(10 * time.Second)

	// 重置为1秒
	// 如果已经到时,或者已经stop,则重置失败
	ok := timer.Reset(1 * time.Second)
	fmt.Println("OK = ", ok, time.Now())  
    // OK =  true 2020-05-13 09:43:12.0831215 +0800 CST m=+0.073242201


	// 1秒后即可读出时间
	t := <- timer.C
	fmt.Println("时间到", t)
    // 时间到 2020-05-13 09:43:13.121344 +0800 CST m=+1.074218801
}

周期性执行任务

下面的示例将每隔一秒输出一次当前时间,5次后程序结束

func main() {
	// 创建一秒的秒表
	ticker := time.NewTimer(1 * time.Second)
	i := 0
	for {
		// 从秒表的管道中读出时间
		t := <- ticker.C
		i ++
		fmt.Println("i =", i, t)

		// 停止秒表
		if i == 5{
			ticker.Stop()
			break
		}
	}
	fmt.Println("Game Over")
}

互斥锁

有时候在Go代码中可能存在多个goroutine同时操作一个资源(临界区),这种情况会发生竟态问题(数据竟态)。

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

典型案例如银行账户,银行卡在存取过程中,存折是不允许在同一时间进行存取操作的,例如卡刚刚取走500,在查询余额时恰好存折又存入500,银行卡在查询余额时会误以为银行卡并没有扣款,这显然应该是避免的。所以我们不允许银行卡和存折并发地执行存取操作,必须同步串行有先后地执行存取,这样才不会带来脏读和幻读。

我们可以通过抢互斥锁(sync.Mutex)的方式来强制存取操作同步。

互斥锁原理:对于有必要强制同步串行的任务,我们规定它只有得到互斥锁才有执行权,而全局只有一把互斥锁,谁先抢到谁就获得任务执行权,任务进行的过程中如果有其他协程想要得到执行权,它必须阻塞等待至当前任务协程释放同步锁。

示例一:

func main() {
	// 必须保证并发安全的数据
	type Account struct {
		money float32
	}

	var wg sync.WaitGroup
	account := Account{money: 1000}
	fmt.Println(account)
	
	//资源互斥锁(谁抢到锁,谁先访问资源,其他人阻塞等待)
	//全局就这么一把锁,谁先抢到谁操作,其他人被阻塞直到被释放
	var mt sync.Mutex
	
	wg.Add(2)
	// 银行卡取钱
	go func() {
		defer wg.Done()
		// 拿到互斥锁
		mt.Lock()
		// 加锁的访问
		fmt.Println("取钱前:", account.money)
		account.money -= 500
		time.Sleep(time.Nanosecond)
		fmt.Println("取钱后:", account.money)
		
		// 释放互斥锁
		mt.Unlock()
	}()
	go func(){
		defer wg.Done()
		// 拿到互斥锁(如果别人先抢到,则阻塞等待)
		mt.Lock()
		fmt.Println("存钱前:", account.money)
		account.money += 500
		time.Sleep(time.Nanosecond)
		fmt.Println("存钱后:", account.money)
		// 释放互斥锁
		mt.Unlock()
	}()

	wg.Wait()
}

上面示例中,银行卡无论谁先抢到资源锁,都立刻对同步锁进行锁定(mt.Lock()),在其存取操作没有结束之前,另一个必须阻塞等待直至前者将互斥锁释放(mt.Unlock()

示例二:

在上面的示例中,银行卡和存折的存取操作,必须强制同步,否则形成数据的脏读或幻读。但是如果是查询上个月的银行流水或者仅仅是查询用户名之类的只读操作,则没有强制同步的必要,完全可以并发执行。于是对于上面的例子稍作修改,使得对银行账号的强制同步仅限于存取操作,而对于其他操作则放开权限令其可以被并发执行。

原理很简单,没有必要强制同步的任务,不去抢互斥锁就是了,需要确保同步的任务就先抢锁后执行,其余的则不去抢锁,直接执行。

type Account struct {
	name  string
	money float32
	// 定义该数据的互斥锁
	mt sync.Mutex
}

// 本方法不能被并发执行,并发是安全的
func (a *Account) saveGet(amount float32) {
	// 先将资源锁起来
	a.mt.Lock()
	// 执行操作
	fmt.Println("操作前:", a.money)
	a.money += amount
	fmt.Println("操作后:", a.money)
	<-time.After(3 * time.Second) // 阻塞三秒
	// 释放资源
	a.mt.Unlock()
}

// 本方法可以被并发,不是并发安全的,无此必要
func (a *Account) getName() string {
	return a.name
}

func main() {
	a := Account{
		name:  "张全蛋",
		money: 1000,
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		// 调用一个加锁的方法(同步)
		a.saveGet(500)
		wg.Done()
	}()

	go func() {
		// 调用一个普通的方法
		fmt.Println(a.getName())
		wg.Done()
	}()

	wg.Wait()
}

通过信号量控制并发数

控制并发数属于常用的调度,我们的做法是规定并发的任务都必须现在某个监视管道中进行注册,而这个监视管道的缓存能力是固定的,比如说5,那么注册在该管道中的并发能力也是5

var sema chan int

// 该函数只允许5次并发执行
func f1(i int) int {
	sema <- 1
	<- time.After(2*time.Second)
	<- sema
	return i*i
}

// 信号量:通过控制管道的"带宽"(缓存能力)控制并发数
func main() {
	// 定义信号量为5"带宽"的管道
	sema = make(chan int, 5)
	var wg sync.WaitGroup
	for i:=0;i<100;i++{
		wg.Add(1)
		go func(index int) {
			ret := f1(index)
			fmt.Println(index,ret)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

读写互斥锁

互斥锁是完全互斥锁的,但是有很多实际的场景下是读多写少,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写互斥锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁后,其他的goroutine如果是获得读锁会继续获得锁,如果是获得写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

简而言之,只读模式:多路只读不可写;只写模式:单路只写不可读。

// 定义读写锁
var (
	rwMutex sync.RWMutex
	wg sync.WaitGroup
)


func read(i int){
	rwMutex.RLock()   // 加锁
	fmt.Println(i, "reading...")
	time.Sleep(time.Second)
	fmt.Println(i,"read over")
	rwMutex.RUnlock()
	wg.Done()
}

func write(i int){
	rwMutex.Lock()  // 加锁
	fmt.Println(i,"writing...")
	time.Sleep(time.Second)
	fmt.Println(i,"write over")
	rwMutex.Unlock()
	wg.Done()
}

func main() {
	start := time.Now()
	for i:=0;i<1000;i++{
		wg.Add(1)
		go read(i)
	}
	for i:=0;i<10;i++{
		wg.Add(1)
		go write(i)
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(end.Sub(start))
}

需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

sync.WaitGroup

在代码中生硬的使用time.Sleep可定时不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步,sync.WaitGroup有以下几个方法:

方法名 功能
(wg *WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少,例如当我们启动了N个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件,只关闭一次通道等。

Go语言中的sync包中提供了一个针对只执行一次场景的结果方案。

sync.Once只有一个Do方法,其签名如下:

func (o *Once) Do(f func()){}

备注:如果要执行的函数f,需要传递参数就需要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操作真正用到它的时候在执行是一个很好的实践,因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。

var icons map[string]image.Image

func loadIcons(){
	icons = map[string]image.Image{
		"left": Icon("left.png"),
		"up": Icon("up.png"),
		"right": Icon("right.png"),
		"down": Icon("down.png"),
	}
}

//Icon被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image{
	if icons == nil{
		loadIcons()
	}
	return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由的重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

func loadIcons(){
    icons = make(map[string]image.Image)
    icons["left"] = Icon("left.png")
    icons["up"] = Icon("up.png")
    icons["right"] = Icon("right.png")
    icons["down"] = Icon("down.png")
    
}

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的就是添加互斥锁,保证初始化icons的时候不会被其他goroutine操作,但是这样做又会引发性能问题。

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

并发安全的单例模式

下面是借助sync.Once实现的并发安全的单例模式

type singleton struct {}
var instance * singleton
var once sync.Once

func getInstance() *singleton{
	once.Do(func(){
		instance = &singleton{}
	})
	return instance
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go语言中内置的map不是并发安全的。

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v
", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

上面的代码开启了少量的几个goroutine的时候可能没有什么问题,当并发多了之后执行上面的代码就会报fatal error:concurrent map writes错误。

像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发版本mapsync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)
			value, _ := m.Load(key)
			fmt.Printf("k=:%v,v:=%v
", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

atomic包

方法 解释
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操作

示例:

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c *CommonCounter) Inc() {
	c.counter++
}

func (c *CommonCounter) Load() int64 {
	return c.counter
}

// 互斥锁版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操作版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // 非并发安全
	test(&c1)
	c2 := MutexCounter{} // 使用互斥锁实现并发安全
	test(&c2)
	c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
	test(&c3)
}
原文地址:https://www.cnblogs.com/huiyichanmian/p/12889308.html