go goroutine channel 和C# Task BlockingCollection 以及python该如何实现

首先说结论吧,个人感觉go的goroutine 和C# 的Task 相似,goroutine 和Task 可以近似理解为逻辑线程, 至于多个goroutine 或Task 对应操作系统几个物理线程 是底层决定的,我们可以不用太关心;但是一定是多对多【这个我们可以简单理解多对一, 一个或多个goroutine 或Task 对应底层一个物理线程】, 具体的blockingcollection可以参考 https://blog.csdn.net/ma_jiang/article/details/54561684, go channel 可以参考https://blog.csdn.net/ma_jiang/article/details/84497607

channel 和BlockingCollection 可以再多线程之间通信,尤其是在同步通信 都是运用它们阻塞的特定来做了。比如常见的接力赛: 使用无缓冲的通道,在 goroutine 之间同步数据,来模拟接力比赛。在接力比赛里,4 个跑步者围绕赛道轮流跑。第二个、第三个和第四个跑步者要接到前一位跑步者的接力棒后才能起跑。比赛中最重要的部分是要传递接力棒,要求同步传递。在同步接力棒的时候,参与接力的两个跑步者必须在同一时刻准备好交接。

go的代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

// wg 用来等待程序结束
var wg sync.WaitGroup

// main 是所有Go 程序的入口
func main() {
    // 创建一个无缓冲的通道
    baton := make(chan int)
    // 为最后一位跑步者将计数加1
    wg.Add(1)
    // 第一位跑步者持有接力棒
    go Runner(baton)
    // 开始比赛
    baton <- 1
    // 等待比赛结束
    wg.Wait()
}

// Runner 模拟接力比赛中的一位跑步者
func Runner(baton chan int) {
    var newRunner int
    // 等待接力棒
    runner := <-baton
    // 开始绕着跑道跑步
    fmt.Printf("Runner %d Running With Baton
", runner)
    // 创建下一位跑步者
    if runner != 4 {
        newRunner = runner + 1
        fmt.Printf("Runner %d To The Line
", newRunner)
        go Runner(baton)
    }
    // 围绕跑道跑
    time.Sleep(100 * time.Millisecond)
    // 比赛结束了吗?
    if runner == 4 {
        fmt.Printf("Runner %d Finished, Race Over
", runner)
        wg.Done()
        return
    }
    // 将接力棒交给下一位跑步者
    fmt.Printf("Runner %d Exchange With Runner %d
",
        runner,
        newRunner)
    baton <- newRunner
}

C#代码:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace demo
{
    class Program
    {
        static void Main(string[] args)
        {
            // 创建一个无缓冲的通道
            var baton =new BlockingCollection<int>(1);
            // 第一位跑步者持有接力棒
            Task.Factory.StartNew(x => Runner((BlockingCollection<int>)x), baton);
            baton.Add(1);
            while(!baton.IsCompleted){
                Thread.Sleep(1000);
            }
            Console.Read();
        }
        static void Runner(BlockingCollection<int> baton){
           int  newRunner=0 ;
            // 等待接力棒 
            int runner=baton.Take();
            // 开始绕着跑道跑步
            Console.WriteLine($"Runner {runner} Running With Baton");
            // 创建下一位跑步者
            if (runner!=4){
                newRunner=runner+1;
                Console.WriteLine($"Runner {runner} To The Line");
                Task.Factory.StartNew(x=>Runner((BlockingCollection<int>)x),baton);
            }
            // 围绕跑道跑
            Thread.Sleep(100);
            // 比赛结束了吗?
            if(runner==4){
                Console.WriteLine($"Runner {runner} Finished, Race Over");
                 baton.CompleteAdding();
                return;
            }
            Console.WriteLine($"Runner {runner} Exchange With Runner {newRunner}");
            baton.Add(newRunner);

        }
    }
}

运行结果:

Python, 首先python 有队列queue 但是它不是线程安全的, 也没有阻塞的功能, 因此 我们需要自己实现一个 线程安全的队列, 并且具有阻塞功能threadSafeQueue.py 如下:

import time
import threading
 
# 线程安全的队列
class ThreadSafeQueue(object):
 
    def __init__(self, max_size=0):
        self.queue = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.condition = threading.Condition()
 
    # 当前队列元素的数量
    def size(self):
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size
 
    # 往队列里面放入元素
    def put(self, item):
        if self.max_size != 0 and self.size() > self.max_size:
            return Exception()
        self.lock.acquire()
        self.queue.append(item)
        self.lock.release()
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()
        pass
 
    def batch_put(self, item_list):
        if not isinstance(item_list, list):
            item_list = list(item_list)
        for item in item_list:
            self.put(item)
 
    # 从队列取出元素
    def pop(self, block=True, timeout=None):
        if self.size() == 0:
            # 需要阻塞等待
            if block:
                self.condition.acquire()
                self.condition.wait(timeout=timeout)
                self.condition.release()
            else:
                return None
        self.lock.acquire()
        item = None
        if len(self.queue) > 0:
            item = self.queue.pop()
        self.lock.release()
        return item
 
    def get(self, index):
        self.lock.acquire()
        item = self.queue[index]
        self.lock.release()
        return item

调用如下:

from threadSafeQueue import ThreadSafeQueue
import threading
import time
 
 
def Runner(baton):
    newRunner=0
    runner=baton.pop()
    print("Runner %s Running With Baton" % runner)
    if runner!=4:
        newRunner=int(runner)+1
        print("Runner %d To The Line" % runner)
        t=threading.Thread(target=Runner,args=(baton,))
        t.start()
    time.sleep(10)
    if runner==4:
        print("Runner %d Finished, Race Over" %runner)
        return
    print("Runner %d Exchange With Runner %d" %(runner,newRunner))
    baton.put(newRunner)
 
 
 
if __name__ == '__main__':
    baton=ThreadSafeQueue(1)
    baton.put(1)
    t=threading.Thread(target = Runner, args=(baton,))
    t.start()
    str1 = input()
    print(str1)

原文地址:https://www.cnblogs.com/majiang/p/14171957.html