go 并发

占位...

... 又一个模式

package main

import (
	"fmt"
	"runtime"
	"time"
)

// --------------------------- Job ---------------------
type Job interface {
	Do()
}

// --------------------------- Worker ---------------------
type Worker struct {
	JobQueue chan Job
}

func NewWorker() Worker {
	return Worker{JobQueue: make(chan Job)}
}
func (w Worker) Run(wq chan chan Job) {
	go func() {
		for {
			// 不会退出, job chan持续传给 workpool的 workqueue
			// 这里加入队列, chan job 加入 workpool的chan chan job,后边workpool有定义取出动作
			wq <- w.JobQueue // 执select行前,释放占用,这样 workpool中的chan chan job channel 增加一,触发workpool 中的select
			// 将自己的chan job 注册到workpool 中的chan chan job
			select {
			case job := <-w.JobQueue:
				job.Do()
			}
		}
	}()
}

// --------------------------- WorkerPool ---------------------
type WorkerPool struct {
	workerlen   int
	JobQueue    chan Job
	WorkerQueue chan chan Job
}

func NewWorkerPool(workerlen int) *WorkerPool {
	return &WorkerPool{
		workerlen:   workerlen,
		JobQueue:    make(chan Job),
		WorkerQueue: make(chan chan Job, workerlen),
	}
}
func (wp *WorkerPool) Run() {
	fmt.Println("初始化worker")
	//初始化worker
	for i := 0; i < wp.workerlen; i++ {
		worker := NewWorker()
		worker.Run(wp.WorkerQueue)
	}
	// 循环获取可用的worker,往worker中写job
	go func() {
		for {
			select {
			case job := <-wp.JobQueue: // 对内输出channel,接受调用者输入
				worker := <-wp.WorkerQueue // 从 chan chan job 中取出, work run中会发送补充此channel
				worker <- job              // job 就是数据,外界输入通过job Queue 传给 worker
			}
		}
	}()
}

type Score struct {
	Num int
}

func (s *Score) Do() {
	fmt.Println("num:", s.Num)
	time.Sleep(1 * 1 * time.Second)
}

func main() {
	// num := 100 * 100 * 20
	num := 10
	// debug.SetMaxThreads(num + 1000) //设置最大线程数
	// 注册工作池,传入任务
	// 参数1 worker并发个数
	p := NewWorkerPool(num)
	p.Run()
	// datanum := 100 * 100 * 100 * 100
	datanum := 15
	go func() {
		for i := 1; i <= datanum; i++ {
			sc := &Score{Num: i}
			p.JobQueue <- sc
		}
	}()

	for {
		fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
		time.Sleep(2 * time.Second)
	}

}

  

...自己写的,凑合用

package main

import (
	"log"
)
import "sync"


func main() {
	log.SetFlags(log.Llongfile | log.LstdFlags)
	var wg sync.WaitGroup
	listn := []int{}
	for i := 1; i < 6; i++ {
		listn = append(listn, i)
	}
	log.Println(listn)
	ch := make(chan int, 13)
	go func() {
		for _, v := range listn {
			log.Println("push to ch")
			ch <- v
		}
		close(ch)
	}()
	wg.Add(13)
	huzh:=5
	for i := 0; i < 13; i++ {
		go func(i int) {
			defer wg.Done()
			for v :=range ch{
				log.Println(v,huzh,i)
			}
		}(i)
	}
	wg.Wait()
}

  

from https://gobyexample.com/worker-pools

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

  

另外一个例子,生成固定数量的goroutine,这些协程从一个chan中使用for方式取数, 在排入另一个chan, 结果模块在第二个chan使用for方式取数

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	id      int
	randnum int
}
type Result struct {
	task   Task
	result int
}

var tasks = make(chan Task, 10)
var results = make(chan Result, 10)

func process(num int) int {
	sum := 0
	for num != 0 {
		digit := num % 10
		sum += digit
		num /= 10
	}
	//time.Sleep(2 * time.Second)
	return sum
}
func worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		result := Result{task, process(task.randnum)}
		results <- result
	}
}
func createWorkerPool(numOfWorkers int) {
	var wg sync.WaitGroup
	//createWorkerPool生成固定的work数量,保持中...
	for i := 0; i < numOfWorkers; i++ {
		wg.Add(1)
		go worker(&wg)
	}
	wg.Wait()
	close(results)
}


