Channels
A channel is an easy way to use many threads that send to one place.它们相当流行,因为它们很容易组合在一起。你可以在Rust中用std::sync::mpsc
创建一个channel。mpsc
的意思是 "多个生产者,单个消费者",所以 "many threads sending to one place"。要启动一个通道,你可以使用 channel()
。这将创建一个 Sender
和一个 Receiver
,它们被绑在一起。你可以在函数签名中看到这一点。
#![allow(unused)] fn main() { // 🚧 pub fn channel<T>() -> (Sender<T>, Receiver<T>) }
所以你要选择一个发送者的名字和一个接收者的名字。通常你会看到像let (sender, receiver) = channel();
这样的开头。因为它是泛型函数,如果你只写这个,Rust不会知道类型。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); // ⚠️ }
编译器说:
error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
--> src\main.rs:30:30
|
30 | let (sender, receiver) = channel();
| ------------------ ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
| |
| consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified
它建议为Sender
和Receiver
添加一个类型。如果你愿意的话,可以这样做:
use std::sync::mpsc::{channel, Sender, Receiver}; // Added Sender and Receiver here fn main() { let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel(); }
但你不必这样做: 一旦你开始使用Sender
和Receiver
,Rust就能猜到类型。
所以我们来看一下最简单的使用通道的方法。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); sender.send(5); receiver.recv(); // recv = receive, not "rec v" }
现在编译器知道类型了。sender
是Result<(), SendError<i32>>
,receiver
是Result<i32, RecvError>
。所以你可以用.unwrap()
来看看发送是否有效,或者使用更好的错误处理。我们加上.unwrap()
,也加上println!
,看看得到什么。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); sender.send(5).unwrap(); println!("{}", receiver.recv().unwrap()); }
这样就可以打印出5
。
channel
就像Arc
一样,因为你可以克隆它,并将克隆的内容发送到其他线程中。让我们创建两个线程,并将值发送到receiver
。这段代码可以工作,但它并不完全是我们想要的。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); std::thread::spawn(move|| { // move sender in sender.send("Send a &str this time").unwrap(); }); std::thread::spawn(move|| { // move sender_clone in sender_clone.send("And here is another &str").unwrap(); }); println!("{}", receiver.recv().unwrap()); }
两个线程开始发送,然后我们println!
。它可能会打印 Send a &str this time
或 And here is another &str
,这取决于哪个线程先完成。让我们创建一个join句柄来等待它们完成。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); let mut handle_vec = vec![]; // Put our handles in here handle_vec.push(std::thread::spawn(move|| { // push this into the vec sender.send("Send a &str this time").unwrap(); })); handle_vec.push(std::thread::spawn(move|| { // and push this into the vec sender_clone.send("And here is another &str").unwrap(); })); for _ in handle_vec { // now handle_vec has 2 items. Let's print them println!("{:?}", receiver.recv().unwrap()); } }
这个将打印:
"Send a &str this time"
"And here is another &str"
现在我们不打印,我们创建一个results_vec
。
use std::sync::mpsc::channel; fn main() { let (sender, receiver) = channel(); let sender_clone = sender.clone(); let mut handle_vec = vec![]; let mut results_vec = vec![]; handle_vec.push(std::thread::spawn(move|| { sender.send("Send a &str this time").unwrap(); })); handle_vec.push(std::thread::spawn(move|| { sender_clone.send("And here is another &str").unwrap(); })); for _ in handle_vec { results_vec.push(receiver.recv().unwrap()); } println!("{:?}", results_vec); }
现在结果在我们的vec中:["Send a &str this time", "And here is another &str"]
。
现在让我们假设我们有很多工作要做,并且想要使用线程。我们有一个大的VEC,里面有1百万个元素,都是0,我们想把每个0都变成1,我们将使用10个线程,每个线程将做十分之一的工作。我们将创建一个新的VEC,并使用.extend()
来收集结果。
use std::sync::mpsc::channel; use std::thread::spawn; fn main() { let (sender, receiver) = channel(); let hugevec = vec![0; 1_000_000]; let mut newvec = vec![]; let mut handle_vec = vec![]; for i in 0..10 { let sender_clone = sender.clone(); let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // new vec to put the work in. 1/10th the size work.extend(&hugevec[i*100_000..(i+1)*100_000]); // first part gets 0..100_000, next gets 100_000..200_000, etc. let handle =spawn(move || { // make a handle for number in work.iter_mut() { // do the actual work *number += 1; }; sender_clone.send(work).unwrap(); // use the sender_clone to send the work to the receiver }); handle_vec.push(handle); } for handle in handle_vec { // stop until the threads are done handle.join().unwrap(); } while let Ok(results) = receiver.try_recv() { newvec.push(results); // push the results from receiver.recv() into the vec } // Now we have a Vec<Vec<u8>>. To put it together we can use .flatten() let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // Now it's one vec of 1_000_000 u8 numbers println!("{:?}, {:?}, total length: {}", // Let's print out some numbers to make sure they are all 1 &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // And show that the length is 1_000_000 items ); for number in newvec { // And let's tell Rust that it can panic if even one number is not 1 if number != 1 { panic!(); } } }