sql 数据定时发送webhook golang 服务

目的很简单,主要是基于cron表达式定时获取sql 数据库数据(sql server,mysql,pg,clickhouse)同时通过webhook 发送到外部接口

几个需求说明

  • 应该基于配置管理,而不是代码写死的
  • 支持多数据库同时运行(减少运行的实例)
  • 支持sql 数据的处理(对于不用webhook 的数据可能不一样,我们需要处理下)
  • job 支持灵活的cron 表达式
  • 应该轻量,简单,容易使用

设计

  • 简单说明
    参考了一个sql2slack 的服务,基于golang 编写,使用hcl 进行配置管理,同时基于js 引擎处理数据,同时为了方便数据处理
    提供了内置underscore ,对于cron 的处理基于golang 版本的cron表达式引擎 ,一些改进:基于hcl v2 版本,支持多js 引擎
    (otto以及goja 基于配置指定),调整webhook 消息的发送,支持发送任意消息,同时调整cron支持秒的处理
  • job 格式说明
    基于hcl ,Name 为label,实际上后边可以调整下,将job 扩展为webhook以及db 模式的,
 
type Job struct {
  Name            string        `hcl:",label"`
  Driver          string        `hcl:"driver"`
  DSN             string        `hcl:"dsn"`
  Query           string        `hcl:"query"`
  Webhook         string        `hcl:"webhook"`
  Schedule        string        `hcl:"schedule"`
  MessageString   string        `hcl:"message"`
  MessageCompiled executor.JSVM `hcl:"-"`
  Conn            *sqlx.DB      `hcl:"-"`
  EngineName      string        `hcl:"jsengine"`
  JSVM            string        `hcl:"-"`
  Stmnt           *sqlx.Stmt    `hcl:"-"`
}

参考hcl配置

job tst {
    webhook = "http://127.0.0.1:4195"
    driver = "mysql"
    dsn = "demo:demo@tcp(127.0.0.1:3306)/demo"
    jsengine = "otto"
    query = <<SQL
        SELECT users.* FROM users
    SQL
    schedule = "* * * * * *"
    message = <<JS
        if ( $rows.length < 1 ) {
            return
        }
        log("this is a demo")
        var msg =  "";
         _.chain($rows).pluck('name').each(function(name){
            msg += name+"--------demo--from otto----";
        })
         var info = {
            msgtype: "text",
            text: {
                content: msg
            }
        }
        log(JSON.stringify(info))
        send(JSON.stringify(info))
    JS
}
  • 代码结构
├── Dockerfile
├── Makefile
├── README.md
├── cmd
│   ├── cli
│   │   ├── Dockerfile
│   │   ├── Makefile
│   │   ├── README.md
│   │   └── main.go
│   └── server
│       ├── Dockerfile
│       ├── Makefile
│       ├── README.md
│       └── main.go
├── demo.hcl
├── demo2.hcl
├── docker-compose.yaml
├── go.mod
├── go.sum
├── pkg
│   ├── agent
│   ├── buildinfo
│   │   └── version.go
│   ├── commands
│   │   ├── cli.go
│   │   └── server.go
│   ├── executor
│   │   └── jsengine.go
│   ├── jobs
│   │   └── job.go
│   ├── npm
│   │   └── bindata.go
│   ├── storage
│   └── webhooks
├── underscore-min.js
└── webhook.yaml
  • 代码说明
    核心是 jsengine.go以及job.go,jsengine.go 包含了js 引擎的处理,job.go 主要是对于hcl 配置的解析以及cron 的处理
    job.go
    为了方便使用js engine 暴露了log $rows 以及send 发送,可以扩展,同时解析job
 
