Golang并发编程-传统的同步工具"锁"实战篇

        Golang并发编程-传统的同步工具"锁"实战篇

                               作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

 

  我们为了解决go程同步的问题我们使用了channel,但是GO也提供了传统的同步工具(锁)。它们都在GO的标准库代码包sync和sync/atomic中。接下来我们就一起学习一下吧~

 

一.传统的同步工具"锁"概述

1>.锁的作用

  什么是锁:
    就是某个go程(线程)在访问某个资源时先锁住,防止其它go程的访问,等访问完毕解锁后其他go程再来加锁进行访问。这和我们生活中加锁使用公共资源相似,例如:公共卫生间。

  锁的作用:
    为了在并发编程的时候,让数据一致。

2>.死锁问题

  死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。

  在使用锁的过程中,很容易造成死锁,在开发中应该尽量避免死锁。
package main

import (
    "fmt"
)

func main() {

    //注意,无缓冲区channel在读端和写段都准备就绪的时候不阻塞
    s1 := make(chan int)

    /**
    主线程写入:
        主Go程再写入数据时,但此时并没有读端准备就绪,因此代码会在该行阻塞哟~我们称之"死锁"
        在开发中一定使用锁机制的时候一定要注意避免"死锁"现象哟~
    */
    s1 <- 5

    /**
    子线程读取:
        通过上面的解释,相比大家心里也清楚,代码在上一行已经阻塞了,压根就没有机会执行到当前行,即没有开启子Go程.
    */
    go func() {
        fmt.Println(<-s1)
    }()
}
"死锁"案例代码

3>.死锁案例解决方案

package main

import (
    "fmt"
    "time"
)

func main() {

    //注意,无缓冲区channel在读端和写段都准备就绪的时候不阻塞
    s1 := make(chan int)

    /**
    子线程读取:
        先开启一个子Go程用于读取无缓冲channel中的数据,此时由于写端未就绪因此子Go程会处于阻塞状态,但并不会影响主Go程,因此代码可以继续向下执行哟~
    */
    go func() {
        fmt.Println(<-s1)
    }()

    /**
    主线程写入:
        此时读端(子Go程)处于阻塞状态正在准备读取数据,主Go程在写入数据时,子Go程会立即消费掉哟~
    */
    s1 <- 5

    for {
        time.Sleep(time.Second)
    }
}

二.互斥锁 

1>.什么是互斥锁

  每个资源都对应于一个可称为"互斥锁"的标记,这个标记用来保证在任意时刻,只能有一个go程(线程)访问该资源。其它的go程只能等待。

  互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库sync中的Mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。Lock锁定当前的共享资源,Unlock进行解锁。
  温馨提示:
    在使用互斥锁时,一定要注意:对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助defer,锁定后,立即使用defer语句保证互斥锁及时解锁

2>.互斥锁的案例

package main

import (
    "fmt"
    "sync"
    "time"
)

var mutex sync.Mutex //定义互斥锁

func MyPrint(data string) {
    mutex.Lock()         //添加互斥锁
    defer mutex.Unlock() //使用结束时自动解锁

    for _, value := range data { //迭代字符串的每个字符并打印
        fmt.Printf("%c", value)
        time.Sleep(time.Second) //模拟Go程在执行任务
    }
    fmt.Println()
}

func Show01(s1 string) {
    MyPrint(s1)
}

func Show02() {
    MyPrint("Jason Yin")
}

func main() {
    /**
    虽然我们在主Go中开启了2个子Go程,但由于2个子Go程有互斥锁的存在,因此一次只能运行一个Go程哟~
    */
    go Show01("尹正杰")
    go Show02()
    time.Sleep(time.Second * 30) //主Go程设置充足的时间让所有子Go程执行完毕~因为主Go程结束会将所有的子Go程杀死。
}

 

三.读写锁

1>.什么是读写锁

  互斥锁的本质是当一个goroutine访问的时候,其他goroutine都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。其实,当我们对一个不会变化的数据只做""操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少goroutine同时读取,都是可以的。

  所以问题不是出在""上,主要是修改,也就是""。修改的数据要同步,这样其他goroutine才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的。因此,衍生出另外一种锁,叫做读写锁。

  读写锁可以让多个读操作并发,同时读取,但是对于写操作是完全互斥的。也就是说,当一个goroutine进行写操作的时候,其他goroutine既不能进行读操作,也不能进行写操作。

  GO中的读写锁由结构体类型sync.RWMutex表示。此类型的方法集合中包含两对方法:
    一组是对写操作的锁定和解锁,简称"写锁定"和"写解锁":
      func (*RWMutex)Lock()
      func (*RWMutex)Unlock()

    另一组表示对读操作的锁定和解锁,简称为"读锁定"与"读解锁":
      func (*RWMutex)RLock()
      func (*RWMutex)RUnlock()

2>.读写锁的案例

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var (
    number int
    rwlock sync.RWMutex //定义读写锁
)

