【Go】并发问题 channel

并行与并发

并发:同一时间段内执行多个任务;(你用微信和两个女朋友聊天)

并行:同一时刻执行多个任务;(你和你的朋友都在和同一个人聊天)

Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutinechannel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

goroutine

​ 在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。那么能不能有一种机制,程序员只需要定义很多个任务,让系统去帮助我们把这些任务分配到CPU上实现并发执行呢?

​ Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

​ 在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。

使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

goroutnine示例:

package main

import (
	"fmt"
	"time"
)

// 程序启动之后会创建一个主 Goroutine去执行;
func main() {
	//单独开启一个goroutine 去执行hello函数(任务)

	//第一种情况会10个里面会出现重复:
	//其原因是因为函数传值时:是"闭包",外层的for循环累加;取到的i会变化;
	fmt.Println("第一种情况")
	
	for i := 0; i < 10; i++ {
		go func() {
			fmt.Println(i)
		}()
	}

	time.Sleep(time.Second)

	//第二种情况其实是解决第一种情况所出现的办法
	fmt.Println("第二种情况")
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i)
		}(i)
	}
}

启动多个goroutine

在Go语言中实现并发就是这样简单,我们还可以启动多个goroutine。让我们再来一个例子: (这里使用了sync.WaitGroup来实现goroutine的同步)

package main

import (
   "fmt"
   "math/rand"
   "sync"
   "time"
)

// waitGroup

//随机数的用法
//1、创建随机种子
//2、rand.Int() -- return int64 的随机数
//3、rand.Intn(x) -- return %x  的随机数

//func f() {
// rand.Seed(time.Now().UnixNano())
// for i := 0; i < 5; i++ {
//    r1 := rand.Int()    // int64
//    r2 := rand.Intn(10) // [0,10)
//    fmt.Println(r1, "  ", r2)
// }
//}

var wg sync.WaitGroup

func f1(i int) {
   //任务解决
   defer wg.Done()
   rand.Seed(time.Now().UnixNano())
   time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
   fmt.Println("###:", i)
}
func main() {

   //测试随机数的用法
   //f()

   //利用WaitGroup判断所有的goroutine都结束;
   for i := 0; i < 10; i++ {
      //添加任务
      wg.Add(1)
      go f1(i)
   }
   //等待任务全部清零 wg 的计数器清零;
   wg.Wait()
}

goroutine与线程

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB);

一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// GOMAXPROCS

var wg sync.WaitGroup

func a() {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		fmt.Printf("A: %d
", i)
	}
}

func b() {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		fmt.Printf("B: %d
", i)
	}
}
func main() {
	//默认CPU的逻辑核心数,默认跑满整个CPU
	runtime.GOMAXPROCS(1)
	wg.Add(2)
	go a()
	go b()
	wg.Wait()
}

Go语言中的操作系统线程和goroutine的关系:

  1. 一个操作系统线程对应用户态多个goroutine。
  2. go程序可以同时使用多个操作系统线程。
  3. goroutine和OS线程是多对多的关系,即m:n。

Goroutine. 调度模型

GMP

M:N :把M个goroutine分配给n个操作系统线程

goroutine初始栈的初始大小是:2K


channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

channel是一种类型,一种引用类型。声明通道类型的格式如下:

var 变量 chan 元素类型

举几个例子:

var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

创建channel

通道是引用类型,通道类型的空值是nil

var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用。

创建channel的格式如下:

make(chan 元素类型, [缓冲大小])

channel的缓冲大小是可选的。

举几个例子:

ch4 := make(chan int)
ch5 := make(chan bool)  
ch6 := make(chan []int)
package main

import (
	"fmt"
	"sync"
)

// 1. 生成100个数 放进 ch1
// 2. 从 ch1 中取数 平方后放进 ch2
// 3. 从 ch2 中打印

var wg sync.WaitGroup
var once sync.Once

func f1(ch1 chan<- int) {
	defer wg.Done()
	for i := 0; i < 1000; i++ {
		ch1 <- i
	}
	close(ch1)
}

func f2(ch1 <-chan int, ch2 chan<- int) {
	defer wg.Done()
	for {
		i, ok := <-ch1
		if !ok {
			break
		}
		ch2 <- i * i
	}
	once.Do(func() { close(ch2) })
}
func main() {
	ch1 := make(chan int, 1000)
	ch2 := make(chan int, 1000)
	wg.Add(3)

	go f1(ch1)
	go f2(ch1, ch2)
	go f2(ch1, ch2)

	for ret := range ch2 {
		fmt.Println(ret)
	}
}

channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

ch := make(chan int)

发送

将一个值发送到通道中。

ch <- 10 // 把10发送到ch中

接收

从一个通道中接收值。

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。
package main

import "fmt"

var a []int
var b chan int

//channel作为引用类型
//需要Make初始化才能使用;

//其余另外两个是 : slice , map
func main() {
	fmt.Println(b)        // nil
	b = make(chan int, 1) // 不带缓冲区的 通道
	b <- 10
	t := <-b
	println(t)
	b = make(chan int, 16) // 带有缓冲区的 通道

}

worker pool(goroutine池)

在工作中我们通常会使用可以指定启动的goroutine数量–worker pool模式,控制goroutine的数量,防止goroutine泄漏和暴涨。

一个简易的work pool示例代码如下:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

//使用goroutine和channel实现一个计算int64随机数各位数和的程序。
//1.开启一个goroutine循环生成int64类型的随机数,发送到jobChan
//2.开启24个goroutine从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
//3.主goroutine从resultChan取出结果并打印到终端输出

type job struct {
	value int64
}

type result struct {
	job *job
	sum int64
}

var jobChan = make(chan *job, 100)
var resultChan = make(chan *result, 100)

var wg sync.WaitGroup

func generate_num(ch1 chan<- *job) {
	defer wg.Done()
	for {
		x := rand.Int63()
		newJob := &job{
			value: x,
		}
		ch1 <- newJob
		time.Sleep(time.Millisecond * 500)
	}
}

func summary_num(ch1 <-chan *job, ch2 chan<- *result) {
	defer wg.Done()
	for {
		newjob := <-ch1
		n := newjob.value
		sum := int64(0)
		for n > 0 {
			sum += n % 10
			n = n / 10
		}
		newResult := &result{
			job: newjob,
			sum: sum,
		}
		ch2 <- newResult
	}
}

func main() {
	//1.开启一个goroutine循环生成int64类型的随机数,发送到jobChan

	wg.Add(1)
	go generate_num(jobChan)

	//2.开启24个goroutine从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
	wg.Add(24)
	for i := 0; i < 24; i++ {
		go summary_num(jobChan, resultChan)
	}

	//3.主goroutine从resultChan取出结果并打印到终端输出
	for x := range resultChan {
		fmt.Printf(" value : %d , sum : %d
", x.job.value, x.sum)
	}

	wg.Wait()
}


select多路复用

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:

for{
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
    …
}

这种方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go内置了select关键字,可以同时响应多个通道的操作。

select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

举个小例子来演示下select的使用:

package main

import "fmt"

//select多路复用
//样例说明:
//1. i = 0 时,当前ch通道为空,A 不能执行,所以只能执行 B
//2. i = 1 时,当前ch通道已满, B 不能执行,所以只能执行 A

func main() {
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		select {
		// case A
		case x := <-ch:
			fmt.Println(x)
		// case B
		case ch <- i:
		}
	}
}

使用select语句能提高代码的可读性。

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个。
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

原文地址:https://www.cnblogs.com/Osea/p/14492490.html