package jobs
import (
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "path/filepath"
    "github.com/dop251/goja"
    "github.com/go-resty/resty/v2"
    "github.com/hashicorp/hcl/v2/hclsimple"
    "github.com/jmoiron/sqlx"
    "github.com/robertkrimen/otto"
    "github.com/robfig/cron/v3"
    "github.com/rongfengliang/sql-server-exporter/pkg/executor"
)
// Job is one type for sql data fetch
type Job struct {
    Name            string        `hcl:",label"`
    Driver          string        `hcl:"driver"`
    DSN             string        `hcl:"dsn"`
    Query           string        `hcl:"query"`
    Webhook         string        `hcl:"webhook"`
    Schedule        string        `hcl:"schedule"`
    MessageString   string        `hcl:"message"`
    MessageCompiled executor.JSVM `hcl:"-"`
    Conn            *sqlx.DB      `hcl:"-"`
    EngineName      string        `hcl:"jsengine"`
    JSVM            string        `hcl:"-"`
    Stmnt           *sqlx.Stmt    `hcl:"-"`
}
// ParseJobs  parseJobs
func ParseJobs(jobsdir string) (map[string]*Job, *cron.Cron, error) {
    var cronhub *cron.Cron = cron.New(cron.WithChain(
        cron.SkipIfStillRunning(cron.DefaultLogger),
        cron.Recover(cron.DefaultLogger),
    ), cron.WithParser(cron.NewParser(
        cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
    )))
    files, err := filepath.Glob(filepath.Join(jobsdir, "*.hcl"))
    if err != nil {
        return nil, nil, err
    }
    result := map[string]*Job{}
    for _, filename := range files {
        var fileJobs struct {
            Jobs []*Job `hcl:"job,block"`
        }
        if err != nil {
            return nil, nil, err
        }
        err := hclsimple.DecodeFile(filename, nil, &fileJobs)
        if err != nil {
            return nil, nil, errors.New("#hcl: " + err.Error())
        }
        for _, job := range fileJobs.Jobs {
            job.MessageCompiled, err = NewJSVM(job.EngineName, job.Name, fmt.Sprintf("(function(){%s})()", job.MessageString))
            if err != nil {
                return nil, nil, errors.New("#javascript: " + err.Error())
            }
            job.Conn, err = sqlx.Connect(job.Driver, job.DSN)
            if err != nil {
                return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
            }
            job.Stmnt, err = job.Conn.Preparex(job.Query)
            if err != nil {
                return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
            }
            if job.Webhook == "" {
                return nil, nil, errors.New("#webhook:" + job.Name + ": webhook is required")
            }
            if err := (func(job *Job) error {
                _, err := cronhub.AddFunc(job.Schedule, func() {
                    if err := job.Exec(); err != nil {
                        panic(err)
                    }
                })
                return err
            })(job); err != nil {
                return nil, nil, errors.New("#cron:" + job.Name + ":" + err.Error())
            }
            result[job.Name] = job
        }
    }
    return result, cronhub, nil
}
// NewJSVM  NewJSVM
func NewJSVM(engine string, name, src string) (executor.JSVM, error) {
    var jsjvm executor.JSVM
    switch engine {
    case "goja":
        jsjvm = executor.NewGojaExecutor(src, goja.New())
    case "otto":
        vm := otto.New()
        script, err := vm.Compile(name, src)
        if err != nil {
            return nil, err
        }
        jsjvm = executor.NewOttoExecutor(src, vm, script)
    default:
        return nil, errors.New("not supported js engine")
    }
    return jsjvm, nil
}
// Exec job
func (j *Job) Exec() error {
    rows, err := j.Stmnt.Queryx()
    if err != nil {
        return err
    }
    defer rows.Close()
    var res []map[string]interface{}
    for rows.Next() {
        o := map[string]interface{}{}
        if err := rows.MapScan(o); err != nil {
            return err
        }
        for k, v := range o {
            if nil == v {
                continue
            }
            switch v.(type) {
            case []uint8:
                v = []byte(v.([]uint8))
            default:
                v, _ = json.Marshal(v)
            }
            var d interface{}
            if nil == json.Unmarshal(v.([]byte), &d) {
                o[k] = d
            } else {
                o[k] = string(v.([]byte))
            }
        }
        res = append(res, o)
    }
    msg := ""
    ctx := map[string]interface{}{
        "$rows": res,
        "log":   log.Println,
        "send": func(in ...interface{}) {
            msg += fmt.Sprint(in...) + "
"
        },
    }
    if err := j.MessageCompiled.Execute(ctx); err != nil {
        return err
    }
    _, err = resty.New().R().SetDoNotParseResponse(true).SetHeader("content-type", "application/json").SetBody(msg).Post(j.Webhook)
    return err
}
 

jsengine.go
js 引擎的处理使用了JSVM 接口,同时实现了otto 以及goja 的扩展,都包含了underscore 库

 
package executor
import (
    "github.com/dop251/goja"
    "github.com/dop251/goja_nodejs/require"
    "github.com/robertkrimen/otto"
    "github.com/rongfengliang/sql-server-exporter/pkg/npm"
)
// JSVM  js Engine define
type JSVM interface {
    // Execute job command
    Execute(map[string]interface{}) error
}
// GojaExecutor goja js executor engine
type GojaExecutor struct {
    Src string
    VM  *goja.Runtime
}
// Execute goja execute command
func (goja *GojaExecutor) Execute(context map[string]interface{}) error {
    for k, v := range context {
        goja.VM.Set(k, v)
    }
    _, err := goja.VM.RunString(goja.Src)
    return err
}
// NewGojaExecutor  GojaExecutor
func NewGojaExecutor(src string, vm *goja.Runtime) JSVM {
    registry := require.NewRegistryWithLoader(func(path string) ([]byte, error) {
        return npm.Asset(path)
    })
    m, _ := registry.Enable(vm).Require("underscore-min.js")
    vm.Set("_", m)
    return &GojaExecutor{
        Src: src,
        VM:  vm,
    }
}
// OttoExecutor Otto js executor engine
type OttoExecutor struct {
    Src    string
    VM     *otto.Otto
    Script *otto.Script
}
// Execute goja execute command
func (otto *OttoExecutor) Execute(context map[string]interface{}) error {
    for k, v := range context {
        if err := otto.VM.Set(k, v); err != nil {
            return err
        }
    }
    _, err := otto.VM.Run(otto.Script)
    return err
}
// Execute js exec script method with vm
func Execute(jsvm JSVM, context map[string]interface{}) error {
    return jsvm.Execute(context)
}
// NewOttoExecutor  OttoExecutor
func NewOttoExecutor(src string, vm *otto.Otto, script *otto.Script) JSVM {
    return &OttoExecutor{
        Src:    src,
        VM:     vm,
        Script: script,
    }
}
 

