goreplay~http输出队列

对应的命令行参数:--output-http-queue-len(队列长度;未设置或 <= 0 时默认为 1000)

output_http.go 各种初始化

// NewHTTPOutput constructs an HTTP output for address.
//
// It parses address into config.url (terminating the process via log.Fatal
// when parsing fails), replaces unset or out-of-range config fields with
// defaults, allocates the message queue, and launches config.WorkersMin
// worker goroutines plus the workerMaster goroutine before returning.
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
    parsed, err := url.Parse(address)
    config.url = parsed
    if err != nil {
        log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
    }
    // A scheme-less address is treated as plain HTTP.
    if config.url.Scheme == "" {
        config.url.Scheme = "http"
    }
    config.rawURL = config.url.String()

    // Timeouts below 100ms are replaced with one second.
    if config.Timeout < 100*time.Millisecond {
        config.Timeout = time.Second
    }
    if config.BufferSize <= 0 {
        config.BufferSize = 100 * 1024 // 100kb
    }

    // WorkersMin is clamped to the range [1, 1000].
    switch {
    case config.WorkersMin <= 0:
        config.WorkersMin = 1
    case config.WorkersMin > 1000:
        config.WorkersMin = 1000
    }

    // WorkersMax defaults to a very large value and is never below WorkersMin.
    switch {
    case config.WorkersMax <= 0:
        config.WorkersMax = math.MaxInt32 // ideally so large
    case config.WorkersMax < config.WorkersMin:
        config.WorkersMax = config.WorkersMin
    }

    if config.QueueLen <= 0 {
        config.QueueLen = 1000
    }
    if config.RedirectLimit < 0 {
        config.RedirectLimit = 0
    }
    if config.WorkerTimeout <= 0 {
        config.WorkerTimeout = 2 * time.Second
    }

    out := &HTTPOutput{
        config: config,
        stop:   make(chan bool),
    }
    if config.Stats {
        out.queueStats = NewGorStat("output_http", config.StatsMs)
    }

    // The message queue, and the optional response channel, are both
    // buffered to QueueLen entries.
    out.queue = make(chan *Message, config.QueueLen)
    if config.TrackResponses {
        out.responses = make(chan *response, config.QueueLen)
    }
    // it should not be buffered to avoid races
    out.stopWorker = make(chan struct{})

    if config.ElasticSearch != "" {
        out.elasticSearch = new(ESPlugin)
        out.elasticSearch.Init(config.ElasticSearch)
    }
    out.client = NewHTTPClient(config)

    // Account for the initial workers before any of them is started.
    out.activeWorkers += int32(config.WorkersMin)
    for started := 0; started < config.WorkersMin; started++ {
        go out.startWorker()
    }
    go out.workerMaster()
    return out
}

队列用在哪儿:worker 在 startWorker 中从队列取出消息并发送,PluginWrite 负责把消息写入队列

// startWorker runs a worker loop: each message received from o.queue is sent
// with o.sendRequest using the shared client, and the loop exits as soon as a
// value is received from o.stopWorker.
func (o *HTTPOutput) startWorker() {
    for {
        select {
        case next := <-o.queue:
            o.sendRequest(o.client, next)
        case <-o.stopWorker:
            return
        }
    }
}

// PluginWrite writes message to this plugin.
//
// Messages whose meta is not a request payload are not queued; the call
// reports len(msg.Data) and returns. Request payloads are pushed onto o.queue,
// blocking until there is room or until o.stop is signalled, in which case
// (0, ErrorStopped) is returned. If the queue is non-empty after the push, one
// additional worker is started, provided the number of active workers is
// below config.WorkersMax.
//
// On success it returns len(msg.Data) + len(msg.Meta).
func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
    if !isRequestPayload(msg.Meta) {
        return len(msg.Data), nil
    }

    select {
    case <-o.stop:
        return 0, ErrorStopped
    case o.queue <- msg:
    }

    if o.config.Stats {
        o.queueStats.Write(len(o.queue))
    }
    if len(o.queue) > 0 {
        // try to start a new worker to serve
        //
        // Reserve the worker slot with compare-and-swap before spawning. A
        // separate load followed by an add lets concurrent writers all pass
        // the limit check and push activeWorkers above WorkersMax.
        limit := int32(o.config.WorkersMax)
        for {
            active := atomic.LoadInt32(&o.activeWorkers)
            if active >= limit {
                break
            }
            if atomic.CompareAndSwapInt32(&o.activeWorkers, active, active+1) {
                go o.startWorker()
                break
            }
        }
    }
    return len(msg.Data) + len(msg.Meta), nil
}
原文地址:https://www.cnblogs.com/it-worker365/p/15114622.html