goreplay~http输出工作线程

http输出工作线程

NewHTTPOutput 中对工作线程相关配置(WorkersMin、WorkersMax、WorkerTimeout)的默认值处理:

    if config.WorkersMin <= 0 {
        config.WorkersMin = 1
    }
    if config.WorkersMin > 1000 {
        config.WorkersMin = 1000
    }
    if config.WorkersMax <= 0 {
        config.WorkersMax = math.MaxInt32 // idealy so large
    }
    if config.WorkersMax < config.WorkersMin {
        config.WorkersMax = config.WorkersMin
    }
    if config.WorkerTimeout <= 0 {
config.WorkerTimeout = time.Second * 2
}

配置完成后创建 HTTP 客户端,然后按 WorkersMin 启动相应数量的工作 goroutine:

    // Create the client used by the workers before any of them is started.
    o.client = NewHTTPClient(o.config)

    // Account for the initial pool up front, then launch one goroutine per
    // minimum worker.
    initialWorkers := o.config.WorkersMin
    o.activeWorkers += int32(initialWorkers)
    for started := 0; started < initialWorkers; started++ {
        go o.startWorker()
    }

每个工作 goroutine 的主循环(从队列中取出消息并发送,收到停止信号后退出):

// startWorker is the body of a single worker goroutine. It forwards every
// message received from o.queue to sendRequest using the output's client, and
// returns as soon as a value can be received from o.stopWorker.
func (o *HTTPOutput) startWorker() {
    for {
        select {
        case payload := <-o.queue:
            o.sendRequest(o.client, payload)
        case <-o.stopWorker:
            return
        }
    }
}

执行发送

// sendRequest replays one captured message through client.
//
// Messages whose metadata is rejected by isRequestPayload are skipped. A send
// error is reported through Debug and otherwise ignored; a nil response ends
// processing silently. When response tracking is enabled the response is
// pushed onto o.responses together with the payload ID, the start time and the
// elapsed time (both in nanoseconds). If an Elasticsearch sink is configured
// it is handed the request, the response and both timestamps.
func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
    if !isRequestPayload(msg.Meta) {
        return
    }
    id := payloadID(msg.Meta)

    sentAt := time.Now()
    answer, sendErr := client.Send(msg.Data)
    doneAt := time.Now()

    switch {
    case sendErr != nil:
        Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", sendErr))
        return
    case answer == nil:
        // Nothing came back, so there is nothing to track or analyze.
        return
    }

    if o.config.TrackResponses {
        startNanos := sentAt.UnixNano()
        elapsedNanos := doneAt.UnixNano() - startNanos
        o.responses <- &response{answer, id, startNanos, elapsedNanos}
    }

    if o.elasticSearch != nil {
        o.elasticSearch.ResponseAnalyze(msg.Data, answer, sentAt, doneAt)
    }
}

发送细节,各种配置生效点

// Send parses data as a raw HTTP/1.x request, points it at the configured
// output URL and executes it with c.Client.
//
// It returns the dumped response (status line, headers and body) when
// c.config.TrackResponses is set, and a nil slice otherwise. CONNECT requests
// are not sent at all and yield (nil, nil). Any parse, transport or dump error
// is returned as is.
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
    req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
    if err != nil {
        return nil, err
    }
    // CONNECT requests are never replayed.
    if req.Method == http.MethodConnect {
        return nil, nil
    }

    if !c.config.OriginalHost {
        req.Host = c.config.url.Host
    }

    // fix #862: when the configured URL carries no path and no query, keep the
    // captured request's own path/query and only override scheme and host;
    // otherwise the configured URL replaces the request URL entirely.
    if c.config.url.Path == "" && c.config.url.RawQuery == "" {
        req.URL.Scheme = c.config.url.Scheme
        req.URL.Host = c.config.url.Host
    } else {
        req.URL = c.config.url
    }

    // force connection to not be closed, which can affect the global client
    req.Close = false
    // it's an error if this is not equal to empty string
    req.RequestURI = ""

    resp, err := c.Client.Do(req)
    if err != nil {
        return nil, err
    }
    // Close the body on every path. DumpResponse only closes it after a
    // successful read, so without this a failed dump would leak the body (and
    // the connection behind it). The receiver is evaluated here, so the
    // original body is closed even though DumpResponse swaps resp.Body.
    defer resp.Body.Close()

    if c.config.TrackResponses {
        return httputil.DumpResponse(resp, true)
    }
    return nil, nil
}

master 工作 goroutine:每隔 WorkerTimeout 检查一次,在队列为空时回收超出 WorkersMin 的多余 worker:

// workerMaster shrinks the worker pool back toward config.WorkersMin.
//
// Every config.WorkerTimeout it checks the pool: while more than WorkersMin
// workers are active and the queue is empty, it signals one worker to stop via
// o.stopWorker and decrements o.activeWorkers. It returns once a value can be
// received from o.stop, both while waiting for the next tick and while waiting
// for a worker to pick up a stop signal.
func (o *HTTPOutput) workerMaster() {
    timer := time.NewTimer(o.config.WorkerTimeout)
    defer func() {
        // recover from panics caused by trying to send in
        // a closed chan(o.stopWorker)
        recover()
    }()
    defer timer.Stop()

    for {
        // Wait for the next tick, but stay responsive to shutdown instead of
        // blocking on the timer alone.
        select {
        case <-o.stop:
            return
        case <-timer.C:
        }

        // rollback workers
        for atomic.LoadInt32(&o.activeWorkers) > int32(o.config.WorkersMin) && len(o.queue) < 1 {
            select {
            case <-o.stop:
                // No worker may be free to take the signal; do not hang here
                // once shutdown was requested.
                return
            case o.stopWorker <- struct{}{}:
                // close one worker
                atomic.AddInt32(&o.activeWorkers, -1)
            }
        }

        // The timer has fired and its channel was drained above, so Reset is
        // safe without an extra Stop/drain.
        timer.Reset(o.config.WorkerTimeout)
    }
}
原文地址:https://www.cnblogs.com/it-worker365/p/15113764.html