go——并发(二)

通常程序会被编写为一个顺序执行并完成一个独立任务的代码。

如果没有特别的需求,最好总是这样写代码,因为这种类型的程序通常很容易写,也容易维护。

不过也有一些情况下,并行执行多个任务会有更大的好处。

一个例子是,Web服务需要在各自独立的套接字上同时接收多个数据请求。

每个套接字请求都是独立的,可以完全独立于其它套接字进行处理。

具有并行执行多个请求的能力可以显著提高这类系统的性能。

考虑到这一点,Go语言的语法和运行时直接内置了对并发的支持。

Go语言里的并发指的是能让某个函数独立于其它函数运行的能力

当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元

这个单元会被调度到可用的逻辑处理器上执行。

Go语言运行时的调度器是一个复杂的软件,能管理被创建的所有goroutine并为其分配执行时间

这个调度器在操作系统之上,将操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine

调度器在任何给定的时间,都会全面控制哪个goroutine要在哪个逻辑处理器上运行。

Go语言的并发同步模型来自一个叫作通信顺序进程(Communicating Sequential Processes,CSP)的范型(paradigm)。

CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是对数据进行加锁来同步访问

用于在goroutine同步和传递数据的关键数据类型叫做通道(channel)

使用通道可以时编写并发程序更容易,也能够让并发程序出错更少。

1.并发与并行

当运行一个应用程序(如一个IDE或者编辑器)的时候,操作系统会为这个应用程序启动一个进程。

可以将这个进程看作包含了一个应用程序在运行中需要用到和维护的各种资源的容器

上图展示了一个包含所有可能分配的常用资源的进程。

这些资源包括但不限于内层地址空间、文件和设备句柄以及线程。

一个线程是一个执行空间,这个空间会被操作系统调度来运行函数中所写的代码。

每个进程至少有一个线程,每个线程的初始线程被称作主线程

因为执行这个线程的空间是应用程序的本身空间,所以当主线程终止时,应用程序也会终止。

操作系统将线程调度到某个处理器上运行,这个处理器并不一定是进程所在的处理器。

不同操作系统使用的线程调度算法一般不一样,但这种不同会被操作系统屏蔽,并不会展示给程序员。

操作系统会在物理处理器上调度线程来运行,而Go语言在运行时会在逻辑处理器上调度goroutine来运行

每个逻辑处理器都分别绑定到单个操作系统线程

在1.5版本上,Go在运行默认会为每个可用的物理处理器分配一个逻辑处理器。

在1.5之前的版本中,默认给整个应用程序只分配一个逻辑处理器。

这些逻辑处理器会用于执行所有被创建的goroutine

即便只有一个逻辑处理器,Go也可以以神奇的效率和性能并发调度无数个goroutine。

在上图中展示了操作系统线程、逻辑处理器和本地运行队列之间的关系。

如果创建一个gotoutine并准备运行,这个gotoutine就会被放到调度器的全局运行队列中。

之后,调度器就会将这些队列中的goroutine分配给逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。

本地运行队列中的goroutine会一直等待直到自己被分配的逻辑处理器执行。

有时,正在运行的goroutine需要执行一个阻塞的系统调用,如打开一个文件。

当这类调用发生时,线程和goroutine会从逻辑处理器上分离,该线程会继续阻塞,等待系统调用的返回

于此同时,这个逻辑处理器就失去了用来运行的线程。所以,调度器会创建一个新线程,并将其绑定到该逻辑处理器上。

之后,调度器会从本地运行队列里选择另一个goroutine来运行

一旦被阻塞的系统调用执行完成并返回,对应的goroutine会放回到本地运行队列,而之前的线程会保存好,以便之后可以继续使用。

如果一个goroutine需要做一个网络I/O调用,流程上会有些不一样。

在这种情况下,goroutine会和逻辑处理器分离,并移到继承了网络轮询器的运行时

一旦该轮询器指示了某个网络读或者写操作已经就绪,对应的goroutine就会重新分配到逻辑处理器上来完成操作。

调度器对可以创建的逻辑处理器的数量没有限制,但语言运行时默认限制每个程序最多创建10000个线程。

这个限制值可以通过调用runtime/debug包的SetMaxThreads方法来更改。如果程序试图使用更多的线程,就会崩溃。

并发不是并行。并行是让不同的代码片段同时在不同的物理处理器上执行

并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做了一般就被暂停去做别的事情去了。

