golang 典型并发任务

仅执行一次

场景

懒汉式,线程安全

适用于只执行一次的任务,比如加载配置文件。

code

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	var once sync.Once
	for i := 0; i < 10; i++ {
		once.Do(func() {
			num := rand.Intn(10)
			fmt.Println(num)
		})
	}
}

仅需任意任务完成

场景

这里所有任务都完成了,但是只用了最快的一个结果,所以是所有任务都完成了;

当有一个任务完成时,取消其他任务,因为任务都是有开销的。

code

package main

import (
	"fmt"
	"runtime"
	"time"
)

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse() string {
	numOfRunner := 10
	// 使用带缓存的channel,让goroutines不会堵塞
	ch := make(chan string, numOfRunner)
	for i := 0; i < numOfRunner; i++ {
		go func(i int) {
			ret := runTask(i)
			ch <- ret
		}(i)
	}
	return <-ch
}

func main() {
	fmt.Println("Before:", runtime.NumGoroutine())
	fmt.Println(firstResponse())
	time.Sleep(time.Second * 1)
	fmt.Println("After:", runtime.NumGoroutine())

}

所有任务都完成

基于CSP实现

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mutex sync.Mutex
	max := 10000
	ch := make(chan int, max)
	for i := 0; i < max; i++ {
		go func() {
			mutex.Lock()
			ch <- 1
			defer func() {
				mutex.Unlock()
			}()
		}()
	}
	counter := 0
	for i := 0; i < max; i++ {
		counter += <-ch
	}
	fmt.Println("counter:", counter)
}

waitgroup实现

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mutex sync.Mutex
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 10000; i++ {
		wg.Add(1) // 每启动一个协程都新增加一个等待
		go func() {
			mutex.Lock()
			defer func() {
				mutex.Unlock()
				wg.Done()
			}()
			counter++
		}(i)
	}
	wg.Wait()
	fmt.Println("counter:", counter)
}

对象池

适合通过复用降低复杂对象的创建和GC的代价

协程安全,会有锁的开销

生命周期受GC影响,不适合做连接池等需要自己管理生命周期的资源的池化。

code

基于buffered channel实现对象池,取用完后放回channel

package object_pool

import (
	"errors"
	"fmt"
	"testing"
	"time"
)

type ReusableObject struct {
	token int
}

type ObjectPool struct {
	bufChan chan *ReusableObject //用于缓冲可重用对象
}

func NewObjectPool(numOfObject int) *ObjectPool {
	objectPool := ObjectPool{}
	objectPool.bufChan = make(chan *ReusableObject, numOfObject)
	for i := 0; i < numOfObject; i++ {
		objectPool.bufChan <- &ReusableObject{
			token: i,
		}
	}
	return &objectPool
}

func (pool *ObjectPool) GetObject(timeout time.Duration) (*ReusableObject, error) {
	select {
	case ret := <-pool.bufChan:
		return ret, nil
	case <-time.After(timeout): //超时控制
		return nil, errors.New("time out")
	}

}

func (pool *ObjectPool) ReleaseObject(object *ReusableObject) error {
	select {
	case pool.bufChan <- object:
		return nil
	default:
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjectPool(10)
	// 创建对象池后,对象池是满的
	if err := pool.ReleaseObject(&ReusableObject{}); err != nil { //尝试放置超出池大小的对象
		t.Error(err)
	}
	for i := 0; i < 11; i++ {
		if v, err := pool.GetObject(time.Second); err != nil {
			t.Error(err)
		} else {
			fmt.Printf("%T %d
", v, v.token)
			if err := pool.ReleaseObject(v); err != nil {
				t.Error(err)
			}
		}

	}

	fmt.Println("Done")
}

sync.pool 对象生命周期

  • gc会清除sync.pool缓存的对象

  • 对象的有效期是下次gc前 --> gc 执行的时机是什么?

带来的思考

每次获取对象,可能会受锁的限制,所以是创建对象的开销大,还是锁带来的开销大需要根据实际情况权衡。

code

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func SyncPool() {
	pool := &sync.Pool{
        // 当对象池为空时,调用get时会自动New创建一个新的对象,可以理解为默认对象
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int)
	fmt.Println(v)
	pool.Put(3)
	runtime.GC() //GC 会清除sync.pool中缓存的对象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}

func SyncPoolInMultiGoroutine() {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(1)
	pool.Put(2)
	pool.Put(3)
	pool.Put(4)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			fmt.Println(pool.Get())
			wg.Done()
		}(i)
	}
	wg.Wait()
}

func main() {
	//SyncPool()
	SyncPoolInMultiGoroutine()
}
原文地址:https://www.cnblogs.com/hiyang/p/13055137.html