案例:打造一個執行器
Rust 的 Future
具有惰性:除非積極驅動到完成,不然不會做任何事。驅使 future 完成的方法之一是在 async
函式中用 .await
等待它,但這只會將問題推向更高的層次:誰來跑最上層 async
函式返回的 future?答案揭曉,我們需要一個 Future
執行器(executor)。
Future
執行器會拿一群最上層的 Future
並在它們可有所進展時呼叫 poll
執行它們一直到完成。通常一個執行器會 poll
一個 future 一次作為開始。當 Future
透過呼叫 wake()
表示它們已就緒並可以有所進展時,Future
會被放回佇列並再度被呼叫 poll
,不斷重複直到該 Future
完成了。
在這一部分,我們將編寫自己的簡易執行器,能夠並行執行一大群最上層的 future 直到完成。
這個示例中,我們會依賴 futures
crate 的 ArcWake
trait,這個 trait 提供一個簡便構建 Waker
的方法。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
接下來,我們需要在 src/main.rs
加上一些 import。
# #![allow(unused_variables)] #fn main() { use { futures::{ future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}, }, std::{ future::Future, sync::{Arc, Mutex}, sync::mpsc::{sync_channel, SyncSender, Receiver}, task::{Context, Poll}, time::Duration, }, // The timer we wrote in the previous section: timer_future::TimerFuture, }; #}
我們的執行器工作模式是發送任務到通道上(channel)。這個執行器會從通道拉取事件並執行。當一個任務已就緒將執行更多工作(被喚醒),它就可將自己放回通道中,以此方式自我排程來等待再次被輪詢。
在這個設計下,執行器僅需要任務通道的接收端。使用者則會取得發送端,並藉由發送端產生(spawn)新的 future。而任務本身實際上是可自我排程的 future,所以我們將任務自身的結構設計為一個 future,加上一個可讓任務重新加入佇列的通道發送端。
# #![allow(unused_variables)] #fn main() { /// Task executor that receives tasks off of a channel and runs them. struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner` spawns new futures onto the task channel. #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// A future that can reschedule itself to be polled by an `Executor`. struct Task { /// In-progress future that should be pushed to completion. /// /// The `Mutex` is not necessary for correctness, since we only have /// one thread executing tasks at once. However, Rust isn't smart /// enough to know that `future` is only mutated from one thread, /// so we need use the `Mutex` to prove thread-safety. A production /// executor would not need this, and could use `UnsafeCell` instead. future: Mutex<Option<BoxFuture<'static, ()>>>, /// Handle to place the task itself back onto the task queue. task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // Maximum number of tasks to allow queueing in the channel at once. // This is just to make `sync_channel` happy, and wouldn't be present in // a real executor. const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender}) } #}
我們也在產生器(spawner)新增一個方法,使其產生新 future 更便利。這個方法需要傳入一個 future 型別,會將它 box 起來再放入新建立的 Arc<Task>
中,以便於將 future 加入 executor 的佇列。
# #![allow(unused_variables)] #fn main() { impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); } } #}
欲輪詢 future,我們需要建立 Waker
。在任務喚醒一節中討論過,讓任務 wake
被呼叫時的輪詢排程是 Waker
的職責所在。記住這一點,Waker
會告知執行器確切已就緒的任務為何,並允許執行器輪詢這些已就緒可有所進展的 future。建立一個新 Waker
最簡單的作法是透過實作 ArcWake
trait,並使用 waker_ref
或 .into_waker()
函式將 Arc<impl ArcWake>
型別轉換為 Waker
。現在我們來替任務實作 ArcWake
,允許它們可以轉換成 Waker
並可以被喚醒:
# #![allow(unused_variables)] #fn main() { impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); } } #}
當 Waker
從一個 Arc<Task>
建立後,呼叫它的 wake()
會發送一個 Arc
副本到任務通道上。我們的執行器就會取出並輪詢該任務。實作如下:
# #![allow(unused_variables)] #fn main() { impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // Take the future, and if it has not yet completed (is still Some), // poll it in an attempt to complete it. let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // Create a `LocalWaker` from the task itself let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>` is a type alias for // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. // We can get a `Pin<&mut dyn Future + Send + 'static>` // from it by calling the `Pin::as_mut` method. if let Poll::Pending = future.as_mut().poll(context) { // We're not done processing the future, so put it // back in its task to be run again in the future. *future_slot = Some(future); } } } } } #}
恭喜啊!我們現在有一個能動的 future 執行器。我們甚至可以用它來執行
async/.await
程式碼或是親手打造的 future,像是之前寫的 TimerFuture
。
fn main() { let (executor, spawner) = new_executor_and_spawner(); // Spawn a task to print before and after waiting on a timer. spawner.spawn(async { println!("howdy!"); // Wait for our timer future to complete after two seconds. TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // Drop the spawner so that our executor knows it is finished and won't // receive more incoming tasks to run. drop(spawner); // Run the executor until the task queue is empty. // This will print "howdy!", pause, and then print "done!". executor.run(); }