透過 Waker 喚醒任務

對 future 來說,第一次被輪詢(poll)時任務尚未完成再正常不過了。當這件事發生,future 必須確認它未來就緒時會被再度輪詢以取得進展。這就需透過 Waker 型別處理。

每次輪詢一個 future 時,都將其作為「任務(task)」的一部分來輪詢。任務實際上是已提交到執行器的最上層 future。

Waker 提供一個 wake() 方法,可以告知執行器相關的任務需要喚醒。當呼叫了 wake(),執行器就會得知連動該 Waker 的任務已準備就緒取得進展,且應該再度輪詢它的 future。

Waker 同時實作了 clone(),所以它可以四處複製與儲存。

我們來試試用 Waker 實作一個簡易計時器。

案例:打造一個計時器

為了貼近示例的目的,我們只需要在計時器建立時開一個執行緒,睡眠一段必要的時間,當計時器時間到,再發送訊號給計時器 future。

這邊是我們開始手作前需要的 import:


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

讓我們從定義自己的 future 型別開始。我們的 future 需要一個溝通方法,給執行緒通知我們計時器的時間到了 future 也該完成了。我們會用一個共享狀態 Arc<Mutex<..>> 在執行緒與 future 間溝通。


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
#}

現在,我們要實際動手實作 Future 了!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

很簡單吧?如果執行緒設定 shared_state.completed = true,我們完成了!否則,我們就複製當前任務的 Waker 並將其傳入 shared_state.waker,以便執行緒往後喚醒任務。

重要的是,在每次輪詢 future 後必須更新 Waker,因為 future 有可能移動到帶有不同 Waker 的不同任務上。這種狀況會發生在 future 在 task 之間相互傳遞時。

最後,我們需要可以實際構建計時器與啟動執行緒的 API:


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

呼!這些就是打造簡易計時器 future 的一切所需。現在,我們若有一個執行器可以讓 future 跑起來⋯