func allocate(numOfTasks int) {
	//任务源,持续输出
	for i := 0; i < numOfTasks; i++ {
		randnum := rand.Intn(999)
		task := Task{i, randnum}
		//传到数量固定的channel->tasks中
		tasks <- task
	}
	close(tasks)
}
func getResult(done chan bool) {
	for result := range results {
		fmt.Printf("Task id %d, randnum %d , sum %d
", result.task.id, result.task.randnum, result.result)
	}
	done <- true
}
func main() {
	startTime := time.Now()
	numOfWorkers := 20
	numOfTasks := 100

	var done = make(chan bool)
	go getResult(done)
	go allocate(numOfTasks)
	go createWorkerPool(numOfWorkers)
	// 必须在allocate()和getResult()之后创建工作池
	<-done
	endTime := time.Now()
	diff := endTime.Sub(startTime)
	fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

  

 

...一个错误的demo

package main

import (
	"bytes"
	"fmt"
	"log"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/grd/stat"
)

/*
RunStress no comment
*/
func RunStress(wg *sync.WaitGroup, ch chan string) {
	log.Println("entry RunStress")
	wg.Add(1)
	defer wg.Done()
	cmd := exec.Command("stress", "-c", "4")
	cmd.Stdin = strings.NewReader("some input")
	var out bytes.Buffer
	cmd.Stdout = &out
	log.Println(out)
	log.Println("befor cmd.Run")
	ch <- "hello"
	err := cmd.Run()
	if err != nil {
		log.Fatal(err)
	}
}

type Process struct {
	pid int
	cpu float64
}

func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
	log.Println("entry CalcStress")
	wg.Add(1)
	defer wg.Done()
	log.Println(<-ch)
	time.Sleep(5 * time.Second)
	cmd := exec.Command("ps", "aux")
	var out bytes.Buffer
	cmd.Stdout = &out
	err := cmd.Run()
	if err != nil {
		log.Fatal(err)
	}
	processes := make([]*Process, 0)
	for {
		line, err := out.ReadString('
')
		if err != nil {
			break
		}

		if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
			continue
		}
		log.Println("line====>", line)

		tokens := strings.Split(line, " ")

		log.Println("tokens begin")
		log.Println("tokens====>", tokens)
		log.Println("tokens end")
		ft := make([]string, 0)
		for _, t := range tokens {
			if t != "" && t != "	" {
				ft = append(ft, t)
			}
		}

		pid, err := strconv.Atoi(ft[1])
		if err != nil {
			continue
		}
		cpu, err := strconv.ParseFloat(ft[2], 64)
		if err != nil {
			log.Fatal(err)
		}
		processes = append(processes, &Process{pid, cpu})
	}

	data := stat.Float64Slice{}
	for _, p := range processes {
		log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
		data = append(data, p.cpu)
	}

	variance := stat.Variance(&data)
	fmt.Printf("The estimated variance is %.4f", variance)

}

func main() {
	wg := &sync.WaitGroup{}
	ch1 := make(chan string)
	go CalcStressCpu(wg, ch1)
	go RunStress(wg, ch1)
	wg.Wait()
	time.Sleep(10 * time.Second)

}

...一个正确点的demo

package main

import (
	"bytes"
	"fmt"
	"log"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/grd/stat"
)

/*
RunStress no comment
*/
func RunStress(wg *sync.WaitGroup, ch chan string) {
	defer wg.Done()

	//kill stress for init
	cmd_stop := exec.Command("pkill", "stress")
	cmd_stop.Run()

	//start stress
	cmd := exec.Command("stress", "-c", "4")

	//start stress and notify the calc start
	ch <- "hello"
	cmd.Run()
	// if err != nil {
	// 	log.Println("recieved signal")
	// 	// log.Fatal(err)
	// }
}

type Process struct {
	pid int
	cpu float64
}

func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
	// log.Println("entry calc stress")
	defer wg.Done()

	//make sure the calc after "start stress"
	<-ch

	time.Sleep(5 * time.Second)
	cmd := exec.Command("ps", "aux")
	var out bytes.Buffer
	cmd.Stdout = &out
	err := cmd.Run()
	if err != nil {
		log.Fatal(err)
	}
	processes := make([]*Process, 0)
	for {
		line, err := out.ReadString('
')
		if err != nil {
			break
		}

		if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
			continue
		}

		tokens := strings.Split(line, " ")

		ft := make([]string, 0)
		for _, t := range tokens {
			if t != "" && t != "	" {
				ft = append(ft, t)
			}
		}

		pid, err := strconv.Atoi(ft[1])
		if err != nil {
			continue
		}
		cpu, err := strconv.ParseFloat(ft[2], 64)
		if err != nil {
			log.Fatal(err)
		}
		processes = append(processes, &Process{pid, cpu})
	}

	data := stat.Float64Slice{}
	for _, p := range processes {
		// log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
		data = append(data, p.cpu)
	}

	variance := stat.Variance(&data)
	fmt.Printf("The estimated variance is %.4f", variance)

	//after calc , stop the stress
	cmd_stop := exec.Command("pkill", "stress")
	cmd_stop.Run()

}

func main() {
	wg := &sync.WaitGroup{}
	ch1 := make(chan string)
	wg.Add(1)
	go CalcStressCpu(wg, ch1)
	wg.Add(1)
	go RunStress(wg, ch1)
	wg.Wait()

}

  

原文地址:https://www.cnblogs.com/eiguleo/p/14185966.html