Go语言多线程 (转)

大多数语言使用线程+并发同步访问控制作为并发模型,而 Go 的并发模型由 goroutine 和 channel 组成。线程类似于 goroutine,而并发同步访问控制则类似于 mutex。

Go 并发的理念是:简单,尽量使用 channel,尽情使用 goroutine。

channel 是粘合 goroutine 的胶水,select 则是粘合 channel 的胶水。

线程是cpu调度的最小单位,不同的线程才能同时在多核cpu上同时运行。但线程太占资源,线程调度开销大。go中的goroutine是一个轻量级的线程,执行时只需要4-5k的内存,比线程更易用,更高效,更轻便,调度开销比线程小,可同时运行上千万个并发。
go语言中开启一个goroutine非常简单,go函数名(),就开启了个线程。

什么是 goroutine

Goroutine 可以看作对 thread 加的一层抽象,它更轻量级,可以单独执行。因为有了这层抽象,Gopher 不会直接面对 thread,我们只会看到代码里满天飞的 goroutine。操作系统却相反,管你什么 goroutine,我才没空理会。我安心地执行线程就可以了,线程才是我调度的基本单位。

goroutine 和 thread 的区别

谈到 goroutine,绕不开的一个话题是:它和 thread 有什么区别?

参考资料【How Goroutines Work】告诉我们可以从三个角度区别:内存消耗、创建与销毀、切换。

  • 内存占用

创建一个 goroutine 的栈内存消耗为 2 KB,实际运行过程中,如果栈空间不够用,会自动进行扩容。创建一个 thread 则需要消耗 1 MB 栈内存,而且还需要一个被称为 “a guard page” 的区域用于和其他 thread 的栈空间进行隔离。

对于一个用 Go 构建的 HTTP Server 而言,对到来的每个请求,创建一个 goroutine 用来处理是非常轻松的一件事。而如果用一个使用线程作为并发原语的语言构建的服务,例如 Java 来说,每个请求对应一个线程则太浪费资源了,很快就会出 OOM 错误(OutOfMermoryError)。

  • 创建和销毀

Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。

  • 切换

当 threads 切换时,需要保存各种寄存器,以便将来恢复:

16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc.

而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。

一般而言,线程切换会消耗 1000-1500 纳秒,一个纳秒平均可以执行 12-18 条指令。所以由于线程切换,执行指令的条数会减少 12000-18000。

Goroutine 的切换约为 200 ns,相当于 2400-3600 条指令。

因此,goroutines 切换成本比 threads 要小得多。

默认情况下,调度器仅使用单线程,要想发挥多核处理器的并行处理能力,必须调用runtine.GOMAXPROCS(n)来设置可并发的线程数,也可以通过设置环境变量GOMAXPROCS 

 threads := runtime.GOMAXPROCS(0)

package main

import (
    "runtime"
    "fmt"
)

func main(){
    go sayHello()
    go sayWorld()
    var str string
    fmt.Scan(&str)
}

func sayHello(){
    for i := 0; i < 10; i++{
        fmt.Print("hello ")
        runtime.Gosched()
    }
}

func sayWorld(){
    for i := 0; i < 10; i++ {
        fmt.Println("world")
        runtime.Gosched()
    }
}

启动了两个线程,其中一个线程输出一句后调用Gosched函数,释放CPU权限;之后另一个线程获得CPU权限。这样两个线程交替获得cpu权限,才输出了以上结果。

runtime.Goexit()函数用于终止当前的goroutine,但是defer函数将会继续被调用。

package main

import (
    "runtime"
    "fmt"
)

func test(){
    defer func(){
        fmt.Println(" in defer")
    }()
    for i := 0; i < 10; i++{
        fmt.Print(i)
        if i > 5{
            runtime.Goexit()
        }
    }
}

func main(){
    go test()
    var str string
    fmt.Scan(&str)
}

在这里大家或许有个疑问,下面这两句代码干嘛的呢

 var str string
fmt.Scan(&str)
 
