并发模式-goroutine

1,runner

runner包用于展示如果使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用runner包来终止程序。当开发需要调度后台处理的程序的时候,这种模式

会很有用。这个程序可能会作为cron作业执行,或者在基于定时任务的云环境里执行

D:gocode est unner unner.go

package runner

import (
    "errors"
    "os"
    "os/signal"
    "time"
)

//Runner在给定的超时时间内执行一组任务
//并且在操作系统发送中断信号时候结束这些任务
type Runner struct {
    //interrupt 通道报告从操作系统
    //发送的信号
    interrupt chan os.Signal

    //complete通道报告处理任务已经完成
    complete  chan error

    //timeout报告处理任务已经超时
    timeout <- chan time.Time

    //tasks持有一组以索引顺序依次执行的
    //函数
    tasks []func(int)
}

var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")


/*
返回一个新准备使用的Runner
 */
func New(d time.Duration) *Runner{
    return &Runner{
        //1个长度的通道 为了不阻塞
        interrupt:make(chan os.Signal,1),
        complete:make(chan error),
        timeout:time.After(d),
    }
}

/*
将一个任务附加到Runner上,这个任务是一个接收一个int类型的的id作为
参数的函数
 */
func (r *Runner)Add(tasks ...func(int)){
    r.tasks = append(r.tasks,tasks...)
}

//start 执行所有任务 并监视通道事件
func(r *Runner)Start() error{
    //我们希望接收所有中断信号
    signal.Notify(r.interrupt,os.Interrupt)
    //用不同的协程执行不同的任务
    go func() {
        r.complete <- r.run()
    }()
    select {
    //当任务处理完成时发出的信号
    case  err  := <- r.complete:
        return err
        //当任务处理程序运行超时发出的信号
    case <- r.timeout:
        return ErrTimeout
    }

}

//执行每个已经注册的任务
func(r *Runner) run() error{
    for  id,task := range r.tasks{
        //检测操作系统中的中断信号
        if r.gotInterrupt() {
            return ErrInterrupt
        }
        //执行已经注册的任务、
        task(id)
    }
    return nil
}


//验证是否接收到了中断信号
func(r *Runner) gotInterrupt() bool{
    select {
    case <- r.interrupt:
        //停止接收后续的任何信号
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}

D:gocode estlisten20.go

package main

import (
    "log"
    "os"
    "test.com/runner"
    "time"
)

//规定了必须在3秒内完成任务
const timeout  =  3*time.Second

func main(){
    log.Println("starting work.")

    r := runner.New(timeout)

    r.Add(createTask(),createTask(),createTask(),createTask())

    if  err :=r.Start();err != nil{
        switch err {
        case runner.ErrTimeout:
            log.Println("任务超时")
            os.Exit(1)
        case runner.ErrInterrupt:
            log.Println("强制退出程序")
            os.Exit(2)
        }
    }

    log.Println("运行完成.")

}


/*
返回一个根据id 休眠指定秒数的实例任务
 */
func createTask() func(id int){
    return func(id int){
        log.Printf("processor - task #%d",id)
        time.Sleep(time.Duration(id)*time.Second)
    }
}

2020/04/25 17:24:12 starting work.
2020/04/25 17:24:12 processor - task #0
2020/04/25 17:24:12 processor - task #1
2020/04/25 17:24:13 processor - task #2
2020/04/25 17:24:15 任务超时

2020/04/25 17:24:26 starting work.
2020/04/25 17:24:26 processor - task #0
2020/04/25 17:24:26 processor - task #1
2020/04/25 17:24:27 强制退出程序

原文地址:https://www.cnblogs.com/sunlong88/p/12774084.html