Channels

A channel is an easy way to use many threads that send to one place.它们相当流行,因为它们很容易组合在一起。你可以在Rust中用std::sync::mpsc创建一个channel。mpsc的意思是 "多个生产者,单个消费者",所以 "many threads sending to one place"。要启动一个通道,你可以使用 channel()。这将创建一个 Sender 和一个 Receiver,它们被绑在一起。你可以在函数签名中看到这一点。


#![allow(unused)]
fn main() {
// 🚧
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
}

所以你要选择一个发送者的名字和一个接收者的名字。通常你会看到像let (sender, receiver) = channel();这样的开头。因为它是泛型函数,如果你只写这个,Rust不会知道类型。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel(); // ⚠️
}

编译器说:

error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
  --> src\main.rs:30:30
   |
30 |     let (sender, receiver) = channel();
   |         ------------------   ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
   |         |
   |         consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified

它建议为SenderReceiver添加一个类型。如果你愿意的话,可以这样做:

use std::sync::mpsc::{channel, Sender, Receiver}; // Added Sender and Receiver here

fn main() {
    let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
}

但你不必这样做: 一旦你开始使用SenderReceiver,Rust就能猜到类型。

所以我们来看一下最简单的使用通道的方法。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5);
    receiver.recv(); // recv = receive, not "rec v"
}

现在编译器知道类型了。senderResult<(), SendError<i32>>receiverResult<i32, RecvError>。所以你可以用.unwrap()来看看发送是否有效,或者使用更好的错误处理。我们加上.unwrap(),也加上println!,看看得到什么。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5).unwrap();
    println!("{}", receiver.recv().unwrap());
}

这样就可以打印出5

channel就像Arc一样,因为你可以克隆它,并将克隆的内容发送到其他线程中。让我们创建两个线程,并将值发送到receiver。这段代码可以工作,但它并不完全是我们想要的。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();

    std::thread::spawn(move|| { // move sender in
        sender.send("Send a &str this time").unwrap();
    });

    std::thread::spawn(move|| { // move sender_clone in
        sender_clone.send("And here is another &str").unwrap();
    });

    println!("{}", receiver.recv().unwrap());
}

两个线程开始发送,然后我们println!。它可能会打印 Send a &str this timeAnd here is another &str,这取决于哪个线程先完成。让我们创建一个join句柄来等待它们完成。

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![]; // Put our handles in here

    handle_vec.push(std::thread::spawn(move|| {  // push this into the vec
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {  // and push this into the vec
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec { // now handle_vec has 2 items. Let's print them
        println!("{:?}", receiver.recv().unwrap());
    }
}

这个将打印:

"Send a &str this time"
"And here is another &str"

现在我们不打印,我们创建一个results_vec

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![];
    let mut results_vec = vec![];

    handle_vec.push(std::thread::spawn(move|| {
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec {
        results_vec.push(receiver.recv().unwrap());
    }

    println!("{:?}", results_vec);
}

现在结果在我们的vec中:["Send a &str this time", "And here is another &str"]

现在让我们假设我们有很多工作要做,并且想要使用线程。我们有一个大的VEC,里面有1百万个元素,都是0,我们想把每个0都变成1,我们将使用10个线程,每个线程将做十分之一的工作。我们将创建一个新的VEC,并使用.extend()来收集结果。

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();
    let hugevec = vec![0; 1_000_000];
    let mut newvec = vec![];
    let mut handle_vec = vec![];

    for i in 0..10 {
        let sender_clone = sender.clone();
        let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // new vec to put the work in. 1/10th the size
        work.extend(&hugevec[i*100_000..(i+1)*100_000]); // first part gets 0..100_000, next gets 100_000..200_000, etc.
        let handle =spawn(move || { // make a handle

            for number in work.iter_mut() { // do the actual work
                *number += 1;
            };
            sender_clone.send(work).unwrap(); // use the sender_clone to send the work to the receiver
        });
        handle_vec.push(handle);
    }

    for handle in handle_vec { // stop until the threads are done
        handle.join().unwrap();
    }

    while let Ok(results) = receiver.try_recv() {
        newvec.push(results); // push the results from receiver.recv() into the vec
    }

    // Now we have a Vec<Vec<u8>>. To put it together we can use .flatten()
    let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // Now it's one vec of 1_000_000 u8 numbers

    println!("{:?}, {:?}, total length: {}", // Let's print out some numbers to make sure they are all 1
        &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // And show that the length is 1_000_000 items
    );

    for number in newvec { // And let's tell Rust that it can panic if even one number is not 1
        if number != 1 {
            panic!();
        }
    }
}