Golang goroutine channel实现并发和并行
1.为什么要使用goroutine
需求:统计1-100000000的数字中的素数,并打印这些素数
素数:除了1和它本身不能被其他数整除的数
实现方法:
1.传统方法,通过一个for循环判断各个数是不是素数
2.使用并发或者并行的方式,将统计素数的任务分配给多个goroutine去完成,这个时候就用到了goroutine
3.goroutine结合channel
2.进程、线程以及并行、并发
1).关于进程和线程
进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都是自己的地址空间。一个进程至少有5种基本状态,它们是:初始化,执行态,等待状态,就绪状态,终止状态。通俗的讲进程就是一个正在执行的程序。
线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位。
一个进程可以创建多个线程,同一个进程中的多个线程可以并发执行,一个程序要运行至少有一个进程。
2).关于并行和并发
并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。
通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CPU上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上运行既有并行又有并发
3.Golang中的协程(goroutine)以及主线程
golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这个对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生支持协程,在函数或者方法前面加go关键字就可以创建一个协程,可以说golang中的协程就是goroutine。
Golang中的多协程有点类似其他语言中的多线程。
多协程和多线程:Golang中每个goroutine(协程)默认占用内存远比Java、C的线程少。os线程(操作系统线程)一般都有固定的栈内存(通常为2MB左右),一个goroutine(协程)占用内存非常小,只有2kb左右,多协程goroutine切换调度开销方面远比线程要少。这也是为什么越来越多大公司使用Golang的原因之一。
4.Goroutine的使用以及sync.WaitGroup
并行执行需求:
在主线程(可以理解成进程)中,开启一个goroutine,该协程每隔50毫秒输出"你好golang"
在主线程中也每隔50毫秒输出"你好golang",输出10次后,退出程序,要求主线程和goroutine同时执行。
sync.WaitGroup可以实现主线程等待协程执行完毕
5.启动多个Goroutine
再Go语言中实现并发很简单,还可以启动多个goroutine。使用sync.WaitGroup来实现等待goroutine执行完毕
wg.Add(1) //协程计数器加1
wg.Done() //协程计数器减1
wg.Wait() //等待协程执行完毕
6.设置Golang并行运行的时候占用的CPU数量
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值时机器上的CPU核心数。例如再一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
Go1.5版本之前,默认使用的时单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
//获取当前计算机上面的CPU个数
cpuNum := runtime.NumCPU()
fmt.Println("cpuNum=", cpuNum) //cpuNum= 8
//可以自己设置使用多个CPU
runtime.GOMAXPROCS(cpuNum - 1)
fmt.Println("ok")
7.Goroutine统计素数
8.Channel管道
管道是Go语言再语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(Channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
1.channel类型
channel是一种类型,一种引用类型。声明管道类型的格式如下:
var 变量 chan 元素类型
//举例
var ch1 chan int //声明一个传递整型的管道
var ch2 chan bool //声明一个传递布尔型的管道
var ch3 chan []int //声明一个传递int切片的管道
2.创建channel
声明管道后需要使用make函数初始化之后才能使用。
创建channel的格式如下:
make(chan 元素类型,容量)
3.channel操作
管道由发送(send)、接收(receive)和关闭(close)三种操作
发送和接收都使用<-符号。
使用以下语句定义一个管道:
ch := make(chan int, 3)
//举例
//创建一个能存储10个int类型数据的管道
ch1 := make(chan int, 10)
//创建一个能存储4个bool类型数据的管道
ch2 := make(chan bool, 4)
//创建一个能存储3个[]int切片类型数据的管道
ch3 := make(chan []int, 3)
1).发送(将数据放在管道内)
将一个值发送到管道中。
ch <- 10//把10发送到ch中
2).接收(从管道内取值)
从一个管道中接收值。
x := <-ch//从ch中接收值并赋值给变量x
<-ch//从ch中接收值,忽略结果
3).关闭管道
通过调用内置的close函数来关闭管道
close(ch)
关于关闭管道需要注意的是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭管道,管道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭管道不是必须的。
关闭后的管道由以下特点:
1.对一个关闭的管道再发送值会导致panic
2.对一个关闭的管道进行接收会一直获取值直到管道为空
3.对一个关闭的并且没有值的管道执行接收操作会得到对应类型的零值
4.关闭一个已经关闭的管道会导致panic
4.管道阻塞
1).无缓冲的管道:
如果创建管道的时候没有指定容量,那么称这个管道为无缓冲的管道
无缓冲的管道又称为阻塞的管道。
func main() {
ch := make(chan int)
ch<- 10
fmt.Println("发送成功")
}
上面这段代码能够通过编译,但是执行的时候出现以下错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
C:/Study/Golang/Demo/Demo18/main.go:212 +0x5f
exit status 2
5.for range从管道循环取值
当向管道中发送完数据时,我们可以通过close函数来关闭管道
当管道被关闭时,再往该管道发送值会引发panic,从该管道取值的操作会先取完管道中的值,再然后取到的值一直都是对应类型的零值。
9.Goroutine结合Channel管道
需求1:定义两个方法,一个方法给管道里面写数据,一个从管道里面读取数据。要求同步进行。
1.开启一个fn1的协程向管道inChan中写入100条数据
2.开启一个fn2的协程读取inChan中写入的数据
3.注意:fn1和fn2同时操作一个管道
4.主线程必须等待操作完成后才可以退出
10.单向管道
有时候会将管道作为参数再多个任务函数间传递,很多时候再不同的任务函数中使用管道都会对其进行限制,如果限制管道在函数中只能发送或只能接收
//在默认情况下,管道是双向
var chan1 chan int //可读可写 chan int
chan1 = make(chan int, 2)
chan1 <- 10
chan1 <- 12
m1 := <-chan1
m2 := <-chan1
fmt.Println(m1, m2) //10 12
//声明为只写 chan<- int
var chan2 chan<- int
chan2 = make(chan int, 3)
chan2 <- 20
// num := <-chan2 //error:invalid operation: <-chan2 (receive from send-only type chan<- int)
fmt.Println("chan2 ", chan2)
//声明为只读 <-chan int
var chan3 <-chan int
// chan3 <- 30 //invalid operation: chan3 <- 30 (send to receive-only type <-chan int)
11.select多路复用
在某些场景下需要同时从多个管道接收数据,这个时候可以用到golang中提供的select多路复用。
通常情况通道在接收数据时,如果没有数据可以接收将会发生阻塞。
select的是u哦那个类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句,具体格式如下:
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3 <-data:
...
default:
默认操作
}
12.Golang并发安全和锁
1.互斥锁
互斥锁时传统并发编程中对共享资源进行访问控制的重要手段,它由标准库sync中的Mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。lock锁定当前的共享资源,Unlock进行解锁。
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁:当互斥锁释放后,等待的共routine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略时随机的。
虽然使用互斥锁能解决资源争夺问题,但是并不完美,通过全局变量加锁同步来实现通讯,并不利于多个协程对全局变量的读写操作。这个时候我们可以通过另一种方式来实现上面的功能管道
2.读取互斥锁
互斥锁的本质时当一个goroutine访问的时候,其他goroutine都不能访问,这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。
其实,当我们对一个不会变化的数据只做”读“操作的话,是不存在资源竞争问题的。因为数据是不变的,不管怎么读取,多少goroutine同时读取,都是可以的。
所以问题不是出在”读“上,主要是修改,写就是”写“。修改的数据要同步,这样其他goroutine才可以感知到。所以真正的互斥应该是读取和修改,修改和修改之间,读和读是没有互斥操作的必要的。
因此,衍生出另外一种锁,叫做读写锁
读写锁可以让多个读操作并发,同时读取,但是对于写操作时完全互斥的。也就是说,当一个goroutine进行写操作的时候,其他goroutine既不能进行读操作,也不能进行写操作。
Go中的读写锁由结构体类型sync.RWMutex表示。此类型的方法集合中包含两对方法:
一组是对写操作的锁定和解锁,简称”写锁定“和”写解锁“:
func (*RWMutex) Lock)()
func (*RWMutex) Unlock)()
另一组表示对读操作的锁定和解锁,简称为”读锁定“和”读解锁“:
func (*RWMutex) RLock)()
func (*RWMutex) RUnlock)()
13.Goroutine Recover解决协程中出现的Panic
当一个协程出现异常时,不影响其他协程的正常工作
代码:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func test() {
for i := 0; i < 10; i++ {
fmt.Println("test() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器减1
}
func test2() {
for i := 0; i < 10; i++ {
fmt.Println("test2() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器减1
}
func printNum(num int) {
defer wg.Done()
for i := 1; i <= 5; i++ {
fmt.Printf("协程%v打印的第%v条数据
", num, i)
}
}
func PrimeNumber(n int) {
for num := (n-1)*30000 + 1; num <= n*30000; num++ {
if num > 1 {
flag := true
for i := 2; i < num; i++ {
if num%i == 0 {
flag = false
break
}
}
if flag {
// fmt.Println(num, "是素数")
}
}
}
wg.Done()
}
//写数据
func fn1(ch chan int) {
for i := 1; i <= 10; i++ {
ch <- i
fmt.Printf("【写入】数据%v成功
", i)
time.Sleep(time.Millisecond * 50)
}
close(ch)
wg.Done()
}
//读数据
func fn2(ch chan int) {
for v := range ch {
fmt.Printf("【读取】数据%v成功
", v)
time.Sleep(time.Millisecond * 50)
}
wg.Done()
}
//向intChan放入1-120000个数
func putNum(intChan chan int) {
for i := 2; i < 120000; i++ {
intChan <- i
}
close(intChan)
wg.Done()
}
//从intChan中取出数据,并判断是否为素数,如果是, 就把得到的素数放在primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
for num := range intChan {
flag := true
for i := 2; i < num; i++ {
if num%i == 0 {
flag = false
break
}
}
if flag {
primeChan <- num
}
}
//要关闭primeChan
// close(primeChan) //如果一个channel关闭了就不能给这个channel发送数据了:panic: close of closed channel
//什么时候关闭primeChan
//给exitChan里面放入一条数据
exitChan <- true
wg.Done()
}
//printPrime打印素数
func printPrime(primeChan chan int) {
// for v := range primeChan {
// fmt.Println(v)
// }
wg.Done()
}
func sayHello() {
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 50)
fmt.Println("Hello, world")
}
wg.Done()
}
func test3() {
//使用defer + recover
defer func() {
//捕获test抛出的panic
if err := recover(); err != nil {
fmt.Println("test()发生错误", err)
}
wg.Done()
}()
//定义一个map
var myMap map[int]string
myMap[0] = "Golang" //error,没有初始化map就对它进行赋值:panic: assignment to entry in nil map
// wg.Done()
}
var count = 0
var mutex sync.Mutex //互斥锁
func test4() {
mutex.Lock() //加锁
count++
fmt.Println("the count is : ", count)
time.Sleep(time.Millisecond)
mutex.Unlock() //解锁
wg.Done()
}
var rwMutex sync.RWMutex //读写互斥锁
//写的方法
func write() {
rwMutex.Lock()
fmt.Println("执行写操作")
time.Sleep(time.Second * 2)
rwMutex.Unlock()
wg.Done()
}
//读的方法
func read() {
rwMutex.RLock()
fmt.Println("----执行读操作")
time.Sleep(time.Second * 2)
rwMutex.RUnlock()
wg.Done()
}
func main() {
// wg.Add(1) //协程计数器加1
// go test() //表示开启一个协程
// wg.Add(1) //协程计数器加1
// go test2() //表示开启一个协程
// // for i := 0; i < 10; i++ {
// fmt.Println("main() 你好golang-", i)
// time.Sleep(time.Millisecond * 20)
// }
//当主进程执行的比较快时,即使协程还没有执行完,但是一旦主进程结束,协程也会一起结束。
// time.Sleep(time.Second)//使用time.Sleep使主进程休眠等待协程执行,协程执行时间不可预估,而且这样会让程序变慢
//使用sync.WaitGroup来监听协程执行
// wg.Wait() //等待协程执行完毕
// fmt.Println("主线程退出")
//多次执行上面的代码,发现每次打印的数字的顺序都不一致。这是因为10个goroutine时并发执行的,而goroutine的调度时随机的。
// //获取当前计算机上面的CPU个数
// cpuNum := runtime.NumCPU()
// fmt.Println("cpuNum=", cpuNum) //cpuNum= 8
// //可以自己设置使用多个CPU
// runtime.GOMAXPROCS(cpuNum - 1)
// fmt.Println("ok")
// for i := 1; i <= 6; i++ {
// wg.Add(1)
// go printNum(i)
// }
// wg.Wait()
// fmt.Println("关闭主线程")
//统计1-100中的素数
//for循环实现
// start := time.Now().Unix()
// for num := 2; num <= 120000; num++ {
// flag := true
// for i := 2; i < num; i++ {
// if num%i == 0 {
// flag = false
// break
// }
// }
// if flag {
// fmt.Println(num, "是素数")
// }
// }
// end := time.Now().Unix()
// fmt.Println(end - start) //11, 当要打印的数字范围很大的时候,for循环需要耗费很多的时间
/*
goroutine实现
统计1-120000中的素数
1 协程 统计 1-30000
2 协程 统计 30001 - 60000
3 协程 统计 60001 - 90000
4 协程 统计 90001 - 120000
*/
// start := time.Now().Unix()
// for i := 1; i <= 4; i++ {
// wg.Add(1)
// go PrimeNumber(i)
// }
// wg.Wait()
// end := time.Now().Unix()
// fmt.Println(end - start) //3秒
// fmt.Println("执行完毕")
//Channel管道
// var ch1 chan int //声明一个传递整型的管道
// var ch2 chan bool //声明一个传递布尔型的管道
// var ch3 chan []int //声明一个传递int切片的管道
// //创建一个能存储10个int类型数据的管道
// ch1 := make(chan int, 10)
// //创建一个能存储4个bool类型数据的管道
// ch2 := make(chan bool, 4)
// //创建一个能存储3个[]int切片类型数据的管道
// ch3 := make(chan []int, 3)
// //1.创建channel
// ch := make(chan int, 3)
// //2.给管道里面存储数据
// ch <- 10
// ch <- 21
// ch <- 32
// //3.获取管道里面的内容
// a := <-ch
// <-ch
// c := <-ch
// fmt.Println(a) //10
// fmt.Println(c) //32
// //4.管道的容量和长度
// fmt.Printf("值:%v,容量:%v,长度:%v", ch, cap(ch), len(ch)) //值:0xc000104080,容量:3,长度:0
//5.管道的类型(引用数据类型)
// ch1 := make(chan int, 4)
// ch1 <- 34
// ch1 <- 44
// ch1 <- 54
// ch2 := ch1
// ch2 <- 24
// <-ch1
// <-ch1
// <-ch1
// d := <-ch1
// fmt.Println(d) //24, 更改ch2时,ch1也改变了,channel是引用数据类型
//6.管道阻塞
// ch3 := make(chan int, 1)
// ch3 <- 34
// ch3 <- 44 //all goroutines are asleep - deadlock!
//在没有使用协程的情况下,如果管道数据已经全部取出,载取就会报deadlock
// ch4 := make(chan string, 2)
// ch4 <- "34"
// ch4 <- "44"
// m1 := <-ch4
// m2 := <-ch4
// m3 := <-ch4
// fmt.Println(m1, m2, m3) //fatal error: all goroutines are asleep - deadlock!
// ch5 := make(chan int, 1)
// ch5 <- 13
// <-ch5
// ch5 <- 24
// <-ch5
// ch5 <- 55
// m4 := <-ch5
// fmt.Println(m4) //55
//循环遍历管道数据
//使用for range遍历通道,当通道被关闭的时候就会退出for range,如果没有关闭管道就会报个错误fatal error: all goroutines are asleep - deadlock!
// var ch1 = make(chan int, 10)
// for i := 1; i <= 10; i++ {
// ch1 <- i
// }
// close(ch1) //关闭管道
// //for range循环遍历管道的值, 注意:管道没有key
// for val := range ch1 {
// fmt.Println(val)
// }
//通过for循环遍历管道的时候管道可以不关闭
// var ch2 = make(chan int, 10)
// for i := 1; i <= 10; i++ {
// ch2 <- i
// }
// //for循环遍历管道的值, 注意:管道没有key
// for i := 0; i < 10; i++ {
// fmt.Println(<-ch2)
// }
// var ch = make(chan int, 10)
// wg.Add(1)
// go fn1(ch)
// wg.Add(1)
// go fn2(ch) //管道时安全的,如果写入数据的速度较慢,读取的操作会等待写入后再读取
// wg.Wait()
// fmt.Println("退出")
//Goroutine结合Channel管道
// start := time.Now().Unix()
// intChan := make(chan int, 1000)
// primeChan := make(chan int, 50000)
// exitChan := make(chan bool, 16) //表示primeChan close
// //存放数字的协程
// wg.Add(1)
// go putNum(intChan)
// //统计素数的协程
// for i := 0; i < 16; i++ {
// wg.Add(1)
// go primeNum(intChan, primeChan, exitChan)
// }
// //打印素数的协程
// wg.Add(1)
// go printPrime(primeChan)
// //判断exitChan是否存满值
// wg.Add(1)
// go func() {
// for i := 0; i < 16; i++ {
// <-exitChan
// }
// //关闭primeChan
// close(primeChan)
// wg.Done()
// }()
// wg.Wait()
// end := time.Now().Unix()
// fmt.Println("执行完毕...", end-start)
// //在默认情况下,管道是双向
// var chan1 chan int //可读可写
// chan1 = make(chan int, 2)
// chan1 <- 10
// chan1 <- 12
// m1 := <-chan1
// m2 := <-chan1
// fmt.Println(m1, m2) //10 12
// //声明为只写
// var chan2 chan<- int
// chan2 = make(chan int, 3)
// chan2 <- 20
// // num := <-chan2 //error:invalid operation: <-chan2 (receive from send-only type chan<- int)
// fmt.Println("chan2 ", chan2)
// //声明为只读
// var chan3 <-chan int
// // chan3 <- 30 //invalid operation: chan3 <- 30 (send to receive-only type <-chan int)
//select多路复用:在某些场景下需要同时从多个管道接收数据,这个时候可以用到golang中提供的select多路复用。
// //1.定义一个管道10个int数据
// intChan := make(chan int, 10)
// for i := 0; i < 10; i++ {
// intChan <- i
// }
// //2.定义一个管道5个string数据
// stringChan := make(chan string, 5)
// for i := 0; i < 5; i++ {
// stringChan <- "hello" + fmt.Sprintf("%d", i)
// }
// //使用select来获取channel里面的数据的时候不需要关闭channel
// for {
// select {
// case v := <-intChan:
// fmt.Printf("从intChan中读取数据%d
", v)
// case v := <-stringChan:
// fmt.Printf("从stringChan中读取数据%v
", v)
// default:
// fmt.Println("所有数据获取完毕")
// return //注意退出
// }
// }
//Goroutine Recover解决协程中出现的Panic
// wg.Add(1)
// go sayHello()
// wg.Add(1)
// go test3()
// wg.Wait()
// for i := 0; i < 20; i++ {
// wg.Add(1)
// go test4()
// }
// wg.Wait()
//go build -race main.go 编译后运行查看, 是否有竞争关系
//读写互斥锁
//开启10个协程执行写操作
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
//开启10个协程执行读操作
for i := 0; i < 10; i++ {
wg.Add(1)
go read()
}
wg.Wait()
}