gorutine管理(context)

前言

如有父gorutine在后台启动了1个gorutine(日志采集模块一直taill日志文件的内容是否新增然后发送到kafka),父gorutine突然得知这个日志路径变了。
由于开启的这个日志采集子gorutine是在后台一直执行的......总不能重启线上服务/重新加载配置更不能os.Exit(),那么父gorutine如何让这个一直忙着干活的子gorutine退出呢
我们就可以让子gorutine携带1个Context,子gorutine携带了这个Context,父gorutine就可以通过这个Context达到跟踪、退出子gorutines的目的。
 
Go语言官方为我们提供了这个Context,中文可以称之为“上下文”。 
context官方提供用于帮助我们以一种优雅的方式通过父gorutine追踪、退出无法自行退出的子gorutines内置包

 

问题

当1个子gorutine已经被开启的时候,我们如何在不结束自己(父gorutine)的前提下,结束这个衍生的子gorutine呢?
package main

import (
	"fmt"
	"time"
)

import (
	"sync"
)

var wg sync.WaitGroup


//toSchool 子gorutine
func toSchool() {
	defer wg.Done()
	for {
		fmt.Println("walking~~~~")
		time.Sleep(time.Second * 1)
	}
}

func main() {
	wg.Add(1)
	//当1个子gorutine开启的时候....
	go toSchool()
	fmt.Println("台风来了!")
	//我们如何在不结束自己的前提下结束这些衍生的子gorutine呢?
	wg.Wait()
}

  

自由解决方案

既然是context是golang官方提出的标准方案,相对而言也会有自由解决方案。

1.通过全局变量控制子gorutine退出

package main

import (
	"fmt"
	"time"
	"sync"
)

var wg sync.WaitGroup
var canToSignel=true
func toSchool() {
	defer wg.Done()
	//不断地监测这个信号
	for canToSignel {
		fmt.Println("I'm on the way walking to scholl~~~~")
		time.Sleep(time.Second * 1)
	}
}

func main() {
	wg.Add(1)
	go toSchool()
	time.Sleep(time.Second*10)
	fmt.Println("The typhoon is coming!")
	//修改全局变量(信号)
	canToSignel=false
	wg.Wait()
}

  

  

2.通过全局channel控制gorutine退出

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

//全局channel
var signalChan = make(chan bool, 1)

func child() {
	defer wg.Done()
	for {
		fmt.Println("我是child gorutine")
		time.Sleep(time.Second * 2)
		//检测全局channel中是否有消息推送
		select {
		case <-signalChan:
			return
		default:
		}
	}
}

func main() {
	wg.Add(1)
	go child()
	time.Sleep(time.Second * 10)
	signalChan <- true
	wg.Wait()

}

or

//通过往channel中发送信号的方式
var canToChannel = make(chan bool, 1)

func toSchool() {
	defer wg.Done()
	//不断地监测这个信号
	select {
	case <-canToChannel:
		break
	default:
		fmt.Println("I'm on the way walking to scholl~~~~")
		time.Sleep(time.Second * 1)
	}
}

func main() {
	wg.Add(1)
	go toSchool()
	time.Sleep(time.Second * 10)
	fmt.Println("The typhoon is coming!")
	//提交退出信号
	canToChannel<-true
	wg.Wait()
}

  

退出和追踪衍生gorutines的方式有很多,我们不使用go内置的context也能完全解决这一问题,但是每个程序员使用的解决方案不同的话,就会增加代码的阅读难度。

 

官方解决方案(context)

Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.

Incoming requests to a server should create a Context, and outgoing calls to servers should accept a Context.

The chain of function calls between them must propagate the Context,

optionally replacing it with a derived Context

created using WithCancel, WithDeadline, WithTimeout, or WithValue.

When a Context is canceled, all Contexts derived from it are also canceled.

什么是context?

context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

context可以定义Context类型,专门用来简化 对于处理1个请求的N个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

创建使用context

Background()和TODO() (根节点)

Go内置两个函数:Background()TODO(),这两个函数分别返回一个实现了Context接口的backgroundtodo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。

Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context

TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。(必须要传递1个context类型的参数)

todo本质上也是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context

WithCancel(取消/退出)
主要用在父gorutine,控制子gorutine退出。
cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
NOTE: acquiring the child's lock while holding parent's lock.
特性:一旦 根节点的gorutine执行cancel() 关闭的时候,它的所有后代都将被关闭。
 
ps:
可以把WithCancel这种context和gorutine一起封装到同1个struct里面。your kown for canceling.
//1个具体的日志收集任务(TaillTask)
type TaillTask struct {
	path     string
	topic    string
	instance *tail.Tail
	//exit task
	ctx  context.Context
	exit context.CancelFunc
}

more

package taillog

import (
	"context"
	"github.com/hpcloud/tail"
	"fmt"
	"jd.com/logagent/kafka"
)

//1个具体的日志收集任务(TaillTask)
type TaillTask struct {
	path     string
	topic    string
	instance *tail.Tail
	//exit task
	ctx  context.Context
	exit context.CancelFunc
}

//实例化1个具体的日志收集任务(TaillTask)
func (T *TaillTask) NewTaillTask(path, topic string)(task *TaillTask,err error){
	ctx, cancel := context.WithCancel(context.Background())
	task=&TaillTask{path: path, topic: topic, ctx: ctx, exit: cancel,}
	//taill 文件配置
	config := tail.Config{
		ReOpen:    true,                                 //重新打开文件
		Follow:    true,                                 //跟随文件
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读
		MustExist: false,                                //文件不存在不报错
		Poll:      true,
	}
	//给task任务填充taill(1个具体打开文件的taillobj)
	task.instance, err = tail.TailFile(task.path, config)
	if err != nil {
		fmt.Println("文件打开失败", err)

	}
	//直接去采集日志
	go task.run()
	return
}

