透過 Waker
喚醒任務
對 future 來說,第一次被輪詢(poll
)時任務尚未完成再正常不過了。當這件事發生,future 必須確認它未來就緒時會被再度輪詢以取得進展。這就需透過 Waker
型別處理。
每次輪詢一個 future 時,都將其作為「任務(task)」的一部分來輪詢。任務實際上是已提交到執行器的最上層 future。
Waker
提供一個 wake()
方法,可以告知執行器相關的任務需要喚醒。當呼叫了 wake()
,執行器就會得知連動該 Waker
的任務已準備就緒取得進展,且應該再度輪詢它的 future。
Waker
同時實作了 clone()
,所以它可以四處複製與儲存。
我們來試試用 Waker
實作一個簡易計時器。
案例:打造一個計時器
為了貼近示例的目的,我們只需要在計時器建立時開一個執行緒,睡眠一段必要的時間,當計時器時間到,再發送訊號給計時器 future。
這邊是我們開始手作前需要的 import:
# #![allow(unused_variables)] #fn main() { use { std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }, }; #}
讓我們從定義自己的 future 型別開始。我們的 future 需要一個溝通方法,給執行緒通知我們計時器的時間到了 future 也該完成了。我們會用一個共享狀態 Arc<Mutex<..>>
在執行緒與 future 間溝通。
# #![allow(unused_variables)] #fn main() { pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the future and the waiting thread struct SharedState { /// Whether or not the sleep time has elapsed completed: bool, /// The waker for the task that `TimerFuture` is running on. /// The thread can use this after setting `completed = true` to tell /// `TimerFuture`'s task to wake up, see that `completed = true`, and /// move forward. waker: Option<Waker>, } #}
現在,我們要實際動手實作 Future
了!
# #![allow(unused_variables)] #fn main() { impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Look at the shared state to see if the timer has already completed. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // Set waker so that the thread can wake up the current task // when the timer has completed, ensuring that the future is polled // again and sees that `completed = true`. // // It's tempting to do this once rather than repeatedly cloning // the waker each time. However, the `TimerFuture` can move between // tasks on the executor, which could cause a stale waker pointing // to the wrong task, preventing `TimerFuture` from waking up // correctly. // // N.B. it's possible to check for this using the `Waker::will_wake` // function, but we omit that here to keep things simple. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } #}
很簡單吧?如果執行緒設定 shared_state.completed = true
,我們完成了!否則,我們就複製當前任務的 Waker
並將其傳入 shared_state.waker
,以便執行緒往後喚醒任務。
重要的是,在每次輪詢 future 後必須更新 Waker
,因為 future 有可能移動到帶有不同 Waker
的不同任務上。這種狀況會發生在 future 在 task 之間相互傳遞時。
最後,我們需要可以實際構建計時器與啟動執行緒的 API:
# #![allow(unused_variables)] #fn main() { impl TimerFuture { /// Create a new `TimerFuture` which will complete after the provided /// timeout. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn the new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Signal that the timer has completed and wake up the last // task on which the future was polled, if one exists. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } #}
呼!這些就是打造簡易計時器 future 的一切所需。現在,我們若有一個執行器可以讓 future 跑起來⋯