WorkPool线程池

WorkPool线程池

WorkPool 就是一个用来限制同时运行的函数(goroutine)数量的 struct。

参考https://jiajunhuang.com/articles/2020_04_21-golang_concurrency.md.html

package pool

// GoPool limits how many submitted functions run at the same time by handing
// out a fixed number of tokens.
type GoPool struct {
	// MaxLimit is the number of tokens the pool was configured with; it is
	// set by WithMaxLimit.
	MaxLimit int

	// tokenChan is the buffered channel that holds the currently free tokens.
	tokenChan chan struct{}
}

// GoPoolOption configures a GoPool; NewGoPool applies the options it is given
// in order.
type GoPoolOption func(*GoPool)

// WithMaxLimit returns an option that sets the pool's concurrency limit to
// max and fills the pool with that many tokens.
//
// Values below 1 are raised to 1: a negative size would make the channel
// allocation panic, and a size of 0 would leave the pool without any token,
// so every Submit would block forever.
func WithMaxLimit(max int) GoPoolOption {
	return func(gp *GoPool) {
		limit := max
		if limit < 1 {
			limit = 1
		}

		gp.MaxLimit = limit
		gp.tokenChan = make(chan struct{}, limit)

		// Pre-fill the buffer: every buffered value is one free slot.
		for i := 0; i < limit; i++ {
			gp.tokenChan <- struct{}{}
		}
	}
}

// NewGoPool builds a GoPool and applies the given options in order; nil
// options are skipped.
//
// If no option initialised the token channel (for example when NewGoPool is
// called without WithMaxLimit), the pool falls back to a limit of 1. Without
// that fallback the channel would be nil and the first Submit would block
// forever.
func NewGoPool(options ...GoPoolOption) *GoPool {
	const fallbackLimit = 1

	p := &GoPool{}
	for _, o := range options {
		if o == nil {
			continue
		}
		o(p)
	}

	if p.tokenChan == nil {
		WithMaxLimit(fallbackLimit)(p)
	}

	return p
}

// Submit blocks until a token is free, then runs fn on a new goroutine.
//
// The token is handed back in a deferred call so that it is returned even
// when fn ends its goroutine through runtime.Goexit (as testing.T.FailNow
// does). Returning it only after a normal return of fn would leak the token,
// and Wait would then block forever.
func (gp *GoPool) Submit(fn func()) {
	token := <-gp.tokenChan // blocks here while every token is in use

	go func() {
		defer func() { gp.tokenChan <- token }()
		fn()
	}()
}

// Wait blocks until every task handed to Submit has finished.
//
// It collects all tokens — which is only possible once no task holds one —
// and then puts them back, so the pool can be reused and Wait can be called
// again. The channel is deliberately not closed: closing it made a later
// Submit panic with "send on closed channel", a second Wait panic with
// "close of closed channel", and Wait on a pool without a token channel
// panic with "close of nil channel".
//
// The number of tokens is taken from the channel's capacity rather than from
// MaxLimit, because MaxLimit is an exported field that callers can change
// after the channel has been created.
func (gp *GoPool) Wait() {
	total := cap(gp.tokenChan)

	for i := 0; i < total; i++ {
		<-gp.tokenChan
	}

	// Hand the tokens back. All of them were just collected, so the buffer
	// has room for every one and these sends do not block.
	for i := 0; i < total; i++ {
		gp.tokenChan <- struct{}{}
	}
}

// size returns the number of tokens currently sitting in the token channel,
// i.e. the tokens that are not handed out to a running task.
func (gp *GoPool) size() int {
	free := len(gp.tokenChan)
	return free
}

实现原理

池子的核心属性是一个令牌桶:它用带缓冲的 chan 来实现,提交任务前先从 chan 里取出一个令牌,任务结束后再把令牌放回去;令牌被取完时,取令牌的操作会自动阻塞,从而限制了同时运行的任务数量。

基于接口(interface)风格的实现

workpool.go

package work

// IWorkPool describes a work pool: a long-lived object that callers simply
// submit work to.
//
// Methods:
//   - Submit: hand over a piece of work and have it executed.
//   - GetWorkCount: report the pool's currently idle capacity.
//
// State kept by an implementation: a token bucket.
type IWorkPool interface {
	Submit(work IWork)
	GetWorkCount() int
}

// WorkPool is a pool built around a single token channel.
type WorkPool struct {
	// Tocken (sic, "token") is the channel used as the token bucket.
	// NOTE(review): the misspelling is part of the exported API, so renaming
	// it would break callers.
	Tocken chan int
}

// NewWorkPool creates a WorkPool whose token bucket holds limit tokens.
//
// A limit below 1 is raised to 1: a negative size would make the channel
// allocation panic, and a size of 0 would create a bucket without any token,
// so a receive from it would block forever.
func NewWorkPool(limit int) *WorkPool {
	if limit < 1 {
		limit = 1
	}

	wp := &WorkPool{
		Tocken: make(chan int, limit),
	}

	// Pre-fill the bucket with one token per slot.
	for i := 0; i < limit; i++ {
		wp.Tocken <- i
	}
	return wp
}

// Submit blocks until a token is free, then runs work.DoWork on a new
// goroutine.
//
// The token is handed back in a deferred call so that it is returned even
// when DoWork ends its goroutine through runtime.Goexit. Returning it only
// after a normal return would permanently shrink the pool by one slot.
func (wp *WorkPool) Submit(work IWork) {
	token := <-wp.Tocken // blocks here while every token is in use

	go func() {
		defer func() { wp.Tocken <- token }()
		work.DoWork()
	}()
}

// GetWorkCount returns the number of tokens currently in the bucket, i.e.
// the tokens that are not handed out — not the number of running pieces of
// work.
func (wp *WorkPool) GetWorkCount() int {
	idle := len(wp.Tocken)
	return idle
}

work.go

package work

import "fmt"

// IWork is a unit of work that can be executed.
//
// State (as listed by the author): work func()
// Method: DoWork()
type IWork interface {
	DoWork()
}

// Work is a concrete task: it wraps the function to run.
type Work struct {
	// work is the function invoked by DoWork.
	work func()
}

// NewWork wraps fn in a freshly allocated Work.
func NewWork(fn func()) *Work {
	w := new(Work)
	w.work = fn
	return w
}

// DoWork runs the wrapped function and, once it has returned, prints a
// completion message to standard output.
func (w *Work) DoWork() {
	task := w.work
	task()

	fmt.Println("任务ok")
}

测试代码

package main

import (
	"go-newbase/work"
	"time"
)

// main is a small demo: it pushes 100 three-second tasks through a pool
// created with a limit of four, then exits as soon as the last task is done.
func main() {
	const (
		limit = 4
		tasks = 100
	)

	pool := work.NewWorkPool(limit)
	for i := 0; i < tasks; i++ {
		pool.Submit(work.NewWork(func() {
			time.Sleep(3 * time.Second)
		}))
	}

	// Collect every token instead of sleeping for a fixed 10000 seconds.
	// A token is only back in the bucket once its task has finished, so
	// holding all of them means all submitted work is done.
	for i := 0; i < limit; i++ {
		<-pool.Tocken
	}
}
原文地址:https://www.cnblogs.com/maomaomaoge/p/14130026.html