go 使用线程池做请求限流

描述:说到请求限流,一般都会用到MQ,无论何种MQ,都需要生产者和消费者才能发挥MQ的强大作用。但在对接项目,可能就会出现对接方不能够配合使用MQ的情况。此时,使用线程池做限流也是一种可行的思路。

流程:

1.需手动实现一个线程池。说到线程池,要考虑的因素有:核心线程数,任务队列,最大线程数,线程空闲时间,保留策略。

①开启线程池,接受任务,每接受一个任务创建一条线程。

②当线程数达到核心线程数时,之后的任务放入任务队列中。建议使用阻塞队列,防止内存溢出。

③当任务队列饱和,会在线程池中创建额外的线程来处理任务,直至达到最大线程数。

④当在线程池中的这部分额外线程处于空闲状态,并且达到线程空闲时间的要求,这部分线程会被销毁。

⑤当达到最大线程数,依然有后续的任务要处理,此时就要对这部分任务的去留做出决策。提供三种保留策略:

Ⅰ.直接丢弃,不予处理。

Ⅱ.开辟脱离线程池的线程来处理。

Ⅲ.将任务队列中等待时间久的任务丢弃,加入后续任务。

2.请求限流,先要了解server的运行原理

①服务端需要有一个监听器用来监听请求连接。当客户端发送来一个请求,服务端会先和客户端建立tcp连接。

②开辟一条线程用来单独处理这条tcp连接中发送来的http请求,直至http请求读取完毕,返回响应。我们要执行请求限流的操作便在此处进行,详细操作看代码。

//线程池
package myroutine

import (
	"fmt"
	"strconv"
)

/**
 * @ Author      : jgbb
 * @ Date        : Created in 2019/9/4 13:19
 * @ Description : TODO 线程池
 * @ Modified by :
 * @ Version     : 1.0
 */

func Init(poolSize int,name string) *RoutinePool{
	pool := &RoutinePool{
		Queue:make(chan func()),
		PoolSize:poolSize,
		Name:name,
	}
	defer pool.ExeTask()
	return pool
}

type RoutinePool struct {
	//缓存任务
	Queue chan func()
	PoolSize int
	Name string
}


// 添加任务到线程池
func (pool *RoutinePool) AddTask(task func()){
	pool.Queue <- task
}

//执行任务
func (pool *RoutinePool) ExeTask(){
	counter := make(chan int)
	for i:=0;i<pool.PoolSize;i++ {
		go func() {
			j := <- counter//哪条线程
			var count int64= 0//计数(线程跑了多少次)
			var stdout =pool.Name+"	线程"+strconv.Itoa(j)+"	"
			for task := range pool.Queue{
				count++
				fmt.Printf("%p	%s
",pool,stdout+strconv.FormatInt(count,10))

				task()
			}
		}()

		counter <- i
	}
}

  

//对server源码修改
const(
DefaultPoolSize = 10
)
//每个请求对应的线程池
var PoolMap  = make(map[string]*myroutine.RoutinePool)

//golang源码
func (srv *Server) Serve(l net.Listener) error {
	defer l.Close()
	if fn := testHookServerServe; fn != nil {
		fn(srv, l)
	}
	var tempDelay time.Duration // how long to sleep on accept failure

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	srv.trackListener(l, true)
	defer srv.trackListener(l, false)

	baseCtx := context.Background() // base is always background, per Issue 16220
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return e
		}
		tempDelay = 0
		c := srv.newConn(rw)
                
                /***********************修改开始*******************************/
		//将c.server(ctx)的处理过程放入线程池中
		//首先需要请求path,根据path获取对应的线程池
		c.r = &connReader{conn: c}
		c.r.setReadLimit(c.server.initialReadLimitSize()) //若不setReadLimit,无法读取到缓冲流中的数据
		c.bufr = newBufioReader(c.r)//用来读取流
		s,err := c.bufr.Peek(100)//缓冲流使用peek(),游标不会进行计数,这样才能流中的数据在后面的处理中复用。否则后续读取流会从游标开始
		news := make([]byte,0)
		for i:=0;i<100;i++ {
			news = append(news,s[i])
			if s[i] == 10 {
                                //10表示换行符,到此获取到所需信息
				break
			}
		}
		if err != nil {
			fmt.Errorf("my err:%v",err)
		}
		newss := string(news)
                //请求path当作线程池名称
		poolName := newss[strings.Index(newss,"/"):strings.LastIndex(newss," ")]


		c.setState(c.rwc, StateNew) // before Serve can return
		//go c.serve(ctx) //源码

		//放入线程池处理请求
		putPoolMap(poolName).AddTask(func() {
			c.serve(ctx)
		})
                /***********************修改结束*******************************/
	}
}

//生成线程池
//-参数1:线程池大小
//-参数2:线程池名称
func PutPoolMap(poolSize int,name string) *myroutine.RoutinePool{
	if _,ok := PoolMap[name]; !ok {
                //如果不存在对应的线程池,则生成一个
		PoolMap[name] = myroutine.Init(poolSize,name)
	}
        //返回对应的线程池
	return PoolMap[name]
}

//默认使用此方法生成线程池
//-参数1:线程池名称
func putPoolMap(name string) *myroutine.RoutinePool{
	return PutPoolMap(DefaultPoolSize,name)
}        

  

原文地址:https://www.cnblogs.com/asceticmonks/p/13266310.html