golang异步任务

golang异步任务

1. 文件目录

├── go.mod
├── main.go
└── workpool
    └── workpool.go

2. workpool/workpool.go文件

package workpool

import (
	"errors"
	"fmt"
	"log"
	"runtime"
	"sync"
	"sync/atomic"
)

// ErrCapacity is the error the queue routine reports back to a PostWork
// caller when the queued-work counter has reached the queue capacity.
var ErrCapacity = errors.New("Thread Pool At Capacity")

// poolWork is the request handed to the queue routine: the work to enqueue
// together with the channel on which the outcome is reported.
type poolWork struct {
	// work is the work to be performed.
	work PoolWorker

	// resultChannel is used to inform the poster that the queue operation is complete.
	resultChannel chan error
}

// WorkPool implements a work pool with the specified concurrency level and queue capacity.
type WorkPool struct {
	// shutdownQueueChannel is used to shut down the queue routine.
	shutdownQueueChannel chan string

	// shutdownWorkChannel is closed to shut down the work routines.
	shutdownWorkChannel chan struct{}

	// shutdownWaitGroup is waited on while the work routines shut down.
	shutdownWaitGroup sync.WaitGroup

	// queueChannel carries posted work to the queue routine.
	queueChannel chan poolWork

	// workChannel carries queued work to the work routines.
	workChannel chan PoolWorker

	// queuedWork is the number of work items queued; updated atomically.
	queuedWork int32

	// activeRoutines is the number of routines performing work; updated atomically.
	activeRoutines int32

	// queueCapacity is the maximum number of items that can be stored in the queue.
	queueCapacity int32
}

// PoolWorker must be implemented by any value that is posted to the pool as
// work. DoWork is invoked by a work routine, which passes in the integer id
// it was started with.
type PoolWorker interface {
	DoWork(workRoutine int)
}

// init configures the standard logger for the package: every entry is
// prefixed with "TRACE: " and carries the date, the time and the short
// file name with line number of the logging call.
func init() {
	const traceFlags = log.Ldate | log.Ltime | log.Lshortfile

	log.SetFlags(traceFlags)
	log.SetPrefix("TRACE: ")
}

// New builds a WorkPool, starts numberOfRoutines work routines plus the
// single queue routine, and returns the running pool.
//
// queueCapacity is used both as the buffer size of the work channel and as
// the limit the queue routine compares the queued-work counter against.
func New(numberOfRoutines int, queueCapacity int32) *WorkPool {
	// The queuedWork and activeRoutines counters start at their zero value.
	pool := &WorkPool{
		shutdownQueueChannel: make(chan string),
		shutdownWorkChannel:  make(chan struct{}),
		queueChannel:         make(chan poolWork),
		workChannel:          make(chan PoolWorker, queueCapacity),
		queueCapacity:        queueCapacity,
	}

	// Each work routine calls Done on this group when it shuts down, so the
	// full count is registered before any of them is started.
	pool.shutdownWaitGroup.Add(numberOfRoutines)

	// Start the work routines; each one gets its index as its id.
	for id := 0; id < numberOfRoutines; id++ {
		go pool.workRoutine(id)
	}

	// Start the queue routine that accepts posted work.
	go pool.queueRoutine()

	return pool
}

// Shutdown stops the queue routine, then the work routines, and closes the
// pool's channels. goRoutine is only used to label the log output.
//
// A panic raised while shutting down is passed to catchPanic, which is
// given the address of the named result err.
func (wp *WorkPool) Shutdown(goRoutine string) (err error) {
	const fn = "Shutdown"
	defer catchPanic(&err, goRoutine, fn)

	trace := func(message string) { writeStdout(goRoutine, fn, message) }

	trace("Started")
	trace("Queue Routine")

	// Handshake with the queue routine: ask it to stop, then wait for its
	// reply before closing the channels it was using.
	wp.shutdownQueueChannel <- "Down"
	<-wp.shutdownQueueChannel

	close(wp.queueChannel)
	close(wp.shutdownQueueChannel)

	trace("Shutting Down Work Routines")

	// Closing the channel signals every work routine at once; the wait
	// group is released once each of them has called Done.
	close(wp.shutdownWorkChannel)
	wp.shutdownWaitGroup.Wait()

	close(wp.workChannel)

	trace("Completed")
	return err
}

