Future
Trait
Future
trait 在 Rust 非同步程式設計中扮演關鍵角色。一個 Future
就是一個非同步的運算,產出一個結果值(雖然這個值可能為空,例如 ()
)。一個簡化版的 future trait 看起來如下:
# #![allow(unused_variables)] #fn main() { trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } #}
Future 可以透過呼叫 poll
函式往前推進,這個函式會盡可能地驅使 future 邁向完成。若 future 完成了,就回傳 Poll::Ready(reuslt)
。若這個 future 尚無法完成,就回傳 Poll::Pending
,並安排在該 Future
就緒且能有所進展時來呼叫 wake()
函式。當呼叫了 wake()
,執行器(executor)會驅使 Future
再次呼叫 poll
,於是 Future
就可以取得更多進展。
若沒有 wake()
,執行器不會知道哪個 future 能有所進展,因此必須不斷輪詢所有 future。有了 wake()
,執行器能夠確切知道哪個 future 已準備好接受輪詢。
舉例來說,試想我們要從 socket 讀取一些可能尚未就緒的資料。如果有資料進來,我們則可以讀取它並回傳 Poll:Ready(data)
;但若沒有任何資料就緒,我們的 future 會被阻塞且無法取得任何進展。當沒有任何資料,我們必須註冊 wake
函式,並在 socket 上的資料就緒時呼叫它,進而告知執行器我們的 future 準備好取得進展了。一個簡單的 SocketRead
future 大致上類似這樣:
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // The socket has data-- read it into a buffer and return it. Poll::Ready(self.socket.read_buf()) } else { // The socket does not yet have data. // // Arrange for `wake` to be called once data is available. // When data becomes available, `wake` will be called, and the // user of this `Future` will know to call `poll` again and // receive data. self.socket.set_readable_callback(wake); Poll::Pending } } } #}
這個 Future
原型可以在不需要中間配置(intermediate allocation)下組合多個非同步操作。並可以經由免配置狀態機(allocation-free state machine)實作同時執行多個 future 或是串聯多個 future 的操作,示例如下:
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two other futures to completion concurrently. /// /// Concurrency is achieved via the fact that calls to `poll` each future /// may be interleaved, allowing each future to advance itself at its own pace. pub struct Join<FutureA, FutureB> { // Each field may contain a future that should be run to completion. // If the future has already completed, the field is set to `None`. // This prevents us from polling a future after it has completed, which // would violate the contract of the `Future` trait. a: Option<FutureA>, b: Option<FutureB>, } impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { // Attempt to complete future `a`. if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } } // Attempt to complete future `b`. if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } } if self.a.is_none() && self.b.is_none() { // Both futures have completed-- we can return successfully Poll::Ready(()) } else { // One or both futures returned `Poll::Pending` and still have // work to do. They will call `wake()` when progress can be made. Poll::Pending } } } #}
這展示了多個 future 如何在無需分別配置資源的情形下同時執行,讓我們能夠寫出更高效的非同步程式。無獨有偶,多個循序的 future 也能一個接著一個執行,如下所示:
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two futures to completion, one after another. // // Note: for the purposes of this simple example, `AndThenFut` assumes both // the first and second futures are available at creation-time. The real // `AndThen` combinator allows creating the second future based on the output // of the first future, like `get_breakfast.and_then(|food| eat(food))`. pub struct AndThenFut<FutureA, FutureB> { first: Option<FutureA>, second: FutureB, } impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if let Some(first) = &mut self.first { match first.poll(wake) { // We've completed the first future-- remove it and start on // the second! Poll::Ready(()) => self.first.take(), // We couldn't yet complete the first future. Poll::Pending => return Poll::Pending, }; } // Now that the first future is done, attempt to complete the second. self.second.poll(wake) } } #}
這些示例展現出 Future
可以在無需額外配置的物件與深度巢狀回呼 (nested callback)的情形下,清晰表達出非同步的流程控制。看完這樣基本的流程控制,讓我們來討論真正的 Future
trait 和剛剛的原型哪裡不相同。
# #![allow(unused_variables)] #fn main() { trait Future { type Output; fn poll( // Note the change from `&mut self` to `Pin<&mut Self>`: self: Pin<&mut Self>, // and the change from `wake: fn()` to `cx: &mut Context<'_>`: cx: &mut Context<'_>, ) -> Poll<Self::Output>; } #}
你會發現的第一個改變是 self
型別不再是 &mut self
,而是由 Pin<&mut self>
取代。我們會在往後的章節有更多關於 Pinning 討論,此刻知道它允許我們建立一個不移動(immovable)的 future。不移動物件可在它們的欄位(field)保存指標,例如 struct MyFut { a: i32, ptr_to_a: *const i32 }
。Pinning 是啟用 async/await 前必要之功能。
第二,wake: fn()
改為 &mut Context<_>
。在 SimpleFuture
我們透過呼叫一個函式指標(fn()
)來告知 future 執行器這個 future 可以接受輪詢。然而,由於 fn()
是 zero-sized,它不能儲存任何有關哪個 Future
呼叫了 wake
的資訊。
在真實世界場景下,一個複雜如網頁伺服器的應用程式可能有數以千計不同的連線,這些連線的喚醒函式需要妥善分別管理。Context
型別提供取得一個叫 Waker
的型別來解決此問題,Waker
的功能則是用來喚醒一個特定任務。