go应用专题:context应用场景

参考:

https://www.jianshu.com/p/6def5063c1eb(context应用场景)

https://zhuanlan.zhihu.com/p/110085652(深入理解context)

context实现server调度

net/http在实现http server时就用到了context, 下面简单分析一下

1、首先Server在开启服务时会创建一个valueCtx,存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx。

func (srv *Server) Serve(l net.Listener) error {
    ...
    var tempDelay time.Duration     // how long to sleep on accept failure
    baseCtx := context.Background() // base is always background, per Issue 16220
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        rw, e := l.Accept()
        ...
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve(ctx)
    }
}

2、建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

func (c *conn) serve(ctx context.Context) {
    c.remoteAddr = c.rwc.RemoteAddr().String()
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
    ...
    ctx, cancelCtx := context.WithCancel(ctx)
    c.cancelCtx = cancelCtx
    defer cancelCtx()
    ...
    for {
        w, err := c.readRequest(ctx)
        ...
        serverHandler{c.server}.ServeHTTP(w, w.req)
        ...
    }
}

3、读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消方法。

func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
    ...
    req, err := readRequest(c.bufr, keepHostHeader)
    ...
    ctx, cancelCtx := context.WithCancel(ctx)
    req.ctx = ctx
    ...
    w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body,
        handlerHeader: make(Header),
        contentLength: -1,
        closeNotifyCh: make(chan bool, 1),

        // We populate these ahead of time so we're not
        // reading from req.Header after their Handler starts
        // and maybe mutates it (Issue 14940)
        wants10KeepAlive: req.wantsHttp10KeepAlive(),
        wantsClose:       req.wantsClose(),
    }
    ...
    return w, nil
}

这样处理的目的主要有以下几点:

一旦请求超时,即可中断当前请求;
在处理构建response过程中如果发生错误,可直接调用response对象的cancelCtx方法结束当前请求;
在处理构建response完成之后,调用response对象的cancelCtx方法结束当前请求。

在整个server处理流程中,使用了一条context链贯穿Server、Connection、Request,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

ValueContext实现请求中间件

package main

import (
    "net/http"
    "context"
)

type FooKey string

var UserName = FooKey("user-name")
var UserId = FooKey("user-id")

func foo(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ctx := context.WithValue(r.Context(), UserId, "1")
        ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
        next(w, r.WithContext(ctx2))
    }
}

func GetUserName(context context.Context) string {
    if ret, ok := context.Value(UserName).(string); ok {
        return ret
    }
    return ""
}

func GetUserId(context context.Context) string {
    if ret, ok := context.Value(UserId).(string); ok {
        return ret
    }
    return ""
}

func test(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte("welcome: "))
    w.Write([]byte(GetUserId(r.Context())))
    w.Write([]byte(" "))
    w.Write([]byte(GetUserName(r.Context())))
}

func main() {
    http.Handle("/", foo(test))
    http.ListenAndServe(":8080", nil)
}
在使用ValueCtx的时候需要注意一点,这里的key不应该设置成为普通的String或者Int类型,为了防止不同的中间件对这个key的覆盖。
最好的情况是每个中间件使用一个自定义的key类型,比如这里的FooKey,而且获取Value的逻辑尽量也抽取出来作为一个函数,放在这个middleware的同包中。这样,就会有效避免不同包设置相同的key的冲突问题了。

TimeoutContext实现超时控制

请求端

uri := "https://httpbin.org/delay/3"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
    log.Fatalf("http.NewRequest() failed with '%s'
", err)
}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
    log.Fatalf("http.DefaultClient.Do() failed with:
'%s'
", err)
}
defer resp.Body.Close()

服务端

package main

import (
    "net/http"
    "time"
)

func test(w http.ResponseWriter, r *http.Request) {
    time.Sleep(20 * time.Second)
    w.Write([]byte("test"))
}


func main() {
    http.HandleFunc("/", test)
    timeoutHandler := http.TimeoutHandler(http.DefaultServeMux, 5 * time.Second, "timeout")
    http.ListenAndServe(":8080", timeoutHandler)
}

CancelContext实现任务调度

package main

import (
    "context"
    "sync"
    "github.com/pkg/errors"
)

func Rpc(ctx context.Context, url string) error {
    result := make(chan int)
    err := make(chan error)

    go func() {
        // 进行RPC调用,并且返回是否成功,成功通过result传递成功信息,错误通过error传递错误信息
        isSuccess := true
        if isSuccess {
            result <- 1
        } else {
            err <- errors.New("some error happen")
        }
    }()

    select {//协程会一直等待,直到推出
        case <- ctx.Done():
            // 其他RPC调用调用失败
            return ctx.Err()
        case e := <- err:
            // 本RPC调用失败,返回错误信息
            return e
        case <- result:
            // 本RPC调用成功,不返回错误信息
            return nil
    }
}


func main() {
    ctx, cancel := context.WithCancel(context.Background()) //核心

    // RPC1调用
    err := Rpc(ctx, "http://rpc_1_url")
    if err != nil {
        return
    }

    wg := sync.WaitGroup{}

    // RPC2调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_2_url")
        if err != nil {
            cancel()//发起去掉调度
        }
    }()

    // RPC3调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_3_url")
        if err != nil {
            cancel()
        }
    }()

    // RPC4调用
    wg.Add(1)
    go func(){
        defer wg.Done()
        err := Rpc(ctx, "http://rpc_4_url")
        if err != nil {
            cancel()
        }
    }()

    wg.Wait()
}
 
原文地址:https://www.cnblogs.com/tkzc2013/p/15184368.html