sync:与golang的并发息息相关的包

楔子

我们知道golang除了兼顾了开发速度和运行效率之外,最大的亮点就是在语言层面原生支持并发,也就是通过所谓的goroutine。不过既然是并发,那么就势必会面临很多问题。比如:资源竞争,多个goroutine同时访问一个资源会发生竞争从而产生意想不到的结果,那么这时候我们会通过加锁来解决;主goroutine不能先退出,这时候我们会等待子goroutine。还有单例模式,以及对象池等等。那么golang是如何实现的呢?就是通过下面我们要介绍的sync包。

sync.Mutex

sync.Mutex称为互斥锁,主要就是解决资源竞争的问题,这个时候我们通过对会发生资源竞争的部分进行加锁来解决。因为锁只有一把,只能一个goroutine执行完毕把锁释放,其他的goroutine才有机会执行。

我们先来看看不加锁的版本

package main

import (
	"fmt"
	"time"
)

var num = 0

func add() {
	i := 0
	for ; i < 100000; i++ {
		num++
	}
}

func sub() {
	i := 0
	for ; i < 100000; i++ {
		num--
	}
}

func main() {

	go add()
	go sub()
	time.Sleep(time.Second * 3)
	fmt.Println(num)  //1314
}
//如果发现结果正常(为0的话),那就多执行几遍

此时由于没有加锁,那么两个goroutine都访问num,那么得到的结果是会有误差的,理论上应该是0才对。下面我们就来加锁:

package main

import (
	"fmt"
	"sync"
	"time"
)

var num = 0

func add(lock *sync.Mutex) {
	i := 0
	for ; i < 100000; i++ {
		//表示上锁,在该锁为释放的话,那么其他goroutine使用lock.Lock()的时候就会阻塞
		lock.Lock()
		num++
		//不要忘记把锁释放,否则其他goroutine在尝试获取锁的时候都会阻塞
		lock.Unlock()
	}
}

func sub(lock *sync.Mutex) {
	i := 0
	for ; i < 100000; i++ {
		lock.Lock()
		num--
		lock.Unlock()
	}
}

func main() {

	//这个sync.Mutex是一个结构体,传递的时候是值传递,因此我们需要传递指针
	var lock = new(sync.Mutex)
	go add(lock)
	go sub(lock)
	time.Sleep(time.Second * 3)
	fmt.Println(num)  //0
}

sync.RWMutex

Mutex表示互斥锁,RWMutex表示读写互斥锁(简称:"读写锁")。为什么会有读写互斥锁,因为我们不希望多个goroutine对同一个资源进行修改,但如果是读取的话还是可以的。而Mutex比较严格,只要我上锁了,那么其它人就无法访问了,无论你是读还是写,必须等我完事之后你才可以开始。于是就有了RWMutex,读写锁的话,就可以解决这一问题。读写锁,可以设置为读锁,也可以设置为写锁。

  • 写锁:设置为写锁,那么此时就等同于互斥锁,其他goroutine不可读也不可写
  • 读锁:设置为读锁,那么其它goroutine可以读,但是不可以写。
// Lock 将 rw 设置为写锁定状态,禁止其他goroutine读取或写入。
func (rw *RWMutex) Lock()

// Unlock 解除 rw 的写锁定状态,如果 rw 未被写锁定,则该操作会引发 panic。
func (rw *RWMutex) Unlock()

// RLock 将 rw 设置为读锁定状态,禁止其他goroutine写入,但可以读取。
func (rw *RWMutex) RLock()

// Runlock 解除 rw 的读锁定状态,如果 rw 未被读锁定,则该操作会引发 panic。
func (rw *RWMutex) RUnlock()

先设置为写锁

package main

import (
	"fmt"
	"sync"
	"time"
)

var num = 0

func add(lock *sync.RWMutex) {
	defer lock.Unlock()
	lock.Lock()
	time.Sleep(1e9)
	fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
	defer lock.Unlock()
	lock.Lock()
	fmt.Println("sub")
}

func main() {

	var lock = new(sync.RWMutex)
	go add(lock)
	//确保add先执行
	time.Sleep(1000)
	go sub(lock)
	time.Sleep(time.Second * 2)
	/*
	add
	sub
	 */
}

设置为写锁,那么由于是add函数先执行,尽管里面出现了sleep,但是add函数先将锁获取了,所以必须要等add先执行完,才可以执行sub。

设置为读锁

package main

import (
	"fmt"
	"sync"
	"time"
)

var num = 0

func add(lock *sync.RWMutex) {
	defer lock.RUnlock()
	lock.RLock()
	time.Sleep(1e9)
	fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
	defer lock.RUnlock()
	lock.RLock()
	fmt.Println("sub")
}

func main() {

	var lock = new(sync.RWMutex)
	go add(lock)
	//确保add先执行
	time.Sleep(1000)
	go sub(lock)
	time.Sleep(time.Second * 2)
	/*
	sub
	add
	 */
}

我们看到设置为读锁,那么sub就不会等待add了,因为大家都是读锁。

sync.Cond

