Rust 并发编程

https://www.jianshu.com/p/f4d853c0ef1e

在并发编程领域,一个非常让程序员兴奋,感到有成就感的事情就是做性能优化,譬如发现某个线程成为了单点瓶颈,然后上多线程。

提到了上多线程,那自然就会引入 thread pool,也就是我们通常说的线程池,我们会将任务扔给线程池,然后线程池里面自己会负责将任务派发到不同的线程去执行,除开任务自身执行的开销,如何高效的派发也会决定一个线程池是否有足够好的性能。下面,我们就来聊聊几种常见的线程池的实现。

Mutex + channel

在 Rust 里面,我们可以通过标准库提供的 channel 进行通讯,但 channel 其实是一个 multi-producer,single-consumer 的结构,也就是我们俗称的 MPSC。但对于线程池来说,我们需要的是一个 MPMC 的 channel,也就是说,我们需要有一个队列,这个队列可以支持多个线程同时添加,同时获取任务。

虽然单独的 channel 没法支持,但如果我们给 channel 的 Receiver 套上一个 Mutex,在加上 Arc,其实就可以了。通过 Mutex 我们能保证多个线程同时只能有一个线程抢到 lock,然后从队列里面拿到数据。而加上 Arc 主要是能在多个线程共享了,这里就不说明了。

所以实现也就比较简单了,如下:

pub struct ThreadPool {
    tx: Option<Sender<Task>>,
    handlers: Option<Vec<thread::JoinHandle<()>>>,
}
impl ThreadPool {
    pub fn new(number: usize) -> ThreadPool {
        let (tx, rx) = channel::<Task>();
        let mut handlers = vec![];

        let arx = Arc::new(Mutex::new(rx));
        for _ in 0..number {
            let arx = arx.clone();
            let handle = thread::spawn(move || {
                while let Ok(task) = arx.lock().unwrap().recv() {
                    task.call_box();
                }
            });

            handlers.push(handle);
        }

        ThreadPool {
            tx: Some(tx),
            handlers: Some(handlers),
        }
    }
}

Task 其实就是一个 FnBox,因为只有 nightly 版本支持 FnBox,所以我们自定义了一下

pub trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

pub type Task = Box<FnBox + Send>;

上面的代码逻辑非常的简单,创建一个 channel,然后使用 Arc + Mutex 包上 Receiver,创建多个线程,每个线程尝试去获取 channel 任务然后执行,如果 channel 里面没任务,recv 哪里就会等着,而其他的线程这时候因为没法拿到 lock 也会等着。

Condition Variable

抛开 channel,我们还有一种更通用的做法,可以用在不同的语言,譬如 C 上面,也就是使用 condition variable。关于 condition variable 的使用,大家可以 Google,因为在使用 condition variable 的时候,都会配套有一个 Mutex,所以我们可以通过这个 Mutex 同时控制 condition variable 以及任务队列。

首先我们定义一个 State,用来处理任务队列

struct State {
    queue: VecDeque<Task>,
    stopped: bool,
}

对于不同线程获取任务,我们可以通过

fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> {
    let &(ref lock, ref cvar) = &**notifer;
    let mut state = lock.lock().unwrap();
    loop {
        if state.stopped {
            return None;
        }
        match state.queue.pop_front() {
            Some(t) => {
                return Some(t);
            }
            None => {
                state = cvar.wait(state).unwrap();
            }
        }
    }
}

首先就是尝试用 Mutex 拿到 State,如果外面没有结束,那么就尝试从队列里面获取任务,如果没有,就调用 Condition Variable 的 wait 进行等待了。

任务的添加也比较简单

let &(ref lock, ref cvar) = &*self.notifer;
{
    let mut state = lock.lock().unwrap();
    state.queue.push_back(task);
    cvar.notify_one();
}

也是通过 lock 拿到 State,然后放到队列里面,在通知 Condition Variable。对于线程池的创建,也是比较容易的:

