goreplay~文件输出解析

这个文件本身内容不多,直接看代码:

package main

import (
    "bufio"
    "compress/gzip"
    "errors"
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "runtime/debug"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/buger/goreplay/size"
)

// dateFileNameFuncs maps every placeholder that may appear in an output path
// template to the function producing its replacement text. The time based
// placeholders read the clock at the moment they are expanded; "%r" and "%t"
// are taken from the currentID and payloadType fields of the output.
var dateFileNameFuncs = func() map[string]func(*FileOutput) string {
    // clock returns an expander that formats the current time with layout.
    clock := func(layout string) func(*FileOutput) string {
        return func(*FileOutput) string { return time.Now().Format(layout) }
    }

    return map[string]func(*FileOutput) string{
        "%Y":  clock("2006"),
        "%m":  clock("01"),
        "%d":  clock("02"),
        "%H":  clock("15"),
        "%M":  clock("04"),
        "%S":  clock("05"),
        "%NS": func(*FileOutput) string { return fmt.Sprint(time.Now().Nanosecond()) },
        "%r":  func(out *FileOutput) string { return string(out.currentID) },
        "%t":  func(out *FileOutput) string { return string(out.payloadType) },
    }
}()

// FileOutputConfig holds the settings consumed by FileOutput.
type FileOutputConfig struct {
    // FlushInterval is the pause between two runs of the background flush
    // loop started by NewFileOutput; a zero value is replaced by 100ms there.
    FlushInterval     time.Duration `json:"output-file-flush-interval"`
    // SizeLimit, when greater than zero, makes filename() move on to the next
    // chunk index once the last measured chunk size reaches it.
    SizeLimit         size.Size     `json:"output-file-size-limit"`
    // OutputFileMaxSize is not read through this struct in this file;
    // PluginWrite compares the total written size against
    // Settings.OutputFileConfig.OutputFileMaxSize.
    // NOTE(review): presumably the same value - confirm against the caller.
    OutputFileMaxSize size.Size     `json:"output-file-max-size-limit"`
    // QueueLimit, when greater than zero, makes filename() move on to the next
    // chunk index once that many messages were written to the current file.
    QueueLimit        int           `json:"output-file-queue-limit"`
    // Append disables chunk indexing: filename() returns the expanded path
    // template unchanged.
    Append            bool          `json:"output-file-append"`
    // BufferPath is not referenced in this file.
    // NOTE(review): purpose cannot be determined from here.
    BufferPath        string        `json:"output-file-buffer"`
    // onClose, when set, is called with the file name each time closeLocked
    // closes a file.
    onClose           func(string)
}

// FileOutput is an output plugin that writes messages to files whose names
// are derived from a path template.
type FileOutput struct {
    // The embedded lock is taken by the methods below before they touch the
    // mutable fields.
    sync.RWMutex
    // pathTemplate is the path given to NewFileOutput; its placeholders are
    // expanded through dateFileNameFuncs.
    pathTemplate   string
    // currentName is the file name most recently computed by updateName.
    currentName    string
    // file is the file currently being written; nil until the first write.
    file           *os.File
    // QueueLength counts the messages written since the current file was
    // opened.
    QueueLength    int
    // chunkSize is the size reported by file.Stat() during the last flush.
    chunkSize      int
    // writer wraps file: a *gzip.Writer for names ending in ".gz", otherwise
    // a *bufio.Writer.
    writer         io.Writer
    // requestPerFile is true when the path template contains "%r".
    requestPerFile bool
    // currentID and payloadType are taken from payloadMeta(msg.Meta) in
    // PluginWrite and feed the "%r" and "%t" placeholders.
    currentID      []byte
    payloadType    []byte
    // closed is set by closeLocked and polled by the background flush loop.
    closed         bool
    // totalFileSize accumulates the byte counts returned by PluginWrite's
    // writes; it is never reset in this file.
    totalFileSize  size.Size

    config *FileOutputConfig
}

// NewFileOutput builds a FileOutput for the given path template and config.
//
// It computes the initial file name, records whether the template contains
// the "%r" placeholder, replaces a zero config.FlushInterval with 100ms (the
// passed config is modified in place) and starts a goroutine that, once per
// flush interval, recomputes the file name and flushes buffered data. The
// goroutine returns as soon as IsClosed reports true.
func NewFileOutput(pathTemplate string, config *FileOutputConfig) *FileOutput {
    out := &FileOutput{
        pathTemplate: pathTemplate,
        config:       config,
    }
    out.updateName()
    out.requestPerFile = strings.Contains(pathTemplate, "%r")

    if config.FlushInterval == 0 {
        config.FlushInterval = 100 * time.Millisecond
    }

    go func() {
        for {
            time.Sleep(config.FlushInterval)
            if out.IsClosed() {
                return
            }
            out.updateName()
            out.flush()
        }
    }()

    return out
}