sync.Cond用在goroutine之间,用于协程的挂起和唤醒。这个Cond是需要通过锁才能实现,也就是底层还是使用了锁。调用cond.L.Lock()会进行上锁,但是其它的goroutine同时也是可以获取锁的,因此锁不是唯一的,而一旦调用cond.Wait(),那么程序会阻塞在这里(将当前goroutine加入到等待队列里),比如使用另一个goroutine将其唤醒。唤醒的方式有两种:cond.Signal,唤醒等待队列里面的一个goroutine;cond.Broadcase,唤醒等待队列里面的所有goroutine。

package main

import (
	"fmt"
	"sync"
	"time"
)


func add1(cond *sync.Cond) {
	// 获取锁
	cond.L.Lock()
	defer cond.L.Unlock()
	fmt.Println("add1已成功获取锁")
	//此时程序会卡在这个地方,直到另一个goroutine唤醒
	cond.Wait()
	fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
	cond.L.Lock()
	defer cond.L.Unlock()
	fmt.Println("add2已成功获取锁")
	cond.Wait()
	fmt.Println("add2醒了")
}



func main() {

	//我们同样需要传递指针
	var cond = new(sync.Cond)
	//Cond是需要搭配锁来执行的
	cond.L = new(sync.Mutex)
	//或者我们在创建cond的时候直接通过 var cond = sync.NewCond(new(sync.Mutex))
	go add1(cond)
	go add2(cond)

	time.Sleep(time.Second)
	//唤醒一个goroutine
	cond.Signal()
	time.Sleep(time.Second)
    /*
    add1已成功获取锁
    add2已成功获取锁
    add1醒了
    */
}
package main

import (
	"fmt"
	"sync"
	"time"
)


func add1(cond *sync.Cond) {
	// 获取锁
	cond.L.Lock()
	defer cond.L.Unlock()
	fmt.Println("add1已成功获取锁")
	cond.Wait()
	fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
	cond.L.Lock()
	defer cond.L.Unlock()
	fmt.Println("add2已成功获取锁")
	cond.Wait()
	fmt.Println("add2醒了")
}



func main() {

	var cond = new(sync.Cond)
	cond.L = new(sync.Mutex)
	go add1(cond)
	go add2(cond)

	time.Sleep(time.Second)
	cond.Broadcast()
	time.Sleep(time.Second)
	/*
	add1已成功获取锁
	add2已成功获取锁
	add1醒了
	add2醒了
	 */
}

sync.WaitGroup

我们看一下上面写的代码,是不是很low呢?因为我们希望主线程等待子协程执行完毕之后再退出,使用的方式是time.Sleep,这是很低级的,当然在介绍语法的时候很方便。但是在项目开发中肯一般不会这么写,而且你也不知道子协程什么时候执行完毕。于是就有了组的概念,sync.WaitGroup是一个结构体,有以下三个方法。

// 计数器增加 delta,delta 可以是负数。
func (wg *WaitGroup) Add(delta int)

// 计数器减少 1
func (wg *WaitGroup) Done()

// 等待直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
func (wg *WaitGroup) Wait()

//所以只要计数器不为0,那么Wait会阻塞
//Add会使计数器增加指定的数值
//Done会使计数器减一

//那么你想到了什么?
//对,假设我们要开20个协程,那么就Add(20)
//每执行一个协程Done()一下
//Wait()不就会等待所有的子协程全部执行完毕吗
package main

import (
	"fmt"
	"sync"
)