// PostWork hands work to the queue routine and blocks until the queue
// routine replies on the request's result channel; that reply is returned.
// goRoutine is only used to label log output.
//
// A panic raised while posting is passed to catchPanic, which is given the
// address of the named result err.
func (wp *WorkPool) PostWork(goRoutine string, work PoolWorker) (err error) {
	defer catchPanic(&err, goRoutine, "PostWork")

	request := poolWork{work: work, resultChannel: make(chan error)}
	defer close(request.resultChannel)

	wp.queueChannel <- request

	return <-request.resultChannel
}

// QueuedWork returns the number of work items currently counted as queued.
//
// The counter is modified with atomic adds by other routines, so it is read
// with an atomic load rather than a read-modify-write of zero.
func (wp *WorkPool) QueuedWork() int32 {
	return atomic.LoadInt32(&wp.queuedWork)
}

// ActiveRoutines returns the number of routines currently counted as
// performing work.
//
// The counter is modified with atomic adds by the work routines, so it is
// read with an atomic load rather than a read-modify-write of zero.
func (wp *WorkPool) ActiveRoutines() int32 {
	return atomic.LoadInt32(&wp.activeRoutines)
}

// catchPanic recovers a panic in the function that deferred it, logs the
// panic value together with the current goroutine's stack trace, and, when
// err is non-nil, stores the panic value in *err as an error.
//
// It has to be deferred directly (defer catchPanic(...)): recover only stops
// a panic when it is called by the deferred function itself.
func catchPanic(err *error, goRoutine string, functionName string) {
	r := recover()
	if r == nil {
		return
	}

	// runtime.Stack returns the number of bytes it wrote. Only that prefix
	// is logged; the rest of the buffer is unused zero bytes.
	buf := make([]byte, 10000)
	n := runtime.Stack(buf, false)

	writeStdoutf(goRoutine, functionName, "PANIC Defered [%v] : Stack Trace : %v", r, string(buf[:n]))

	if err != nil {
		*err = fmt.Errorf("%v", r)
	}
}

// writeStdout writes a system message of the form
// "<goRoutine> : <functionName> : <message>" through the standard logger.
// Despite its name it goes wherever the log package is configured to write
// (standard error unless log.SetOutput was called).
func writeStdout(goRoutine string, functionName string, message string) {
	log.Printf("%s : %s : %s\n", goRoutine, functionName, message)
}

// writeStdoutf builds the message from format and a with fmt.Sprintf and
// passes the result, with the same labels, to writeStdout.
func writeStdoutf(goRoutine string, functionName string, format string, a ...interface{}) {
	message := fmt.Sprintf(format, a...)
	writeStdout(goRoutine, functionName, message)
}

// workRoutine is the loop run by each work routine. It executes work taken
// from the work channel until the shutdown channel is signalled, at which
// point it logs, marks itself done on the shutdown wait group and returns.
// workRoutine is the id of this routine; it is passed on to the work.
func (wp *WorkPool) workRoutine(workRoutine int) {
	for {
		select {
		case <-wp.shutdownWorkChannel:
			// Shutdown requested.
			name := fmt.Sprintf("WorkRoutine %d", workRoutine)
			writeStdout(name, "workRoutine", "Going Down")
			wp.shutdownWaitGroup.Done()
			return

		case job := <-wp.workChannel:
			// Work is available in the queue.
			wp.safelyDoWork(workRoutine, job)
		}
	}
}