这两句代码是等待输入的意思,在这里用来阻止主线程关闭的。如果没有这两句的话,会发现我们的程序瞬间就结束了,而且什么都没有输出。这是因为主线程关闭之后,所有开启的goroutine都会强制关闭,他还没有来得及输出,就结束了。
 
goroutine之间通过channel来通讯,可以认为channel是一个管道或者先进先出的队列。你可以从一个goroutine中向channel发送数据,在另一个goroutine中取出这个值。
生产者/消费者是最经典的使用示例。生产者goroutine负责将数据放入channel,消费者goroutine从channel中取出数据进行处理。
package main

import (
    "fmt"
)

func main(){
    buf:=make(chan int)
    flg := make(chan int)
    go producer(buf)
    go consumer(buf, flg)
    <-flg //等待接受完成
}

func producer(c chan int){
    defer close(c) // 关闭channel
    for i := 0; i < 10; i++{
        c <- i // 阻塞,直到数据被消费者取走后,才能发送下一条数据
    }
}

func consumer(c, f chan int){
    for{
        if v, ok := <-c; ok{
            fmt.Print(v) // 阻塞,直到生产者放入数据后继续读取数据
        }else{
            break
        }
    }
    f<-1 //发送数据,通知main函数已接受完成
}
 
可以将channel指定为单向通信。比如<-chan int仅能接收,chan<-int仅能发送
channle可以是带缓冲的。make的第二个参数作为缓冲长度来初始化一个带缓冲的channel:
向带缓冲的channel发送数据时,只有缓冲区满时,发送操作才会被阻塞。当缓冲区空时,接收才会阻塞。
可以通过以下程序调整发送和接收的顺序调试
package main

import (
    "fmt"
)

func main(){
    c := make(chan int, 2)
    c <- 1
    c <- 2
    fmt.Println(<-c)
    fmt.Println(<-c)
}

如果有多个channel需要监听,可以考虑用select,随机处理一个可用的channel

 
package main

import (
    "fmt"
)

func main(){
    c := make(chan int)
    quit := make(chan int)
    go func(){
        for i := 0; i < 10; i++{
            fmt.Printf("%d ", <-c)
        }
        quit <- 1
    }()
    testMuti(c, quit)
}

func testMuti(c, quit chan int){
    x, y := 0, 1
    for {
        select{
        case c<-x:
            x, y = y, x+y
        case <-quit:
            fmt.Print("
quit")
            return
        }
    }
}

当一个channel被read/write阻塞时,会被一直阻塞下去,直到channel关闭。产生一个异常退出程序。channel内部没有超时的定时器。但我们可以用select来实现channel的超时机制

线程同步

假设现在我们有两个线程,一个线程写文件,一个线程读文件。如果在读文件的同时,写文件的线程向文件中写数据,就会出现问题。为了保证能够正确的读写文件,在读文件的时候,不能进行写入文件的操作,在写入时,不能进行读的操作。这就需要互斥锁。互斥锁是线程间同步的一种机制,用了保证在同一时刻只用一个线程访问共享资源。go中的互斥锁在sync包中。

package main

import (
    "errors"
    "sync"
    "fmt"
)

func main(){
    m := &MyMap{mp:make(map[string]int), mutex:new(sync.Mutex)}
    go SetValue(m)
    go m.Display()
    var str string
    fmt.Scan(&str)
}

type MyMap struct{
    mp map[string]int
    mutex *sync.Mutex
}

func (this *MyMap)Get(key string)(int, error){
    this.mutex.Lock()
    i, ok := this.mp[key]
    this.mutex.Unlock()
    if !ok{
        return i, errors.New("不存在")
    }
    return i, nil
}

func (this *MyMap)Set(key string, val int){
    this.mutex.Lock()
    defer this.mutex.Unlock()
    this.mp[key] = val
}

func (this *MyMap)Display(){
    this.mutex.Lock()
    defer this.mutex.Unlock()
    for key, val := range this.mp{
        fmt.Println(key, "=", val)
    }
}

func SetValue(m *MyMap){
    var a  rune
    a = 'a'
    for i := 0; i< 10; i++{
        m.Set(string(a+rune(i)), i)
    }
}

原文地址:https://www.cnblogs.com/zitjubiz/p/15375046.html