在很多情况下,并发的效果比并行好,因为操作系统和硬件的总资源一般很少,但能支持系统同时做很多事情。

这种“使用较少的资源做更多的事情”的哲学,也是Go语言设计的哲学。

如果希望让goroutine并行,必须使用多于一个逻辑处理器。

当有多个逻辑处理器时,调度器会将goroutine平等分配到每个逻辑处理器上。这会让goroutine在不同的线程上运行。

不过要想真的实现并行的效果,用户需要让自己的程序运行在有多个物理处理器的机器上。

否则,哪怕Go语言运行时使用了多个线程,goroutine依然会在同一个物理处理器上并发执行,达不到并行的效果

上图展示了在一个逻辑处理器上并发运行goroutine和在两个逻辑处理器上并行运行两个并发的goroutine之间的区别。

2.goroutine

 示例1:创建goroutine

//这个程序展示如何创建goroutine
//以及调度器的行为
package main

import (
	"fmt"
	"runtime"
	"sync"
)

//main是程序的入口
func main() {
	//分配一个逻辑处理器给调度器使用
	runtime.GOMAXPROCS(1)

	//wg用来等待程序完成
	//通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。
	var wg sync.WaitGroup
	//计数加2,表示要等待两个goroutine
	wg.Add(2)

	fmt.Println("Start Goroutines")

	//声明一个匿名函数,并创建一个goroutine
	go func() {
		//在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		//显式字母表三次
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	go func() {
		defer wg.Done()

		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	fmt.Println("waiting To finish")
	//等待goroutine结束
	wg.Wait()

	fmt.Println("
Terminating Program")
}

结果展示:

Start Goroutines
waiting To finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z
a b c d e f g h i j k l m n o p q r s t u v w x y z 
Terminating Program

第一个goroutine完成所有显式所花费的时间太短,以至于在调度器切换到第二个goroutine之前,就完成了所有任务。

GOMAXPROCS函数允许程序更改调度器可以使用的逻辑处理器的数量。

WaitGroup是一个计数信号量,可以用来记录并维护运行的goroutine。如果WaitGroup的值大于0,那么Wait方法就会阻塞。

defer保证每个goroutine一旦完成其工作就调用Done方法。

基于调度器的内部算法,一个正运行的goroutine在工作结束前,可以被停止并重新调度。

调度器这样做的目的是防止某个goroutine长时间占用逻辑处理器。

当goroutine占用时间过长时,调度器会停止当前正运行的goroutine,并给其它可运行的goroutine运行的机会。

上图从逻辑处理器的角度展示了这一场景。在第1步,调度器开始运行goroutine A,而goroutine B在运行队列里等待调度。

之后,在第2步,调度器交换了goroutine A和goroutine B,由于goroutine A并没有完成工作,因此被放回到运行队列。

之后,在第3步,goroutine B完成了它的工作并被销毁。这也让goroutine A继续之前的工作。

可以通过创建一个需要长时间才能完成的其工作的goroutine来看到这个行为。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var wg sync.WaitGroup

func main() {
	runtime.GOMAXPROCS(1)

	wg.Add(2)

	fmt.Println("create goroutine")
	go printPrime("A")
	go printPrime("B")

	fmt.Println("waiting to finish")
	wg.Wait()
	fmt.Println("Terminating program")

}

func printPrime(prefix string) {
	defer wg.Done()

next:
	for outer := 2; outer < 5000; outer++ {
		for inner := 2; inner < outer; inner++ {
			if outer%inner == 0 {
				continue next
			}
		}
		fmt.Printf("%s:%d
", prefix, outer)
	}
	fmt.Println("completed", prefix)
}


/*
create goroutine
waiting to finish
B:2
B:3
...
B:4787
B:4789
A:2      //切换goroutine
A:3
...
A:3919
A:3923
B:4793   //切换goroutine
B:4799
...
B:4993
B:4999
completed B
A:3929   //切换goroutine
A:3931
...
A:4999
completed A
Terminating program
*/

  

3.竞争状态

如果两个或者多个goroutine在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争状态。

竞争状态的存在是让并发程序变得复杂的地方,十分容易引起潜在的问题。

对一个共享资源的读和写操作必须是原子化的,换句话说,同一时刻只能有一个goroutine对共享资源进行读和写操作。

//如何在程序里造成竞争状态
//实际上不希望出现这种情况
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	//counter是所有goroutine都要增加其值的变量
	counter int

	//wg用来等待程序结束
	wg sync.WaitGroup
)

func main() {
	//计数加2,要等待两个goroutine
	wg.Add(2)

	//创建两个goroutine
	go inCounter(1)
	go inCounter(2)

	//等待goroutine结束
	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

//inCounter增加包里counter变量的值
func inCounter(id int) {
	//在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		//捕获counter的值
		value := counter

		//当前goroutine从线程退出,并放回队列
		runtime.Gosched()

		//增加本地value变量的值
		value++

		//将值保存回counter
		counter = value

	}
}  

结果:

Final Counter: 2

每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine切换的时候。

一种修正代码、消除竞争状态的办法是,使用Go语言提供的锁机制,来锁住共享资源,从而保证goroutine的同步状态。

4.锁住共享资源

Go语言提供了传统的同步goroutine的机制,就是对共享资源加锁。

如果需要顺序访问一个整型变量或者一段代码,atomic和sync包里的函数提供了很好的解决方法。

(1)原子函数

//展示如何使用atomic包来提供对数值类型的安全访问
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	//counter是所有goroutine都要增加其值的变量
	counter int64

	//wg用来等待程序结束
	wg sync.WaitGroup
)

func main() {
	//计数加2,要等待两个goroutine
	wg.Add(2)

	//创建两个goroutine
	go inCounter(1)
	go inCounter(2)

	//等待goroutine结束
	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

//inCounter增加包里counter变量的值
func inCounter(id int) {
	//在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		atomic.AddInt64(&counter, 1)

		runtime.Gosched()
	}
}

/*
结果:
Final Counter: 4
*/

  

AddInt64这个函数会同步整型值得加法,,方法是强制同一时刻只能有一个goroutine运行并完成这个加法操作。

当goroutine试图去调用任何原子函数时,这些goroutine都会自动根据所引用得变量做同步处理。

另外两个有用得原子函数是LoadInt64和StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式。

//这个示例程序展示如何使用atomic包里的Store和Load类型函数来提供对数值类型的安全访问。
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	//shutdown是通知正在执行的goroutine停止工作的标志
	shutdown int64

	//wg用来等待程序结束
	wg sync.WaitGroup
)


func main() {

	wg.Add(2)

	//创建两个goroutine
	go doWork("A")
	go doWork("B")

	//给定goroutine执行时间
	time.Sleep(1 * time.Second)

	//该停止工作了,安全设置shutdown标志
	fmt.Println("Shutdown Now")
	atomic.StoreInt64(&shutdown, 1)


	wg.Wait()
}

//doWork用来模拟执行工作的goroutine
//检测之前的shutdown标志来决定是否提前终止
func doWork(name string) {
	defer wg.Done()

	for {
		fmt.Printf("Doing %s Work
", name)
		time.Sleep(250 * time.Millisecond)

		//要停止工作了吗?
		if atomic.LoadInt64(&shutdown) == 1 {
			fmt.Printf("Shutting %s Down
", name)
			break
		}
	}
}

  

在上面这个例子中,启动了两个goroutine,并完成了一些工作。在各自循环的每次迭代之后,goroutine会使用LoadInt64来检查shutdown变量的值。

这个函数会安全地返回shutdown变量地一个副本。如果这个副本地值为1,goroutine就会跳出循环并终止。

main函数使用StoreInt64函数来安全地修改shutdown变量地值。如果哪个doWork goroutine试图在main函数调用StoreInt64的同时调用LoadInt64函数,

那么原子函数会将这些调用互相同步,保证这些操作都是安全的,不会进入竞争状态。

(2)互斥锁

另一种同步访问共享资源的方式是使用互斥锁(mutex)。

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行这个临界区代码。

//展示互斥锁,定义临界区
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter int
	wg      sync.WaitGroup
	//mutex用来定义一段代码临界区
	mutex sync.Mutex
)

func main() {
	wg.Add(2)

	go inCounter(1)
	go inCounter(2)

	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

func inCounter(id int) {

	defer wg.Done()

	for count := 0; count < 2; count++ {
		//同一时刻只允许一个goroutine进入
		//定义临界区
		mutex.Lock()
		{
			value := counter

			runtime.Gosched()

			value++

			counter = value
		}
		mutex.Unlock()
		//释放锁,允许其它正在等待的goroutine进入临界区
	}
}

/*
结果:
Final Counter: 4
*/

函数Lock()和Unlock()定义临界区,会将其中的代码保护起来。

原文地址:https://www.cnblogs.com/yangmingxianshen/p/10102302.html