//从tailobj中读取日志内容---->kafka topic方法
func (T *TaillTask)run() {
	fmt.Printf("开始收集%s日志
",T.path)
	for {
		select {
		//父进程调用了cancel
		case <-T.ctx.Done():
			fmt.Printf("taill任务%s%s退出了...
",T.topic,T.path)
			return
		case line := <-T.instance.Lines:
			fmt.Printf("从%s文件中获取到内容%s",T.path,line.Text)
			//taill采集到数据-----channel------>kafka 异步
			kafka.SendToChan(T.topic, line.Text)
		}

	}

}

  

 
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func grandChild(ctx context.Context) {
	defer wg.Done()
	for {
		time.Sleep(time.Second*1)
		fmt.Println("grandchild function ")
		select{
		//<-chan struct{}		
		case <-ctx.Done():
			return
		default:		
		}
	}

}

func child(ctx context.Context) {
	defer wg.Done()
	go grandChild(ctx)
	for {
		time.Sleep(time.Second*5)
		fmt.Println("child function ")
		select{
		//<-chan struct{}		
		case <-ctx.Done():
			return
		default:		
		}
	}
}

func main() {
	//定义1个全局的context类型的变量
	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(2)
	go child(ctx)
	time.Sleep(time.Second * 10)
	//退出
	/*
   cancel closes c.done, cancels each of c's children, and, if removeFromParent is true, removes c from its parent's children.
	*/
	cancel()
	wg.Wait()
}

  

案例2

package main

import (
	"context"
	"fmt"
)

func gen(ctx context.Context) <-chan int {
	//定义1个destnation channel
	dest := make(chan int)
	n := 1
	//匿名函数协程不断得给这个dest channel中输入数字
	go func() {
		for {
			select {
			//context结束该匿名函数协程结束
			case <-ctx.Done():
				return
			case dest <- n:
				n++
			}

		}
	}()
	return dest
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	//调用
	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			//main函数结束之后,调用了context取消
			return
		}
	}

}
WithDeadline(绝对超时时间)

当context的截止日过期时, ctx.Done()返回后context deadline exceeded。

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func connectMyql(ctx context.Context) {
	defer wg.Done()
	for {
		time.Sleep(time.Second * 1)
		fmt.Println("我连我连...我连莲莲....")
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			return
		default:
		}

	}
}

func main() {
	//设置context 10秒钟之后过期
	d := time.Now().Add(time.Second * 10)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	/*
	尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
	如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
	*/
	defer cancel()
	wg.Add(1)
	go connectMyql(ctx)
	wg.Wait()

}
WithTimeout(相对超时时间)
和WithDeadline 是一对,过期之后context超时context deadline exceeded
 
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func connectMyql(ctx context.Context) {
	defer wg.Done()
	for {
		time.Sleep(time.Second * 1)
		fmt.Println("我连我连...我连莲莲....")
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			return
		default:
		}

	}
}

func main() {
	//设置context 从当前时间开始10秒钟之后过期(决对时间)
	// d := time.Now().Add(time.Second * 10)
	// ctx, cancel := context.WithDeadline(context.Background(), d)
	//设置相对时间 5秒钟后过期(相对时间)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	/*
		尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践
		如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
	*/
	defer cancel()
	wg.Add(1)
	go connectMyql(ctx)
	wg.Wait()

} 

WithValue(传递值)

WithCancel、WithDeadline、WithTimeout,With 这个verb 就是context可以追溯和退出其衍生子gorutines 的关键所在! 在子gorutine开启时就与生俱来一些元数据!

WithValue可以1个gorutin繁衍了N代子gorutines之后,它的后代gorutines都能with(携带)1个固定值,这样我就可以自上而下追溯这个繁衍链了!

这也是微服务链路追踪的核心思想。

func WithValue(parent Context, key, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

WithValue返回父节点的副本,其中与key关联的值为val。

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。我把它当成session来看!

The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context.

所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。

Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.

WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口

package main

import (
	"context"
	"fmt"
	"sync"

	"time"
)

// context.WithValue

//TraceCode 自定义类型
type TraceCode string

var wg sync.WaitGroup

func worker(ctx context.Context) {
	defer wg.Done()

	key := TraceCode("TRACE_CODE")
	// 在子goroutine中获取trace code,(string)是类型断言!
	traceCode, ok := ctx.Value(key).(string)
	if !ok {
		fmt.Println("invalid trace code")
	}

	for {
		fmt.Printf("worker, trace code:%s
", traceCode)
		// 假设正常连接数据库耗时1秒
		time.Sleep(time.Second * 1)
		// 10秒后自动调用
		select {
		case <-ctx.Done():
			fmt.Println("worker done!")
			return
		default:
		}
	}

}

func main() {
	// 设置1个10秒的超时
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	//在系统的入口中设置trace code传递给后续启动的goroutine实现微服务日志数据聚合
	ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "666")
	wg.Add(1)
	go worker(ctx)
	//主线程等待10秒后
	time.Sleep(time.Second * 10)
	//通知子goroutine结束
	cancel()
	wg.Wait()
	fmt.Println("over")
}

context应用场景(微服务链路追踪)

作为1个微服务架构,微服务之间session不共享的服务端,如何追踪客户端1次request都调用了哪些微服务组件?并且聚合日志。

参考

原文地址:https://www.cnblogs.com/sss4/p/12834302.html