channel 并发编程和超时控制

golang中,通过关键字go可以创建goroutine,一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数

go创建goroutine与channel返回数据

为一个普通函数创建goroutine的写法:

go 函数名(参数列表)

使用go函数是,被调用函数的返回值会被忽略,因此如果需要在goroutine中返回数据,往往需要结合channel使用,通过通道将数据从goroutine中作为返回值传出

例子:

package main

import (
	"fmt"
	"time"
)

func task(task_id int, sleeptime int, resCh chan string) {
	time.Sleep(time.Second * time.Duration(sleeptime))
	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
	return
}

func main() {
	startTime := time.Now()
	fmt.Println("子goroutine创建:")
	// 假设有10个任务需要执行,每个任务需要执行,并发执行
	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
	resCh := make(chan string, len(inputs)) // 任务数量

	for i, sleeptime := range inputs {
		go task(i, sleeptime, resCh)
	}

	for range inputs {
		fmt.Println(<-resCh)
	}

	endTime := time.Now()
	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
}

输出结果:

子goroutine创建:
任务序号:4 ,执行完成
任务序号:3 ,执行完成
任务序号:0 ,执行完成
任务序号:9 ,执行完成
任务序号:5 ,执行完成
任务序号:7 ,执行完成
任务序号:1 ,执行完成
任务序号:6 ,执行完成
任务序号:2 ,执行完成
任务序号:8 ,执行完成
子goroutine运行结束,耗时 5.014132s. 任务数量: 10

这里有10个任务,同时开启10个goroutine执行,耗时为5秒(受限为最长耗时的一个任务的执行时间)

channel控制并发度(blocking特性)

但是这里可以看到实际开发中存在一个问题,假如任务数量为10000或者更多,同时开启1w个goroutine是不太合理的,可能会占用过多资源,因此我们需要控制并发度。
这里可以利用channel的缓冲机制,当缓冲满了,goroutine就会自动阻塞,直到channel数据可以被读取为止。

package main

import (
	"fmt"
	"time"
)

func task(task_id int, sleeptime int, resCh chan string) {
	time.Sleep(time.Second * time.Duration(sleeptime))
	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
	return
}

func main() {
	startTime := time.Now()
	fmt.Println("子goroutine创建:")
	// 假设有10个任务需要执行,每个任务需要执行,并发执行
	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
	resCh := make(chan string, len(inputs))

	limitCh := make(chan bool, 2) // 并发度为2
	limitFunc := func(limitCh chan bool, task_id int, sleeptime int, resCh chan string) {
		task(task_id, sleeptime, resCh)
		<-limitCh
	}

	// 限制并发度的关键在于,开启多个任务时,往channel(limitCh)缓冲写入数据,任务执行(子goroutine)完成时读出数据,当channel缓冲满时不能读出就会阻塞任务的执行
	for i, sleeptime := range inputs {
		limitCh <- true
		go limitFunc(limitCh, i, sleeptime, resCh)
	}

	for range inputs {
		fmt.Println(<-resCh)
	}

	endTime := time.Now()
	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
}

当limitCh的缓冲大小设置为2时,结果为:子goroutine运行结束,耗时 14.0578327s. 任务数量: 10
当limitCh的缓冲大小设置为1时,结果为:子goroutine运行结束,耗时 26.0864911s. 任务数量: 10

使用select控制超时

package main

import (
	"fmt"
	"time"
)

func Run(task_id int, sleeptime int, resCh chan string) {
	runCh := make(chan string)
	go task(task_id, sleeptime, runCh)
	select {
	case re := <-runCh:
		// 任务执行正常
		resCh <- re
	case <-time.After(3 * time.Second): // 这里定义3秒超时,也就是说不管任务的sleeptime多长,最久3秒返回,避免长时间阻塞
		re := fmt.Sprintf("任务序号:%d ,执行超时", task_id)
		resCh <- re
	}
}

func task(task_id int, sleeptime int, resCh chan string) {
	time.Sleep(time.Second * time.Duration(sleeptime))
	//fmt.Println("正在执行任务:", task_id)
	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
	return
}

func main() {
	startTime := time.Now()
	fmt.Println("子goroutine创建:")
	// 假设有10个任务需要执行,每个任务需要执行,并发执行
	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
	resCh := make(chan string, len(inputs))

	limitCh := make(chan bool, 2) // 并发度为2
	limitFunc := func(limitCh chan bool, task_id int, sleeptime int, resCh chan string) {
		Run(task_id, sleeptime, resCh)
		<-limitCh
	}

	// 限制并发度的关键在于,开启多个任务时,往channel(limitCh)缓冲写入数据,任务执行(子goroutine)完成时读出数据,当channel缓冲满时不能读出就会阻塞任务的执行
	for i, sleeptime := range inputs {
		limitCh <- true
		go limitFunc(limitCh, i, sleeptime, resCh)
	}

	for range inputs {
		fmt.Println(<-resCh)
	}

	endTime := time.Now()
	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
}

执行结果:
子goroutine创建:
任务序号:0 ,执行完成
任务序号:1 ,执行超时
任务序号:3 ,执行完成
任务序号:2 ,执行超时
任务序号:4 ,执行完成
任务序号:5 ,执行完成
任务序号:6 ,执行完成
任务序号:7 ,执行完成
任务序号:9 ,执行完成
任务序号:8 ,执行超时
子goroutine运行结束,耗时 12.0359419s. 任务数量: 10
原文地址:https://www.cnblogs.com/chq3272991/p/15390772.html