func add(wg *sync.WaitGroup, value int) {
	fmt.Printf("satori %d号
", value)
	wg.Done()
}


func main() {

	//我们同样需要传递指针
	var wg = new(sync.WaitGroup)
	wg.Add(10)
	for i:=0;i<10;i++{
		go add(wg, i)
	}
	wg.Wait()
	/*
	satori 4号
	satori 0号
	satori 1号
	satori 3号
	satori 9号
	satori 6号
	satori 5号
	satori 7号
	satori 8号
	satori 2号
	 */
}

sync.Once

sync.Once可以保证一些对象只被实例化一次,或者某个函数只被执行一次。经常用于单例模式、系统初始化等等。再比如channel的Close,对一个通道进行多次Close会引发panic,那么我们通过sync.Once就可以保证channel只会被Close一次。

package main

import (
	"fmt"
	"sync"
)


func foo(){
	fmt.Println(123)
}

func mmp(once *sync.Once){
	once.Do(foo)
}



func main() {
	var once = new(sync.Once)
	for i:=0;i<10;i++{
		once.Do(foo)
		/*
		123
		 */
	}
	//once.Do里面函数只会被执行一次。
	//这里的也不会被执行,因为我们传递的是指针,如果传值的话会进行拷贝,那么还是会执行的,因为不是一个sync.Once对象了
	mmp(once)
}

另外我们发现once.Do里面的函数,是一个函数名,而且参数类型也指明了是一个无参无返回值的函数。那如果需要参数呢?很简单,使用闭包即可。

package main

import (
	"fmt"
	"sync"
)

func girl(name string) func() {
	return func() {
		fmt.Println("i'm a girl named", name)
	}
}

func main() {
	var once = new(sync.Once)
	once.Do(girl("mashiro1"))
	once.Do(girl("mashiro2"))
	once.Do(girl("mashiro3"))
	/*
	i'm a girl named mashiro1
	 */
}

sync.Pool

从名字也能看出来,sync.Pool指的是临时对象池,为了减少GC的负担,我们对于那些可能会后续使用、但是暂时不用的对象放到池子里,当使用的时候,再从池子里面拿出来。

package main

import (
	"bytes"
	"fmt"
	"sync"
)

func main() {
	//Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
	var pool = sync.Pool{
		New: func() interface{} {
			return &bytes.Buffer{}
		},
	}

	//如果我们创建Pool的时候,指定了函数,那么池子里面就有东西了,就是函数的返回值
	//可以直接使用Get函数获取,但它是一个interface{},所以我们要转换成相应的类型
	//如果初始胡没有指定,那么获取的结果就是nil
	buf := pool.Get().(*bytes.Buffer)
	buf.WriteString("哈哈")
	pool.Put(buf) //调用put函数,可以将对象放回去
	//然后我们再取出来
	buf = pool.Get().(*bytes.Buffer)
	// 成功打印我们写入的内容
	fmt.Println(buf.String())  //哈哈

	//这个pool不一定非要放相同的对象
	var num  = 123
	pool.Put(num)
	//我方进去一个int也是可以的
	num = pool.Get().(int)
	fmt.Println(num) // 123
}

如果初始化的时候,不指定函数。

package main

import (
	"fmt"
	"sync"
)

func main() {
	//Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
	var pool = sync.Pool{}
	//不指定的话,是一个nil
	fmt.Println(pool.Get()) // <nil>

	//这个时候可以直接put
	//但是我们也可以指定函数
	pool.New = func() interface{} {
		return 123
	}
	fmt.Println(pool.Get().(int)) // 123
}

sync.Map

golang在1.9的时候,引用了sync.Map,它是原生支持并发安全的map。对于普通的map,我们一般够用了,尽管它并不是线程安全的。但是有时我们需要涉及到线程安全的时候,我们可以使用sync.Map。sync.Map和原生的map语法差别较大,但是很好理解。

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	//设置key  value
	//参数类型都是interface{}
	m.Store("name", "satori")
	m.Store("age", 17)
	m.Store("gender", "f")

	//Load:查找一个key,如果存在那么返回 值和true,否则返回 nil和false
	if value, ok := m.Load("name"); ok {
		fmt.Println(value) // satori
	}

	//LoadOrStore:查找一个key的同时指定一个value
	//如果查找的key存在,那么返回 对应的值和true; 不存在就将该key和指定的value设置进去,返回 设置的值和false
	fmt.Println(m.Load("where"))  // <nil> false
	//设置成功
	fmt.Println(m.LoadOrStore("where", "japan"))  //japan false

	//再次获取,设置成功
	fmt.Println(m.Load("where"))  // japan true
	fmt.Println(m.LoadOrStore("where", "America"))  //japan true

	//遍历,接收一个函数,参数是两个interface{},返回一个bool
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
		/*
		name satori
		age 17
		gender f
		where japan
		 */
	})

	//删除一个元素
	fmt.Println(m.Load("gender"))  // f true
	m.Delete("gender")
	fmt.Println(m.Load("gender"))  // <nil> false
}

下面我们来测试一下同时写和删除有什么表现,一个goroutine往map里面写,一个从map里面删除。

package main

import (
	"time"
)

func readMap(m map[int]int){
	for i:=0;i<5;i++{
		time.Sleep(1000)
		delete(m, i)
	}
}

func writeMap(m map[int]int){
	for i:=0;i<6;i++{
		time.Sleep(1000)
		m[i] = 1
	}
}

func main() {
	var m = make(map[int]int)
	go readMap(m)
	go writeMap(m)
	time.Sleep(time.Second)
	/*
	fatal error: concurrent map writes

	goroutine 18 [running]:
	runtime.throw(0x47d496, 0x15)
	...
	...
	 */
}

我们看到对于普通map直接报错了,我们再来试试sync.Map

package main

import (
	"fmt"
	"sync"
	"time"
)

func readMap(m *sync.Map){
	for i:=0;i<5;i++{
		time.Sleep(1000)
		m.Delete(i)
	}
}

func writeMap(m *sync.Map){
	for i:=0;i<6;i++{
		time.Sleep(1000)
		m.Store(i, 0)
	}
}

func main() {
	var m = new(sync.Map)
	go readMap(m)
	go writeMap(m)
	time.Sleep(time.Second)

	//此时则没有任何问题,因此sync.Map是线程安全的。
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})
	/*
	2 0
	4 0
	5 0
	 */
	//但是打印的结果貌似不正常,因为我们写了6个,但是删了5个,应该剩下一个啊
	//其实goroutine的执行顺序不确定,有可能删的时候还没有这个key呢,但是等到有的时候就进入下一层循环了。
	//所以上面的结果是正常的。
}
原文地址:https://www.cnblogs.com/traditional/p/12221809.html