func MyRead(n int) {
    rwlock.RLock()         //添加读锁
    defer rwlock.RUnlock() //使用结束时自动解锁
    fmt.Printf("[%d] Goroutine读取数据为: %d
", n, number)
}

func MyWrite(n int) {
    rwlock.Lock()         //添加写锁
    defer rwlock.Unlock() //使用结束时自动解锁
    number = rand.Intn(100)
    fmt.Printf("%d Goroutine写入数据为: %d
", n, number)
}

func main() {

    //创建写端
    for index := 201; index <= 205; index++ {
        go MyWrite(index)
    }

    //创建读端
    for index := 110; index <= 130; index++ {
        go MyRead(index)
    }

    for {
        time.Sleep(time.Second)
    }
}

四.条件变量

1>.什么是条件变量

  条件变量:
    条件变量的作用并不保证在同一时刻仅有一个go程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的go程(线程)。
    条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。   GO标准库中的sync.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。

2>.条件变量的案例

package main

import (
    "fmt"
    "runtime"
)
import "sync"
import "math/rand"
import "time"

/**
创建全局条件变量
*/
var cond sync.Cond

// 生产者
func producer(out chan<- int, idx int) {
    for {
        /**
        条件变量对应互斥锁加锁,即在生产数据时得加锁。
        */
        cond.L.Lock()

        /**
        产品区满3个就等待消费者消费
        */
        for len(out) == 3 {
            /**
            挂起当前go程, 等待条件变量满足,被消费者唤醒,该函数的作用可归纳为如下三点:
                1>.阻塞等待条件变量满足
                2>.释放已掌握的互斥锁相当于cond.L.Unlock()。注意:两步为一个原子操作。
                3>.当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()
            */
            cond.Wait()
        }

        /**
        产生一个随机数,写入到 channel中(模拟生产者)
        */
        num := rand.Intn(1000)
        out <- num
        fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据
", idx, num, len(out))

        /**
        单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(Go程)发送通知。换句话说,唤醒阻塞的消费者
        */
        //cond.Signal()

        /**
        广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。
        */
        cond.Broadcast()

        /**
        生产结束,解锁互斥锁
        */
        cond.L.Unlock()

        /**
        生产完休息一会,给其他Go程执行机会.
        */
        time.Sleep(time.Second)
    }
}

//消费者
func consumer(in <-chan int, idx int) {
    for {
        /**
        条件变量对应互斥锁加锁(与生产者是同一个)
        */
        cond.L.Lock()

        /**
        产品区为空 等待生产者生产
        */
        for len(in) == 0 {
            /**
            挂起当前go程, 等待条件变量满足,被生产者唤醒
            */
            cond.Wait()
        }

        /**
        将channel中的数据读走(模拟消费数据)
        */
        num := <-in
        fmt.Printf("[%dth] 消费者, 消费数据 %3d,公共区剩余%d个数据
", idx, num, len(in))
        /**
        唤醒阻塞的生产者
        */
        cond.Signal()
        /**
        消费结束,解锁互斥锁
        */
        cond.L.Unlock()

        /**
        消费完休息一会,给其他Go程执行机会
        */
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {

    /**
    设置随机数种子
    */
    rand.Seed(time.Now().UnixNano())

    /**
    产品区(公共区)使用channel模拟
    */
    product := make(chan int, 3)

    /**
    创建互斥锁和条件变量(申请内存空间)
    */
    cond.L = new(sync.Mutex)

    /**
    创建3个生产者
    */
    for i := 101; i < 103; i++ {
        go producer(product, i)
    }

    /**
    创建5个消费者
    */
    for i := 211; i < 215; i++ {
        go consumer(product, i)
    }
    for { // 主go程阻塞 不结束
        runtime.GC()
    }
}

五.waitGroup

1>.什么是waitGroup

  WaitGroup用于等待一组Go程的结束。父线程调用Add方法来设定应等待的Go程的数量。每个被等待的Go程在结束时应调用Done方法。同时,主Go程里可以调用Wait方法阻塞至所有Go程结束。

  实现大致步骤如下:
    1>.创建 waitGroup对象。
      var wg sync.WaitGroup
    
2>.添加 主go程等待的子go程个数。       wg.Add(数量)
    
3>.在各个子go程结束时,调用defer wg.Done()。       将主go等待的数量-1。注意:实名子go程需传地址。
    
4>.在主go程中等待。       wg.wait()

2>.waitGroup案例

package main

import (
    "fmt"
    "sync"
    "time"
)

func son1(group *sync.WaitGroup) {
    /**
    在各个子go程结束时,一定要调用Done方法,它会通知WaitGroup该子Go程执行完毕哟~
    */
    defer group.Done()
    time.Sleep(time.Second * 3)
    fmt.Println("son1子Go程结束...")
}

func son2(group *sync.WaitGroup) {
    /**
    在各个子go程结束时,一定要调用Done方法,它会通知WaitGroup该子Go程执行完毕哟~
    */
    defer group.Done()
    time.Sleep(time.Second * 5)
    fmt.Println("son2子Go程结束")
}

func son3(group *sync.WaitGroup) {
    /**
    在各个子go程结束时,一定要调用Done方法,它会通知WaitGroup该子Go程执行完毕哟~
    */
    defer group.Done()
    time.Sleep(time.Second * 1)
    fmt.Println("son3子Go程结束~~~")
}

func main() {
    /**
    创建 waitGroup对象。
    */
    var wg sync.WaitGroup

    /**
    添加 主go程等待的子go程个数。该数量有三种情况:
        1>.当主Go程添加的子Go程个数和实际子Go程数量相等时,需要等待所有的子Go程执行完毕后主Go程才能正常退出;
        2>.当主Go程添加的子Go程个数和实际子Go程数量不等时有以下2种情况:
            a)小于的情况:只需要等待指定的子Go程数量执行完毕后主Go程就会退出,尽管还有其它的子Go程没有运行完成;
            b)大于的情况:最终会抛出异常"fatal error: all goroutines are asleep - deadlock!"
    */
    wg.Add(2)

    /**
    执行子Go程
    */
    go son1(&wg)
    go son2(&wg)
    go son3(&wg)

    /**
    在主go程中等待,即主Go程阻塞状态
    */
    wg.Wait()
}

原文地址:https://www.cnblogs.com/yinzhengjie2020/p/12702608.html