// getFileIndex extracts the numeric chunk index from a file name of the form
// "<base>_<index><ext>". It returns -1 when the part before the extension has
// no underscore or when the text after the last underscore is not an integer.
func getFileIndex(name string) int {
    // Drop the extension; filepath.Ext always returns a suffix of name.
    stem := name[:len(name)-len(filepath.Ext(name))]

    pos := strings.LastIndex(stem, "_")
    if pos < 0 {
        return -1
    }

    index, err := strconv.Atoi(stem[pos+1:])
    if err != nil {
        return -1
    }
    return index
}

// setFileIndex returns name with the chunk index idx placed between the base
// name and the extension ("<base>_<idx><ext>"). If the base already ends in
// "_<integer>", that old index is replaced rather than appended to.
func setFileIndex(name string, idx int) string {
    suffix := filepath.Ext(name)
    stem := strings.TrimSuffix(name, suffix)

    // Strip a trailing "_<integer>" left over from a previous index.
    if pos := strings.LastIndex(stem, "_"); pos >= 0 {
        _, err := strconv.Atoi(stem[pos+1:])
        if err == nil {
            stem = stem[:pos]
        }
    }

    return stem + "_" + strconv.Itoa(idx) + suffix
}

// withoutIndex returns the part of s that precedes its last underscore, or s
// itself when it contains no underscore.
func withoutIndex(s string) string {
    cut := strings.LastIndexByte(s, '_')
    if cut < 0 {
        return s
    }
    return s[:cut]
}

// sortByFileIndex implements sort.Interface for file names. Names that share
// the same prefix up to their last underscore are ordered by numeric chunk
// index; all other pairs are ordered as plain strings.
type sortByFileIndex []string

// Len reports the number of file names.
func (s sortByFileIndex) Len() int { return len(s) }

// Swap exchanges the names at positions i and j.
func (s sortByFileIndex) Swap(i, j int) { s[j], s[i] = s[i], s[j] }

// Less reports whether the name at position i sorts before the one at j.
func (s sortByFileIndex) Less(i, j int) bool {
    left, right := s[i], s[j]

    if withoutIndex(left) != withoutIndex(right) {
        return left < right
    }

    // Same prefix: compare numerically so that index 2 sorts before 10.
    return getFileIndex(left) < getFileIndex(right)
}

// filename expands the placeholders of the path template and, unless the
// Append option is set, attaches a chunk index to the result.
//
// The index is derived from the files already on disk that match the expanded
// path: with no match the index is 0; otherwise the newest match (according
// to sortByFileIndex) is reused, and its index is incremented when a new
// chunk is due, i.e. when no name was computed yet or the queue or size limit
// of the current chunk has been reached. The read lock is held throughout.
func (o *FileOutput) filename() string {
    o.RLock()
    defer o.RUnlock()

    expanded := o.pathTemplate
    for placeholder, render := range dateFileNameFuncs {
        expanded = strings.Replace(expanded, placeholder, render(o), -1)
    }

    if o.config.Append {
        return expanded
    }

    queueFull := o.config.QueueLimit > 0 && o.QueueLength >= o.config.QueueLimit
    sizeReached := o.config.SizeLimit > 0 && o.chunkSize >= int(o.config.SizeLimit)
    startNewChunk := o.currentName == "" || queueFull || sizeReached

    suffix := filepath.Ext(expanded)
    stem := strings.TrimSuffix(expanded, suffix)

    existing, err := filepath.Glob(stem + "*" + suffix)
    if err != nil {
        // The pattern could not be evaluated; fall back to the plain path.
        return expanded
    }
    if len(existing) == 0 {
        return setFileIndex(expanded, 0)
    }

    sort.Sort(sortByFileIndex(existing))
    newest := existing[len(existing)-1]

    index := getFileIndex(newest)
    switch {
    case index == -1:
        // The newest match carries no index yet; start counting at zero.
        index = 0
    case startNewChunk:
        index++
    }

    return setFileIndex(newest, index)
}

// updateName recomputes the output file name and stores its cleaned form in
// currentName. The name is computed before the write lock is taken because
// filename() acquires the read lock itself.
func (o *FileOutput) updateName() {
    next := filepath.Clean(o.filename())

    o.Lock()
    defer o.Unlock()
    o.currentName = next
}

