Applied: Build an Executor

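Rust Futures are lazy: they won't do anything unless they are actively driven to completion. One way to drive a future to completion is to .await it inside an async function, but that just pushes the problem one level up: who will run the futures returned from the top-level async functions? The answer is that we need a Future executor.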
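Here is a small sketch that shows the laziness directly. It uses only the standard library rather than the futures crate, and assumes std::task::Wake (stable since Rust 1.51) and the std::pin::pin! macro (Rust 1.68). The names ThreadWaker, block_on and laziness_demo are illustrative. Creating the async block does nothing on its own. Its body runs only once something polls it, which here is a minimal thread-parking block_on.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A tiny single-future executor: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Returns (body ran before any poll, body ran after block_on).
fn laziness_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);
    // Creating the future only builds a state machine; nothing runs yet.
    let fut = async move {
        flag.store(true, Ordering::SeqCst);
    };
    let before = ran.load(Ordering::SeqCst);
    block_on(fut); // driving it to completion runs the body
    let after = ran.load(Ordering::SeqCst);
    (before, after)
}

fn main() {
    let (before, after) = laziness_demo();
    println!("ran before poll: {before}, ran after block_on: {after}");
}
```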

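A Future executor takes a set of top-level Futures and runs them to completion, calling poll on each one whenever it can make progress. Typically, an executor polls a future once to start off. When a Future signals that it is ready to make progress by calling wake(), it is placed back onto a queue and polled again, and this repeats until the Future has completed.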
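The poll-then-wait-for-wake cycle can be sketched with a hand-written future. This is a single-future loop, not a real executor. It polls once, then polls again only after the future has called wake_by_ref on the Waker it was given. YieldN, CountWaker and drive are hypothetical names, and std::task::Wake stands in for the futures crate's waker helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: returns Pending `remaining` times, waking itself each time.
struct YieldN {
    remaining: usize,
}

impl Future for YieldN {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // Tell whoever is driving us that we can make progress right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how many wake-ups were requested.
struct CountWaker(AtomicUsize);

impl Wake for CountWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls once, then re-polls only after a wake-up. Returns the number of polls.
fn drive(mut fut: YieldN) -> usize {
    let wakes = Arc::new(CountWaker(AtomicUsize::new(0)));
    let waker = Waker::from(wakes.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return polls;
        }
        // A real executor would keep the task out of its queue until woken;
        // here we just check that a wake-up was requested before re-polling.
        assert!(wakes.0.swap(0, Ordering::SeqCst) > 0, "Pending without a wake-up");
    }
}

fn main() {
    println!("polls needed: {}", drive(YieldN { remaining: 2 }));
}
```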

In this section, we'll write our own simple executor that can run a large number of top-level futures to completion concurrently.

For this example, we depend on the futures crate for the ArcWake trait, which provides an easy way to construct a Waker.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.17"
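The standard library later gained a similar trait, std::task::Wake. This std-only sketch uses it in place of ArcWake to show the same idea: implement wake on a type held in an Arc, then turn that Arc into a Waker. CountingWaker and count_wakes are illustrative names. The futures-based code in this section still uses ArcWake.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker state: counts wake-ups from every Waker built from it.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    // Required method: called when a Waker is consumed by `wake()`.
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    // Optional override: avoids cloning the Arc for `wake_by_ref()`.
    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

fn count_wakes() -> usize {
    let state = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    // Turn the Arc into a Waker (roughly the role waker_ref plays for ArcWake).
    let waker = Waker::from(state.clone());
    waker.wake_by_ref(); // borrows the Waker
    let second = waker.clone(); // cloning a Waker clones the underlying Arc
    second.wake(); // consumes `second`
    waker.wake(); // consumes the original
    state.wakes.load(Ordering::SeqCst)
}

fn main() {
    println!("wake-ups: {}", count_wakes());
}
```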

Next, we need some imports at the top of src/main.rs:


use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

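Our executor works by sending tasks to run over a channel. The executor pulls tasks off the channel and runs them. When a task is ready to do more work (is woken), it schedules itself to be polled again by putting itself back onto the channel.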
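The self-scheduling idea does not depend on futures at all. In this sketch, Job and run_jobs are hypothetical names. Each job needs a few turns, and after each turn it sends a clone of its own Arc back onto the channel through a sender it holds. The loop ends once no job holds a sender any more.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical unit of work that needs several turns to finish.
struct Job {
    id: usize,
    turns_left: Mutex<usize>,
    /// Handle used to put this job back onto the queue.
    sender: Sender<Arc<Job>>,
}

/// Runs jobs off the channel and returns the order in which turns ran.
fn run_jobs(turns: &[usize]) -> Vec<usize> {
    let (sender, receiver) = channel::<Arc<Job>>();
    for (id, &n) in turns.iter().enumerate() {
        let job = Arc::new(Job { id, turns_left: Mutex::new(n), sender: sender.clone() });
        sender.send(job).unwrap();
    }
    // From here on, only the jobs' own senders keep the channel open.
    drop(sender);

    let mut order = Vec::new();
    while let Ok(job) = receiver.recv() {
        order.push(job.id);
        let mut left = job.turns_left.lock().unwrap();
        *left = left.saturating_sub(1);
        if *left > 0 {
            // Self-scheduling: the job re-queues a clone of its own Arc.
            job.sender.send(job.clone()).unwrap();
        }
    }
    order
}

fn main() {
    println!("{:?}", run_jobs(&[2, 1, 3]));
}
```

Job 0 runs twice and job 2 three times. Each re-queue goes to the back, so turns interleave instead of one job running to completion first.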

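In this design, the executor itself needs only the receiving end of the task channel. The user gets a sending end so that they can spawn new futures. Tasks are just futures that can reschedule themselves, so we store each one as a future paired with a sender that the task can use to requeue itself.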


/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}
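The Mutex also makes Arc&lt;Task&gt; sendable between threads. A boxed dyn Future + Send is Send but not Sync, and Arc&lt;T&gt; is Send only when T is Send + Sync. This std-only sketch swaps BoxFuture&lt;'static, ()&gt; for the equivalent Pin&lt;Box&lt;dyn Future&lt;Output = ()&gt; + Send + 'static&gt;&gt;. The alias BoxedUnitFuture and the helper functions are hypothetical. The sketch checks the bounds at compile time, then takes the future out of its slot the way the executor will.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};

/// Hypothetical alias with the same shape as futures' BoxFuture<'static, ()>.
type BoxedUnitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

#[allow(dead_code)] // task_sender is never read in this sketch
struct Task {
    // dyn Future + Send is not Sync, so Option<BoxedUnitFuture> alone would
    // make Task !Sync; the Mutex restores Sync because the future is Send.
    future: Mutex<Option<BoxedUnitFuture>>,
    task_sender: SyncSender<Arc<Task>>,
}

/// Compiles only if T can be both sent to and shared between threads.
fn assert_send_sync<T: Send + Sync>() {}

/// Returns (slot held a future, slot is empty after `take`).
fn task_slot_demo() -> (bool, bool) {
    // Compile-time checks: Arc<Task> can cross threads because Task is Send + Sync.
    assert_send_sync::<Task>();
    assert_send_sync::<Arc<Task>>();

    let (task_sender, ready_queue) = sync_channel::<Arc<Task>>(10);
    let future: BoxedUnitFuture = Box::pin(async {});
    let task = Arc::new(Task {
        future: Mutex::new(Some(future)),
        task_sender: task_sender.clone(),
    });
    task_sender.send(task).unwrap();

    let received = ready_queue.recv().unwrap();
    let mut slot = received.future.lock().unwrap();
    // Take the future out, as the executor does before polling it.
    let had_future = slot.take().is_some();
    (had_future, slot.is_none())
}

fn main() {
    println!("{:?}", task_slot_demo());
}
```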

We also add a method to the spawner to make it easy to spawn new futures. This method takes a future type, boxes it, and creates a new Arc<Task> containing it, which can then be enqueued onto the executor.


impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        // `send` blocks while the channel is full and only fails once the
        // receiving end (the executor) has been dropped.
        self.task_sender.send(task).expect("executor has been dropped");
    }
}
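The bounded channel has one detail worth knowing. SyncSender::send blocks while the buffer is full and returns an error only once the receiving end has been dropped. try_send is the call that reports a full buffer, as TrySendError::Full. This small sketch uses a buffer of one (full_channel_demo is an illustrative name):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Returns (first try_send fit, second reported Full, send failed after receiver dropped).
fn full_channel_demo() -> (bool, bool, bool) {
    let (tx, rx) = sync_channel::<u32>(1);

    // The buffer has room for exactly one message.
    let first_ok = tx.try_send(1).is_ok();
    // A second message does not fit: try_send reports Full instead of blocking
    // (a plain `send` here would block this thread until space frees up).
    let second_full = matches!(tx.try_send(2), Err(TrySendError::Full(2)));

    // Make room again, then hang up the receiving end.
    assert_eq!(rx.recv().unwrap(), 1);
    drop(rx);
    // With no receiver left, `send` returns Err: its only failure mode.
    let send_failed = tx.send(3).is_err();

    (first_ok, second_full, send_failed)
}

fn main() {
    println!("{:?}", full_channel_demo());
}
```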

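In order to poll futures, we need to create a Waker. As discussed in the section on task wakeups, a Waker is responsible for scheduling a task to be polled again once wake is called. Remember that a Waker tells the executor exactly which task has become ready, so the executor can poll just the futures that are ready to make progress. The easiest way to create a new Waker is to implement the ArcWake trait and then use the waker_ref or .into_waker() functions to turn an Arc<impl ArcWake> into a Waker. Let's implement ArcWake for our tasks so that they can be turned into Wakers and woken up: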


impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("executor has been dropped");
    }
}
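The same wake-by-requeueing behavior can be sketched with std::task::Wake in place of ArcWake. This Task is a hypothetical stripped-down version that holds only a name and a sender. The check confirms that waking sends a clone of the very same Arc onto the channel.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stripped-down task: just a name and a handle onto the queue.
struct Task {
    name: &'static str,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        // Waking = sending a clone of this Arc back onto the task channel.
        let cloned = Arc::clone(self);
        self.task_sender.send(cloned).expect("task channel closed");
    }
}

/// Returns (requeued Arc points at the original task, its name).
fn wake_requeues() -> (bool, &'static str) {
    let (task_sender, ready_queue) = sync_channel(10);
    let task = Arc::new(Task { name: "demo", task_sender });
    let waker = Waker::from(Arc::clone(&task));
    waker.wake_by_ref();
    let requeued = ready_queue.recv().expect("task was not requeued");
    (Arc::ptr_eq(&task, &requeued), requeued.name)
}

fn main() {
    println!("{:?}", wake_requeues());
}
```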

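When a Waker is created from an Arc<Task>, calling wake() on it sends a copy of the Arc onto the task channel. Our executor then needs to pick up the task and poll it. Let's implement that: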


impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `WakerRef` (which dereferences to `&Waker`) from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}
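Putting the pieces together, here is a compact std-only sketch of the same design. It assumes std::task::Wake with Waker::from in place of ArcWake and waker_ref, and Box::pin in place of FutureExt::boxed. YieldN, spawn, run and demo are hypothetical names. Each task is polled once when it is spawned and again each time it wakes itself, so a task that yields n times is polled n + 1 times.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as futures' BoxFuture<'static, ()>.
type BoxedUnitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

struct Task {
    future: Mutex<Option<BoxedUnitFuture>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }
    fn wake_by_ref(self: &Arc<Self>) {
        // Re-queue this task so the executor polls it again.
        self.task_sender.send(self.clone()).expect("task channel closed");
    }
}

/// Hypothetical future that yields `remaining` times, waking itself each time.
struct YieldN {
    remaining: usize,
}

impl Future for YieldN {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Boxes the future, wraps it in a Task and queues it (Box::pin stands in for .boxed()).
fn spawn(sender: &SyncSender<Arc<Task>>, future: impl Future<Output = ()> + Send + 'static) {
    let future: BoxedUnitFuture = Box::pin(future);
    let task = Arc::new(Task {
        future: Mutex::new(Some(future)),
        task_sender: sender.clone(),
    });
    sender.send(task).expect("task channel closed");
}

/// Executor loop; returns how many times any future was polled.
fn run(ready_queue: Receiver<Arc<Task>>) -> usize {
    let mut polls = 0;
    while let Ok(task) = ready_queue.recv() {
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            polls += 1;
            if future.as_mut().poll(&mut cx).is_pending() {
                // Not finished: put it back so a later wake-up can poll it again.
                *slot = Some(future);
            }
        }
    }
    polls
}

/// Spawns tasks that yield 0, 1 and 2 times; returns (total polls, tasks finished).
fn demo() -> (usize, usize) {
    // 100 slots is ample here; the executor thread sends to its own queue on wake.
    let (sender, ready_queue) = sync_channel::<Arc<Task>>(100);
    let finished = Arc::new(AtomicUsize::new(0));
    for n in 0..3 {
        let finished = finished.clone();
        spawn(&sender, async move {
            YieldN { remaining: n }.await;
            finished.fetch_add(1, Ordering::SeqCst);
        });
    }
    // Like drop(spawner): afterwards only tasks hold senders.
    drop(sender);
    let polls = run(ready_queue);
    (polls, finished.load(Ordering::SeqCst))
}

fn main() {
    let (polls, finished) = demo();
    println!("{finished} tasks finished after {polls} polls");
}
```

Dropping the original sender before calling run matters for the same reason as drop(spawner) in the main function that follows. The loop ends only when every sender is gone, including the ones stored inside tasks.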

Congratulations! We now have a working futures executor. We can even use it to run async/.await code and hand-written futures, such as the TimerFuture we wrote earlier:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty and no senders remain.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}
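TimerFuture itself comes from the previous section. As a self-contained stand-in, here is a hypothetical Sleep future built along the same lines: a background thread sleeps, then marks the shared state complete and wakes the stored Waker. A minimal thread-parking block_on drives it, not the channel executor above. The names Sleep, SharedState, ThreadWaker, block_on and timed_demo are all assumptions of this sketch.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// State shared between the Sleep future and its timer thread.
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

/// Hypothetical timer future built along the lines of TimerFuture.
struct Sleep {
    shared: Arc<Mutex<SharedState>>,
}

impl Sleep {
    fn new(duration: Duration) -> Self {
        let shared = Arc::new(Mutex::new(SharedState { completed: false, waker: None }));
        let thread_shared = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared.lock().unwrap();
            state.completed = true;
            // Wake the task that last polled us, if any.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Sleep { shared }
    }
}

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Remember the most recent waker so the timer thread wakes the right task.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // park may also return spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Returns the log and whether at least 50 ms passed.
fn timed_demo() -> (Vec<&'static str>, bool) {
    let start = Instant::now();
    let mut log = Vec::new();
    block_on(async {
        log.push("howdy!");
        Sleep::new(Duration::from_millis(50)).await;
        log.push("done!");
    });
    (log, start.elapsed() >= Duration::from_millis(50))
}

fn main() {
    let (log, waited) = timed_demo();
    println!("{log:?}, waited >= 50 ms: {waited}");
}
```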