server.go
主要是server 端启动的,包含参数的解析以及加载依赖的job 基于urfave/cli/ 提供cli 的处理

 
package commands
import (
    "fmt"
    "log"
    "os"
    "github.com/rongfengliang/sql-server-exporter/pkg/buildinfo"
    "github.com/rongfengliang/sql-server-exporter/pkg/jobs"
    "github.com/urfave/cli/v2"
)
// Server server
type Server struct {
}
// NewServer return one  Server  Instance
func NewServer() *Server {
    return &Server{}
}
// Run run
func (s *Server) Run() {
    // TODos
    // load jobs create scheduler info
    app := cli.NewApp()
    app.Usage = "basic sql server data fetch service"
    app.Flags = []cli.Flag{
        &cli.StringFlag{
            Name:  "jobs-dir",
            Usage: "set job dirs",
            Value: ".",
        },
    }
    app.Commands = []*cli.Command{{
        Name:    "version",
        Aliases: []string{"v"},
        Usage:   "print application version",
        Action: func(c *cli.Context) error {
            fmt.Println(buildinfo.Version)
            return nil
        },
    }, {
        Name:  "start",
        Usage: "start service",
        Action: func(c *cli.Context) error {
            fmt.Println(c.String("jobs-dir"))
            jobdir := c.String("jobs-dir")
            if jobdir != "" {
                loadJobs, cronhub, err := jobs.ParseJobs(jobdir)
                if err != nil {
                    log.Fatal(err.Error())
                }
                for _, v := range loadJobs {
                    log.Println(v.EngineName)
                }
                cronhub.Run()
            }
            return nil
        },
    }}
    err := app.Run(os.Args)
    if err != nil {
        log.Fatal(err)
    }
} 

server 启动入口
主要是提供了加载sql driver 以及调用server.go 的解析


 
package main
import (
    _ "github.com/ClickHouse/clickhouse-go"
    _ "github.com/denisenkom/go-mssqldb"
    _ "github.com/go-sql-driver/mysql"
    _ "github.com/lib/pq"
    _ "github.com/robertkrimen/otto/underscore"
    "github.com/rongfengliang/sql-server-exporter/pkg/commands"
)
func main() {
    // create Server instance
    s := commands.NewServer()
    s.Run()
}

测试

  • 构建
    以及make,可以参考源码
 
make
  • 运行环境准备
    docker-compose.yaml
 
version: "3"
services:
  webhook:
      image: jeffail/benthos
      volumes:
      - "./webhook.yaml:/benthos.yaml"
      ports:
      - "4195:4195"
  mysql:
    image: mysql:5.7.16
    ports:
      - 3306:3306
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
    environment:
      MYSQL_ROOT_PASSWORD: demo
      MYSQL_DATABASE: demo
      MYSQL_USER: demo
      MYSQL_PASSWORD: demo
      TZ: Asia/Shanghai

webhook.yaml

input:
  type: broker
  broker:
    inputs:
      - type: http_server
        http_server:
          path: /
        processors:
          - type: text
            text:
              operator: prepend
              value: "get message: "
output:
  type: stdout
 
 
  • 数据准备
CREATE TABLE `users` (
  `name` varchar(100) ,
  `status` varchar(100)
) ENGINE=InnoDB
INSERT INTO demo.users
(name, status)
VALUES('dalong', '0');
INSERT INTO demo.users
(name, status)
VALUES('demo', '1');
INSERT INTO demo.users
(name, status)
VALUES('rong', '1');
  • 运行效果
./bin/exporter-server start

说明

以上是一个简单的介绍,详细的可以参考github 代码

参考资料

https://github.com/rongfengliang/sql-data-exporter

原文地址:https://www.cnblogs.com/rongfengliang/p/13237049.html