// PluginWrite writes msg to the current output file as meta, data and the
// payload separator, in that order. When the computed file name differs from
// the open file (or no file is open yet) the old file is closed and the new
// one is opened first.
//
// It returns the number of bytes handed to the writer together with the first
// write error. Once the total written size reaches
// Settings.OutputFileConfig.OutputFileMaxSize (when that limit is positive)
// an error is returned even though the message itself was written. If the
// output file cannot be opened the process is terminated via log.Fatalf.
func (o *FileOutput) PluginWrite(msg *Message) (n int, err error) {
    if o.requestPerFile {
        o.Lock()
        meta := payloadMeta(msg.Meta)
        o.currentID = meta[1]
        o.payloadType = meta[0]
        o.Unlock()
    }

    o.updateName()
    o.Lock()
    defer o.Unlock()

    if o.file == nil || o.currentName != o.file.Name() {
        o.closeLocked()

        // filename() may resolve to a file that already exists (the newest
        // chunk found on disk, or the fixed path in Append mode), so the file
        // is opened for appending; truncating here would destroy that data.
        file, openErr := os.OpenFile(o.currentName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0660)
        if openErr != nil {
            // Check before touching the file: it is nil on failure.
            log.Fatalf("[OUTPUT-FILE] Cannot open file %q. Error: %s", o.currentName, openErr)
        }
        o.file = file
        _ = o.file.Sync() // best effort, nothing has been written yet

        if strings.HasSuffix(o.currentName, ".gz") {
            o.writer = gzip.NewWriter(o.file)
        } else {
            o.writer = bufio.NewWriter(o.file)
        }

        // closeLocked marks the output as closed. A freshly opened file means
        // the output is live again; without clearing the flag the background
        // flush loop would stop after the first write and chunkSize would
        // never be refreshed.
        o.closed = false
        o.QueueLength = 0
    }

    // Stop at the first failing write so its error is not overwritten by a
    // later, possibly successful, one.
    n, err = o.writer.Write(msg.Meta)
    if err == nil {
        var nn int
        nn, err = o.writer.Write(msg.Data)
        n += nn
    }
    if err == nil {
        var nn int
        nn, err = o.writer.Write(payloadSeparatorAsBytes)
        n += nn
    }

    o.totalFileSize += size.Size(n)
    if err != nil {
        return n, err
    }
    o.QueueLength++

    if Settings.OutputFileConfig.OutputFileMaxSize > 0 && o.totalFileSize >= Settings.OutputFileConfig.OutputFileMaxSize {
        return n, errors.New("File output reached size limit")
    }

    return n, nil
}

// flush pushes buffered data of the current writer to the file and records
// the resulting on-disk size in chunkSize. It is a no-op while no file is
// open. Panics are recovered and reported through Debug so that the caller
// keeps running.
func (o *FileOutput) flush() {
    // Don't exit on panic
    defer func() {
        if r := recover(); r != nil {
            Debug(0, "[OUTPUT-FILE] PANIC while file flush: ", r, o, string(debug.Stack()))
        }
    }()

    o.Lock()
    defer o.Unlock()

    if o.file == nil {
        return
    }

    // Dispatch on the writer's actual type instead of the suffix of
    // currentName: the name may already have been recomputed for the next
    // file while writer still belongs to the one that is open.
    var err error
    switch w := o.writer.(type) {
    case *gzip.Writer:
        err = w.Flush()
    case *bufio.Writer:
        err = w.Flush()
    }
    if err != nil {
        Debug(0, "[OUTPUT-FILE] error flushing file", err)
    }

    stat, err := o.file.Stat()
    if err != nil {
        Debug(0, "[OUTPUT-FILE] error accessing file size", err)
        return
    }
    o.chunkSize = int(stat.Size())
}

// String describes the output by the name of its open file. Before the first
// file has been opened it falls back to the path template, because calling
// Name on a nil *os.File would panic.
func (o *FileOutput) String() string {
    if o.file == nil {
        return "File output: " + o.pathTemplate
    }
    return "File output: " + o.file.Name()
}

// closeLocked finishes the current file, if there is one: the writer is
// closed (gzip) or flushed (bufio), the file is closed and the onClose
// callback, when configured, is invoked with the file name. The output is
// then marked as closed. The caller must hold the write lock.
//
// It returns the first error reported while finishing the writer or closing
// the file; every step still runs when an earlier one fails. Closing a file
// that was already closed is not treated as an error.
func (o *FileOutput) closeLocked() error {
    var firstErr error

    if o.file != nil {
        // Dispatch on the writer's actual type: currentName may already
        // refer to the next file while writer belongs to the open one.
        switch w := o.writer.(type) {
        case *gzip.Writer:
            firstErr = w.Close()
        case *bufio.Writer:
            firstErr = w.Flush()
        }

        if err := o.file.Close(); err != nil && !errors.Is(err, os.ErrClosed) && firstErr == nil {
            firstErr = err
        }

        if o.config.onClose != nil {
            o.config.onClose(o.file.Name())
        }
    }

    o.closed = true
    return firstErr
}

// Close takes the write lock and finishes the file currently being written,
// returning whatever closeLocked reports.
func (o *FileOutput) Close() error {
    o.Lock()
    defer o.Unlock()

    err := o.closeLocked()
    return err
}

// IsClosed reports whether the output has been marked as closed. The flag is
// read while holding the lock.
func (o *FileOutput) IsClosed() bool {
    o.Lock()
    state := o.closed
    o.Unlock()

    return state
}
原文地址:https://www.cnblogs.com/it-worker365/p/15113819.html