let s = State {
    queue: VecDeque::with_capacity(1024),
    stopped: false,
};
let notifer = Arc::new((Mutex::new(s), Condvar::new()));
for _ in 0..number {
    let notifer = notifer.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = next_task(&notifer) {
            task.call_box();
        }
    });

    handlers.push(handle);
}

Crossbeam

上面提到的两种做法,虽然都非常的通用,但有一个明显的问题,就在于他是有全局 lock 的,在并发系统里面,lock 如果使用不当,会造成非常严重的性能开销,尤其是在出现 contention 的时候,所以多数时候,我们希望使用的是一个 lock-free 的数据结构。

幸运的是,在 Rust 里面,已经有一个非常稳定的库来提供相关的支持了,这个就是 crossbeam,关于 crossbeam 的相关知识,后面可以再开一篇文章来详细说明,这里我们直接使用 crossbeam 的 channel,不同于标准库的 channel,crossbeam 的 channel 是一个 MPMC 的实现,所以我们能非常方便的用到线程池上面,简单代码如下:

let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];

for _ in 0..number {
    let rx = rx.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    handlers.push(handle);
}

可以看到,crossbeam 的 channel 使用比标准库的更简单,它甚至不需要 Arc 来包一层,而且还是 lock-free 的。

参考这个 benchmark,分别对不同的 ThreadPool 进行测试,在我的机器上面会发现 crossbeam 的性能会明显好很多,标准库 channel 其次,最后才是 condition variable。

test thread_pool::benchmark_condvar_thread_pool           ... bench: 128,924,340 ns/iter (+/- 39,853,735)
test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench:   1,497,272 ns/iter (+/- 355,120)
test thread_pool::benchmark_std_channel_thread_pool       ... bench:  50,925,087 ns/iter (+/- 6,753,377)

Channel Per-thread

可以看到,使用 crossbeam 的效果已经非常好了,但这种实现其实还有一个问题,主要在于它有一个全局的队列,当并发严重的时候,多个线程对这个全局队列的争抢,可能成为瓶颈。另外,还有一个问题在于,它的派发机制是任意的,也就是那个线程抢到了任务就执行,在某些时候,我们希望一些任务其实是在某个线程上面执行的,这样对于 CPU 的 cache 来说会更加友好,譬如有一个任务在执行的时候,又会产生一个后续任务,自然,我们希望这个后续任务在同一个线程执行。

为了解决上面的问题,最直观的做法就是每个线程一个队列,这样我们就能够显示的控制任务派发了。一个非常简单的例子

let mut handlers = vec![];
let mut txs = vec![];

for _ in 0..number {
    let (tx, rx) = channel::unbounded::<Task>();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    txs.push(tx);
    handlers.push(handle);
}

上面我们为每个线程创建了一个 channel,这样每个线程就不用去争抢全局的 channel 了。

派发的时候我们也可以手动派发,譬如根据某个 ID hash 到一个对应的 thread 上面,通过 Sender 发送 消息。

Work Stealing

虽然每个线程一个 channel 解决了全局争抢问题,也提升了 CPU cache 的使用,但它引入了另一个问题,就是任务的不均衡。直观的来说,就是会导致某些线程一直忙碌,在不断的处理任务,而另一些线程则没有任务处理,一直很闲。为了解决这个问题,就有了 Work Stealing 的线程池。

Work Stealing 的原理其实很简单,当一个线程执行完自己线程队列里面的所有任务之后,它会尝试去其它线程的队列里面偷一点任务执行。

因为 Work Stealing 的实现过于复杂,这里就不描述了,Rust 的 tokio 库提供了一个 tokio-threadpool,就是基于 Work Stealing 来做的,不过现在只提供了 Future 的支持。

小结

上面简单的列举了一些线程池的实现方式,如果你只是单纯的想用一个比较简单的派发功能,基于 crossbeam 的就可以了,复杂一点的可以使用 Work Stealing 的。当然,这里只是大概列举了一些,如果有更好的实现,麻烦跟我联系讨论,我的邮箱 tl@pingcap.com



作者:siddontang
链接:https://www.jianshu.com/p/f4d853c0ef1e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
原文地址:https://www.cnblogs.com/dhcn/p/14255799.html