// safelyDoWork runs the user's DoWork method on behalf of a work routine,
// keeping the pool counters up to date and handing any panic raised by the
// work to catchPanic.
func (wp *WorkPool) safelyDoWork(workRoutine int, poolWorker PoolWorker) {
	// Deferred first so it runs last, after the active count is restored.
	defer catchPanic(nil, "WorkRoutine", "SafelyDoWork")
	defer func() {
		atomic.AddInt32(&wp.activeRoutines, -1)
	}()

	// The item has left the queue and this routine is now busy.
	atomic.AddInt32(&wp.queuedWork, -1)
	atomic.AddInt32(&wp.activeRoutines, 1)

	poolWorker.DoWork(workRoutine)
}

// queueRoutine is the loop run by the single queue routine. It accepts
// posted work from the queue channel, rejects it with ErrCapacity when the
// queue is full, and otherwise forwards it to the work channel and reports
// nil to the poster. It returns after answering a shutdown request.
func (wp *WorkPool) queueRoutine() {
	for {
		select {
		case <-wp.shutdownQueueChannel:
			// Shutdown requested: acknowledge on the same channel, then stop.
			writeStdout("Queue", "queueRoutine", "Going Down")
			wp.shutdownQueueChannel <- "Down"
			return

		case queueItem := <-wp.queueChannel:
			// Reject the work when the queue is at capacity. The counter is
			// read with an atomic load, and >= rather than == is used so an
			// unexpected overshoot can never block the send below.
			if atomic.LoadInt32(&wp.queuedWork) >= wp.queueCapacity {
				queueItem.resultChannel <- ErrCapacity
				continue
			}

			// Count the item before it becomes visible to the work
			// routines, which decrement the counter when they pick it up.
			atomic.AddInt32(&wp.queuedWork, 1)

			// Queue the work for a work routine to process.
			wp.workChannel <- queueItem.work

			// Tell the caller the work is queued.
			queueItem.resultChannel <- nil
		}
	}
}

3. main.go 调用

package main
import (
	"bufio"
	"fmt"
	"goroutine-demo/workpool"
	"os"
	"runtime"
	"strconv"
	"time"
)

// MyWork is the demo work item posted to the work pool. The field
// descriptions are ordinary comments rather than struct tags, because free
// text does not follow the key:"value" struct tag convention.
type MyWork struct {
	Name      string             // The name of a person.
	BirthYear int                // The year the person was born.
	WP        *workpool.WorkPool // The work pool whose counters DoWork prints.
}

func (workPool *MyWork) DoWork(workRoutine int) {
	fmt.Printf("%s : %d
", workPool.Name, workPool.BirthYear)
	fmt.Printf("*******> WR: %d  QW: %d  AR: %d
", workRoutine, workPool.WP.QueuedWork(), workPool.WP.ActiveRoutines())
	time.Sleep(100 * time.Millisecond)
	//panic("test")
}
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	workPool := workpool.New(runtime.NumCPU()*3, 10)
	shutdown := false // Just for testing, I Know
	go func() {
		for i := 0; i < 1000; i++ {
			work := &MyWork{
				Name:      "A" + strconv.Itoa(i),
				BirthYear: i,
				WP:        workPool,
			}
			err := workPool.PostWork("name_routine", work)
			if err != nil {
				fmt.Printf("ERROR: %s
", err)
				time.Sleep(100 * time.Millisecond)
			}
			if shutdown == true {
				return
			}
		}
	}()
	fmt.Println("Hit any key to exit")
	reader := bufio.NewReader(os.Stdin)
	reader.ReadString('
')
	shutdown = true
	fmt.Println("Shutting Down
")
	workPool.Shutdown("name_routine")
}

相关链接

https://studygolang.com/articles/14481
https://github.com/goinggo/workpool

【励志篇】: 古之成大事掌大学问者,不惟有超世之才,亦必有坚韧不拔之志。
原文地址:https://www.cnblogs.com/tomtellyou/p/13870226.html