開始上手
歡迎來到 Rust 非同步程式設計!如果你正要開始編寫非同步 Rust 程式碼,那麼你來對地方啦。不論你要建立一個網路伺服器,或是一個作業系統,本書將會演示如何使用 Rust 的非同步程式設計工具來充分運用你的硬體。
本書涵蓋內容
本書旨在作為一本全面且不斷更新的指南,著重 Rust 非同步語言特性與函式庫,並適合新手和老鳥閱讀。
-
前幾個章節提供了一般的非同步程式設計介紹,以及 Rust 如何處理非同步。
-
中間幾個章節會討論編寫非同步程式碼的重要的實用工具與流程控制方法,並探討架構函式庫與應用程式的最佳實踐,以達到效能與可用性最大化。
-
本書最後的部分涵蓋了更為廣闊的非同步生態系,並提供許多常見任務的實戰範例。
有了這些,讓我們一起探索令人興奮的 Rust 非同步程式設計的世界吧!
為什麼是非同步?
我們都愛 Rust 讓我們有辦法寫出快速又安全的軟體。但為什麼要編寫非同步程式碼呢?
非同步的程式碼可以在同個作業系統的執行緒下同時執行多項任務。在一個典型的多執行緒程式中,如果要在同一時間下載多個不同的網頁,你需要講工作分佈到兩個相異的執行緒中,如下:
# #![allow(unused_variables)] #fn main() { fn get_two_sites() { // Spawn two threads to do work. let thread_one = thread::spawn(|| download("https:://www.foo.com")); let thread_two = thread::spawn(|| download("https:://www.bar.com")); // Wait for both threads to complete. thread_one.join().expect("thread one panicked"); thread_two.join().expect("thread two panicked"); } #}
對大部分的應用程式來說,這還行,畢竟執行緒就是設計來做這種事:同時執行多項不同的任務。然而,一些限制接踵而來。在不同的執行緒與切換,以及在執行緒間資料共享的開銷非常多。就算是無所事事在一旁涼快的一個執行緒仍然消耗寶貴的系統資源。而非同步程式就算設計來消滅這些開銷。我們可以透過 Rust 的 async/.await 語法,重寫上面這個函數,這個語法標記允許我們在不建立多個執行緒的條件下同時執行多項任務:
# #![allow(unused_variables)] #fn main() { async fn get_two_sites_async() { // Create a two different "futures" which, when run to completion, // will asynchronously download the webpages. let future_one = download_async("https:://www.foo.com"); let future_two = download_async("https:://www.bar.com"); // Run both futures to completion at the same time. join!(future_one, future_two); } #}
總體來說,非同步應用程式有潛力比相對應的多執行緒實作來得更快更節省資源。不過,它仍然有個成本。執行緒由作業系統原生提供,使用上不需要特殊的程式設計模式,任意的函式都可建立執行緒,且呼叫使用執行緒的函式就和呼叫普通函式一樣容易。然而,非同步的函式需要程式語言或函式庫特殊的支援。在 Rust 的世界,async fn 會建立一個非同步函式,函式會返回一個 Future。若要執行函式主體,則返回的 Future 必須執行直到完成。
傳統多執行緒的程式可以達到高效,而 Rust 少量的記憶體足跡與預測性代表了用了 async,你可以走得更遠。非同步程式設計模型帶來的複雜度並非總是值得,該仔細考慮你的應用程式是否在簡單多執行緒模型下能跑得更好。
非同步 Rust 的現況
隨著時間推移,非同步 Rust 生態系歷經了許多演進,所以不易了解什麼工具可用,什麼函式庫適合投資,或是什麼文件可以閱讀。幸虧,標準函式庫中的 Future trait 和 async/await 最近穩定了。整個生態系處於遷移至新穩定 API 的過程中,此後流失會顯著降低。
然而,目前生態系仍正在快速開發中,且尚未打磨非同步 Rust 的體驗。大多數函式庫依然使用 0.1 futures crate 的定義,這代表開發者經常要使用 0.3 futures 的compat 功能,才能讓 0.1 版相容地運作。async/await 語言特性還非常新。重要的擴充功能,如在 trait method 中的 async fn 語法仍未實作,且當前的編譯器錯誤訊息非常難以解析。
Rust 正在通往支援最高效能與人因工程的非同步程式設計的道路上,如果你不懼怕掉進坑裡,來好好享受 Rust 非同步程式設計的世界吧!
async/.await 入門
async/.await 是 Rust 內建編寫非同步函式的工具,讓非同步函式寫起來像同步。async 會將一塊程式碼轉換為一個實作 future trait 的狀態機。相較於在同步的方法中呼叫一個阻塞函式(blocking function)會阻塞整個執行緒,反之,被阻塞的 future 會釋出執行緒的控制權,讓其他 future 可以繼續運作。
使用 async fn 語法來建立一個非同步函式:
# #![allow(unused_variables)] #fn main() { async fn do_something() { ... } #}
async fn 的返回值是一個 future。要讓事情發生,future 需要在一個執行器(executor)上面運作。
// `block_on` blocks the current thread until the provided future has run to // completion. Other executors provide more complex behavior, like scheduling // multiple futures onto the same thread. use futures::executor::block_on; async fn hello_world() { println!("hello, world!"); } fn main() { let future = hello_world(); // Nothing is printed block_on(future); // `future` is run and "hello, world!" is printed }
在一個 async fn 裡,可以使用 .await 來等待另一個實作 future trait 的型別完成任務,例如其他 async fn 的輸出。和 block_on 不同的事,.await 不會阻塞當前的執行緒,取而代之的是非同步地等待這個 future 完成,若這個 future 當下不能有所進展,也能允許其他任務繼續執行。
舉例來說,想像有三個 async fn:learn_song、sing_song 與 dance:
# #![allow(unused_variables)] #fn main() { async fn learn_song() -> song { ... } async fn sing_song(song: song) { ... } async fn dance() { ... } #}
有個方法可以執行學習、唱歌和跳舞,就是分別阻塞每一個函式:
fn main() { let song = block_on(learn_song()); block_on(sing_song(song)); block_on(dance()); }
然而,這樣並沒有達到最佳的效能,我們同時只做了一件事而已!很明顯地,我們需要在唱歌之前學習歌曲,但能夠同時跳著舞有學習並唱歌。為了達成這個任務,我們建立兩個獨立可並行執行的 async fn:
async fn learn_and_sing() { // Wait until the song has been learned before singing it. // We use `.await` here rather than `block_on` to prevent blocking the // thread, which makes it possible to `dance` at the same time. let song = learn_song().await; sing_song(song).await; } async fn async_main() { let f1 = learn_and_sing(); let f2 = dance(); // `join!` is like `.await` but can wait for multiple futures concurrently. // If we're temporarily blocked in the `learn_and_sing` future, the `dance` // future will take over the current thread. If `dance` becomes blocked, // `learn_and_sing` can take back over. If both futures are blocked, then // `async_main` is blocked and will yield to the executor. futures::join!(f1, f2); } fn main() { block_on(async_main()); }
這個範例中,學習歌曲必須發生在唱歌之前,但無論唱歌或學習都能與跳舞同時發生。如果我們在 learn_and_sing 中使用 block_on(learn_song()) 而不是 learn_song().await,整個執行緒在 learn_song 執行期間無法做其他事,這種情況下要同時跳舞是不可能的任務。利用 .await 來等待 learn_song future,就會允許其他任務在 learn_song 阻塞時接管當前的執行緒。這讓在同個執行緒下並行執行多個future 變為可能。
現在,你學會了基礎的 async/await,讓我們來嘗嘗實戰範例。
案例:HTTP 伺服器
我們用 async/.await 打造一個 echo 伺服器吧!
首先,執行 rustup update nightly 讓你去的最新最棒的 Rust,我們正使用最前沿的功能,確保最新版本非常重要。當你完成上述指令,執行 cargo +nightly new async-await-echo 來建立新專案,並開啟 async-await-echo 資料夾。
讓我們在 Cargo.toml 中加上一些相依函式庫:
[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] }
# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"
現在已經取得相依函式庫,可以開始寫程式了。我們有些 import 需要新增:
# #![allow(unused_variables)] #fn main() { use { hyper::{ // Miscellaneous types from Hyper for working with HTTP. Body, Client, Request, Response, Server, Uri, // This function turns a closure which returns a future into an // implementation of the the Hyper `Service` trait, which is an // asynchronous function from a generic `Request` to a `Response`. service::service_fn, // A function which runs a future to completion using the Hyper runtime. rt::run, }, futures::{ // Extension trait for futures 0.1 futures, adding the `.compat()` method // which allows us to use `.await` on 0.1 futures. compat::Future01CompatExt, // Extension traits providing additional methods on futures. // `FutureExt` adds methods that work for all futures, whereas // `TryFutureExt` adds methods to futures that return `Result` types. future::{FutureExt, TryFutureExt}, }, std::net::SocketAddr, }; #}
當這些 imports 加入後,將這些鍋碗瓢盆放在一起,就能開始接收請求:
async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> { // Always return successfully with a response containing a body with // a friendly greeting ;) Ok(Response::new(Body::from("hello, world!"))) } async fn run_server(addr: SocketAddr) { println!("Listening on http://{}", addr); // Create a server bound on the provided address let serve_future = Server::bind(&addr) // Serve requests using our `async serve_req` function. // `serve` takes a closure which returns a type implementing the // `Service` trait. `service_fn` returns a value implementing the // `Service` trait, and accepts a closure which goes from request // to a future of the response. To use our `serve_req` function with // Hyper, we have to box it and put it in a compatability // wrapper to go from a futures 0.3 future (the kind returned by // `async fn`) to a futures 0.1 future (the kind used by Hyper). .serve(|| service_fn(|req| serve_req(req).boxed().compat())); // Wait for the server to complete serving or exit with an error. // If an error occurred, print it to stderr. if let Err(e) = serve_future.compat().await { eprintln!("server error: {}", e); } } fn main() { // Set the address to run our socket on. let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); // Call our `run_server` function, which returns a future. // As with every `async fn`, for `run_server` to do anything, // the returned future needs to be run. Additionally, // we need to convert the returned future from a futures 0.3 future into a // futures 0.1 future. let futures_03_future = run_server(addr); let futures_01_future = futures_03_future.unit_error().boxed().compat(); // Finally, we can run the future to completion using the `run` function // provided by Hyper. run(futures_01_future); }
如果現在跑 cargo run,應該會看到「Listening on http://127.0.0.1:3000」輸出在終端機上。若在你的瀏覽器開啟這個鏈結,會看到「hello, world」出現在你的瀏覽器。恭喜!你剛寫下第一個 Rust 非同步網頁伺服器。
你也可以檢視這個請求本身,會包含譬如請求的 URI、HTTP 版本、headers、以及其他詮釋資料。舉例來說,你可以打印出請求的 URI:
# #![allow(unused_variables)] #fn main() { println!("Got request at {:?}", req.uri()); #}
你可能注意到我們尚未做任何非同步的事情來處理這個請求,我們就只是即刻回應罷了,所以我們並無善用 async fn 給予我們的彈性。與其只返回一個靜態訊息,讓我們使用 Hyper 的 HTTP 客戶端來代理使用者的請求到其他網站。
我們從解析想要請求的 URL 開始:
# #![allow(unused_variables)] #fn main() { let url_str = "http://www.rust-lang.org/en-US/"; let url = url_str.parse::<Uri>().expect("failed to parse URL"); #}
接下來我們建立一個新的 hyper::Client 並用之生成一個 GET 請求,這個請求會回傳一個回應給使用者。
# #![allow(unused_variables)] #fn main() { let res = Client::new().get(url).compat().await; // Return the result of the request directly to the user println!("request finished-- returning response"); res #}
Client::get 返回一個 hyper::client::FutureResponse,這個 future 實作了 Future<Output = Result<Response, Error>>(或 futures 0.1 的 Future<Item = Response, Error = Error>)。當我們 .await 這個 future,將會發送一個 HTTP 請求,當前的任務會暫時停止(suspend),而這個任務會進入佇列中,在收到回應後繼續執行。
現在,執行 cargo run 並在瀏覽器中打開 http://127.0.0.1:3000/foo,會看到 Rust 首頁,以及以下的終端機輸出:
Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response
恭喜呀!你剛剛代理了一個 HTTP 請求。
揭秘:執行 Future 與任務
在本章節,我們會涵蓋 Future 與非同步任務如何排程的底層結構。如果只對學習如何編寫更高層次的程式碼,來使用既有的 Future,並對 Future 如何運作的細節毫無興趣,可直接跳到 async/await 章節。然而,本章講述的眾多主題對理解 async/await運作原理、執行期與效能相關屬性,以及打造新的非同步基礎型別,都非常有幫助。若你決心跳過這個部分,你可能會想要收藏起來,往後再度來訪。
現在,讓我們來談談 Future trait。
Future Trait
Future trait 在 Rust 非同步程式設計中扮演關鍵角色。一個 Future 就是一個非同步的運算,產出一個結果值(雖然這個值可能為空,例如 ())。一個簡化版的 future trait 看起來如下:
# #![allow(unused_variables)] #fn main() { trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } #}
Future 可以透過呼叫 poll 函式往前推進,這個函式會盡可能地驅使 future 邁向完成。若 future 完成了,就回傳 Poll::Ready(reuslt)。若這個 future 尚無法完成,就回傳 Poll::Pending,並安排在該 Future 就緒且能有所進展時來呼叫 wake() 函式。當呼叫了 wake(),執行器(executor)會驅使 Future 再次呼叫 poll,於是 Future 就可以取得更多進展。
若沒有 wake(),執行器不會知道哪個 future 能有所進展,因此必須不斷輪詢所有 future。有了 wake(),執行器能夠確切知道哪個 future 已準備好接受輪詢。
舉例來說,試想我們要從 socket 讀取一些可能尚未就緒的資料。如果有資料進來,我們則可以讀取它並回傳 Poll:Ready(data);但若沒有任何資料就緒,我們的 future 會被阻塞且無法取得任何進展。當沒有任何資料,我們必須註冊 wake 函式,並在 socket 上的資料就緒時呼叫它,進而告知執行器我們的 future 準備好取得進展了。一個簡單的 SocketRead future 大致上類似這樣:
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // The socket has data-- read it into a buffer and return it. Poll::Ready(self.socket.read_buf()) } else { // The socket does not yet have data. // // Arrange for `wake` to be called once data is available. // When data becomes available, `wake` will be called, and the // user of this `Future` will know to call `poll` again and // receive data. self.socket.set_readable_callback(wake); Poll::Pending } } } #}
這個 Future 原型可以在不需要中間配置(intermediate allocation)下組合多個非同步操作。並可以經由免配置狀態機(allocation-free state machine)實作同時執行多個 future 或是串聯多個 future 的操作,示例如下:
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two other futures to completion concurrently. /// /// Concurrency is achieved via the fact that calls to `poll` each future /// may be interleaved, allowing each future to advance itself at its own pace. pub struct Join<FutureA, FutureB> { // Each field may contain a future that should be run to completion. // If the future has already completed, the field is set to `None`. // This prevents us from polling a future after it has completed, which // would violate the contract of the `Future` trait. a: Option<FutureA>, b: Option<FutureB>, } impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { // Attempt to complete future `a`. if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } } // Attempt to complete future `b`. if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } } if self.a.is_none() && self.b.is_none() { // Both futures have completed-- we can return successfully Poll::Ready(()) } else { // One or both futures returned `Poll::Pending` and still have // work to do. They will call `wake()` when progress can be made. Poll::Pending } } } #}
這展示了多個 future 如何在無需分別配置資源的情形下同時執行,讓我們能夠寫出更高效的非同步程式。無獨有偶,多個循序的 future 也能一個接著一個執行,如下所示:
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two futures to completion, one after another. // // Note: for the purposes of this simple example, `AndThenFut` assumes both // the first and second futures are available at creation-time. The real // `AndThen` combinator allows creating the second future based on the output // of the first future, like `get_breakfast.and_then(|food| eat(food))`. pub struct AndThenFut<FutureA, FutureB> { first: Option<FutureA>, second: FutureB, } impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if let Some(first) = &mut self.first { match first.poll(wake) { // We've completed the first future-- remove it and start on // the second! Poll::Ready(()) => self.first.take(), // We couldn't yet complete the first future. Poll::Pending => return Poll::Pending, }; } // Now that the first future is done, attempt to complete the second. self.second.poll(wake) } } #}
這些示例展現出 Future 可以在無需額外配置的物件與深度巢狀回呼 (nested callback)的情形下,清晰表達出非同步的流程控制。看完這樣基本的流程控制,讓我們來討論真正的 Future trait 和剛剛的原型哪裡不相同。
# #![allow(unused_variables)] #fn main() { trait Future { type Output; fn poll( // Note the change from `&mut self` to `Pin<&mut Self>`: self: Pin<&mut Self>, // and the change from `wake: fn()` to `cx: &mut Context<'_>`: cx: &mut Context<'_>, ) -> Poll<Self::Output>; } #}
你會發現的第一個改變是 self 型別不再是 &mut self,而是由 Pin<&mut self> 取代。我們會在往後的章節有更多關於 Pinning 討論,此刻知道它允許我們建立一個不移動(immovable)的 future。不移動物件可在它們的欄位(field)保存指標,例如 struct MyFut { a: i32, ptr_to_a: *const i32 }。Pinning 是啟用 async/await 前必要之功能。
第二,wake: fn() 改為 &mut Context<_>。在 SimpleFuture 我們透過呼叫一個函式指標(fn())來告知 future 執行器這個 future 可以接受輪詢。然而,由於 fn() 是 zero-sized,它不能儲存任何有關哪個 Future 呼叫了 wake 的資訊。
在真實世界場景下,一個複雜如網頁伺服器的應用程式可能有數以千計不同的連線,這些連線的喚醒函式需要妥善分別管理。Context 型別提供取得一個叫 Waker 的型別來解決此問題,Waker 的功能則是用來喚醒一個特定任務。
透過 Waker 喚醒任務
對 future 來說,第一次被輪詢(poll)時任務尚未完成再正常不過了。當這件事發生,future 必須確認它未來就緒時會被再度輪詢以取得進展。這就需透過 Waker 型別處理。
每次輪詢一個 future 時,都將其作為「任務(task)」的一部分來輪詢。任務實際上是已提交到執行器的最上層 future。
Waker 提供一個 wake() 方法,可以告知執行器相關的任務需要喚醒。當呼叫了 wake(),執行器就會得知連動該 Waker 的任務已準備就緒取得進展,且應該再度輪詢它的 future。
Waker 同時實作了 clone(),所以它可以四處複製與儲存。
我們來試試用 Waker 實作一個簡易計時器。
案例:打造一個計時器
為了貼近示例的目的,我們只需要在計時器建立時開一個執行緒,睡眠一段必要的時間,當計時器時間到,再發送訊號給計時器 future。
這邊是我們開始手作前需要的 import:
# #![allow(unused_variables)] #fn main() { use { std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }, }; #}
讓我們從定義自己的 future 型別開始。我們的 future 需要一個溝通方法,給執行緒通知我們計時器的時間到了 future 也該完成了。我們會用一個共享狀態 Arc<Mutex<..>> 在執行緒與 future 間溝通。
# #![allow(unused_variables)] #fn main() { pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the future and the waiting thread struct SharedState { /// Whether or not the sleep time has elapsed completed: bool, /// The waker for the task that `TimerFuture` is running on. /// The thread can use this after setting `completed = true` to tell /// `TimerFuture`'s task to wake up, see that `completed = true`, and /// move forward. waker: Option<Waker>, } #}
現在,我們要實際動手實作 Future 了!
# #![allow(unused_variables)] #fn main() { impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Look at the shared state to see if the timer has already completed. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // Set waker so that the thread can wake up the current task // when the timer has completed, ensuring that the future is polled // again and sees that `completed = true`. // // It's tempting to do this once rather than repeatedly cloning // the waker each time. However, the `TimerFuture` can move between // tasks on the executor, which could cause a stale waker pointing // to the wrong task, preventing `TimerFuture` from waking up // correctly. // // N.B. it's possible to check for this using the `Waker::will_wake` // function, but we omit that here to keep things simple. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } #}
很簡單吧?如果執行緒設定 shared_state.completed = true,我們完成了!否則,我們就複製當前任務的 Waker 並將其傳入 shared_state.waker,以便執行緒往後喚醒任務。
重要的是,在每次輪詢 future 後必須更新 Waker,因為 future 有可能移動到帶有不同 Waker 的不同任務上。這種狀況會發生在 future 在 task 之間相互傳遞時。
最後,我們需要可以實際構建計時器與啟動執行緒的 API:
# #![allow(unused_variables)] #fn main() { impl TimerFuture { /// Create a new `TimerFuture` which will complete after the provided /// timeout. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn the new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Signal that the timer has completed and wake up the last // task on which the future was polled, if one exists. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } #}
呼!這些就是打造簡易計時器 future 的一切所需。現在,我們若有一個執行器可以讓 future 跑起來⋯
案例:打造一個執行器
Rust 的 Future 具有惰性:除非積極驅動到完成,不然不會做任何事。驅使 future 完成的方法之一是在 async 函式中用 .await 等待它,但這只會將問題推向更高的層次:誰來跑最上層 async 函式返回的 future?答案揭曉,我們需要一個 Future 執行器(executor)。
Future 執行器會拿一群最上層的 Future 並在它們可有所進展時呼叫 poll 執行它們一直到完成。通常一個執行器會 poll 一個 future 一次作為開始。當 Future 透過呼叫 wake() 表示它們已就緒並可以有所進展時,Future 會被放回佇列並再度被呼叫 poll,不斷重複直到該 Future 完成了。
在這一部分,我們將編寫自己的簡易執行器,能夠並行執行一大群最上層的 future 直到完成。
這個示例中,我們會依賴 futures crate 的 ArcWake trait,這個 trait 提供一個簡便構建 Waker 的方法。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
接下來,我們需要在 src/main.rs 加上一些 import。
# #![allow(unused_variables)] #fn main() { use { futures::{ future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}, }, std::{ future::Future, sync::{Arc, Mutex}, sync::mpsc::{sync_channel, SyncSender, Receiver}, task::{Context, Poll}, time::Duration, }, // The timer we wrote in the previous section: timer_future::TimerFuture, }; #}
我們的執行器工作模式是發送任務到通道上(channel)。這個執行器會從通道拉取事件並執行。當一個任務已就緒將執行更多工作(被喚醒),它就可將自己放回通道中,以此方式自我排程來等待再次被輪詢。
在這個設計下,執行器僅需要任務通道的接收端。使用者則會取得發送端,並藉由發送端產生(spawn)新的 future。而任務本身實際上是可自我排程的 future,所以我們將任務自身的結構設計為一個 future,加上一個可讓任務重新加入佇列的通道發送端。
# #![allow(unused_variables)] #fn main() { /// Task executor that receives tasks off of a channel and runs them. struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner` spawns new futures onto the task channel. #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// A future that can reschedule itself to be polled by an `Executor`. struct Task { /// In-progress future that should be pushed to completion. /// /// The `Mutex` is not necessary for correctness, since we only have /// one thread executing tasks at once. However, Rust isn't smart /// enough to know that `future` is only mutated from one thread, /// so we need use the `Mutex` to prove thread-safety. A production /// executor would not need this, and could use `UnsafeCell` instead. future: Mutex<Option<BoxFuture<'static, ()>>>, /// Handle to place the task itself back onto the task queue. task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // Maximum number of tasks to allow queueing in the channel at once. // This is just to make `sync_channel` happy, and wouldn't be present in // a real executor. const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender}) } #}
我們也在產生器(spawner)新增一個方法,使其產生新 future 更便利。這個方法需要傳入一個 future 型別,會將它 box 起來再放入新建立的 Arc<Task> 中,以便於將 future 加入 executor 的佇列。
# #![allow(unused_variables)] #fn main() { impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); } } #}
欲輪詢 future,我們需要建立 Waker。在任務喚醒一節中討論過,讓任務 wake 被呼叫時的輪詢排程是 Waker 的職責所在。記住這一點,Waker 會告知執行器確切已就緒的任務為何,並允許執行器輪詢這些已就緒可有所進展的 future。建立一個新 Waker 最簡單的作法是透過實作 ArcWake trait,並使用 waker_ref 或 .into_waker() 函式將 Arc<impl ArcWake> 型別轉換為 Waker。現在我們來替任務實作 ArcWake,允許它們可以轉換成 Waker 並可以被喚醒:
# #![allow(unused_variables)] #fn main() { impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); } } #}
當 Waker 從一個 Arc<Task> 建立後,呼叫它的 wake() 會發送一個 Arc 副本到任務通道上。我們的執行器就會取出並輪詢該任務。實作如下:
# #![allow(unused_variables)] #fn main() { impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // Take the future, and if it has not yet completed (is still Some), // poll it in an attempt to complete it. let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // Create a `LocalWaker` from the task itself let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>` is a type alias for // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. // We can get a `Pin<&mut dyn Future + Send + 'static>` // from it by calling the `Pin::as_mut` method. if let Poll::Pending = future.as_mut().poll(context) { // We're not done processing the future, so put it // back in its task to be run again in the future. *future_slot = Some(future); } } } } } #}
恭喜啊!我們現在有一個能動的 future 執行器。我們甚至可以用它來執行
async/.await 程式碼或是親手打造的 future,像是之前寫的 TimerFuture。
fn main() { let (executor, spawner) = new_executor_and_spawner(); // Spawn a task to print before and after waiting on a timer. spawner.spawn(async { println!("howdy!"); // Wait for our timer future to complete after two seconds. TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // Drop the spawner so that our executor knows it is finished and won't // receive more incoming tasks to run. drop(spawner); // Run the executor until the task queue is empty. // This will print "howdy!", pause, and then print "done!". executor.run(); }
執行器與系統輸入輸出
在前面的章節 Future Trait,我們探討了一個在 socket 上非同步讀取資料的範例:
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // The socket has data-- read it into a buffer and return it. Poll::Ready(self.socket.read_buf()) } else { // The socket does not yet have data. // // Arrange for `wake` to be called once data is available. // When data becomes available, `wake` will be called, and the // user of this `Future` will know to call `poll` again and // receive data. self.socket.set_readable_callback(wake); Poll::Pending } } } #}
這個 future 會從一個 socket 上讀取可用的資料,倘若無任何可用資料,它會將控制權轉至執行器,並要求 future 的任務應在 socket 有可讀資料時能被喚醒。然而,這個範例對 Socket 型別的實作並不夠清楚,具體來說,我們無法得知 set_readable_callback 函式如何運作。當 socket 有可讀資料時,我們要怎樣安排呼叫一次 lw.wake() ?有個作法是讓一個執行緒不斷檢查 socket 是否可讀,並適時呼叫 wake()。不過,這方法需要分別對每個阻塞 IO 的 future 建立執行緒,效率低落,絕對會大幅降低咱們非同步程式碼的效率。
一般來說,這問題的解法通常透過整合系統原生的阻塞性 IO 元件(IO-aware system blocking primitive)來實踐,例如 Linux 上的 epoll、FreeBSD 與 macOS 的 kqueue、Windows 上的 IOCP,以及 Fuchsia 上的 port(這些都在跨平台 Rust 模組 mio 中實作介面出來)。這些原生元件都允許一個執行緒被多個非同步 IO 事件阻塞,並在任一事件完成時返回。實際上,這些 API 長相如下:
# #![allow(unused_variables)] #fn main() { struct IoBlocker { ... } struct Event { // An ID uniquely identifying the event that occurred and was listened for. id: usize, // A set of signals to wait for, or which occurred. signals: Signals, } impl IoBlocker { /// Create a new collection of asynchronous IO events to block on. fn new() -> Self { ... } /// Express an interest in a particular IO event. fn add_io_event_interest( &self, /// The object on which the event will occur io_object: &IoObject, /// A set of signals that may appear on the `io_object` for /// which an event should be triggered, paired with /// an ID to give to events that result from this interest. event: Event, ) { ... } /// Block until one of the events occurs. fn block(&self) -> Event { ... } } let mut io_blocker = IoBlocker::new(); io_blocker.add_io_event_interest( &socket_1, Event { id: 1, signals: READABLE }, ); io_blocker.add_io_event_interest( &socket_2, Event { id: 2, signals: READABLE | WRITABLE }, ); let event = io_blocker.block(); // prints e.g. "Socket 1 is now READABLE" if socket one became readable. println!("Socket {:?} is now {:?}", event.id, event.signals); #}
Future 的執行器可以使用這些元件來提供非同步 IO 物件,例如 socket 可以配置特定 IO 事件發生時的回呼函式(callback)。在上面 SocketRead 範例的情況下,Socket::set_readable_callback 函式的虛擬碼可能如下:
# #![allow(unused_variables)] #fn main() { impl Socket { fn set_readable_callback(&self, waker: Waker) { // `local_executor` is a reference to the local executor. // this could be provided at creation of the socket, but in practice // many executor implementations pass it down through thread local // storage for convenience. let local_executor = self.local_executor; // Unique ID for this IO object. let id = self.id; // Store the local waker in the executor's map so that it can be called // once the IO event arrives. local_executor.event_map.insert(id, waker); local_executor.add_io_event_interest( &self.socket_file_descriptor, Event { id, signals: READABLE }, ); } } #}
現在,我們只需要有一個執行器執行緒來接收與發送任何 IO 事件到合適的 Waker,這個 Waker 會喚醒對應的任務,讓執行器在回去檢查更多 IO 事件之前,能驅動更多任務(然後一直循環下去⋯)。
async/.await
在第一章,我們簡要介紹了 async/.await,並用它簡單架構一個簡易伺服器。本章節將深入探討 async/.await 的細節,解釋其運作原理,並比較 async 程式碼和傳統 Rust 程式的區別。
async/.await 是特殊的 Rust 語法,使其能轉移控制權到當前執行緒,而非阻塞之,並在等待操作完成的同時,允許其他程式碼繼續推進。
使用 async 有兩個主要途徑:async fn 與 async 區塊(block)。兩者皆返回一個實作 Future trait 的值:
# #![allow(unused_variables)] #fn main() { // `foo()` returns a type that implements `Future<Output = u8>`. // `foo().await` will result in a value of type `u8`. async fn foo() -> u8 { 5 } fn bar() -> impl Future<Output = u8> { // This `async` block results in a type that implements // `Future<Output = u8>`. async { let x: u8 = foo().await; x + 5 } } #}
如我們在第一章所見, async 函式主體和其他 future 都具有惰性:在執行前不做任何事。執行一個 Future 最常見的手段是 .await 它。當對一個 Future 呼叫 .await 時,會嘗試執行它到完成。若該 Future 阻塞,將會轉移控制權到當前的執行緒。而執行器會在該 Future 能取得更多進展時恢復執行它,讓 .await 得以解決。
async 生命週期
和其他傳統函式不同, async fn 會取得引用(reference)或其他非 'static 引數(argument),並返回一個綁定這些引數生命週期的 Future。
# #![allow(unused_variables)] #fn main() { // This function: async fn foo(x: &u8) -> u8 { *x } // Is equivalent to this function: fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a { async move { *x } } #}
這代表了從 async fn 返回的 future 只能在其非 '-static 引數的有效生命週期內被 await。
在常見的例子像在呼叫函式(如 foo(&x).await)立刻 await 該 future,這並不構成問題。不過,如果想保存或將這個 future 傳送到其他任務或執行緒中,這可能會發生問題。
常見將一個引用作為引數的 async fn 轉換為回傳一個 'static future 的方法是,將呼叫 async fn 需要的引數包裹在一個 async 區塊裡:
# #![allow(unused_variables)] #fn main() { fn bad() -> impl Future<Output = u8> { let x = 5; borrow_x(&x) // ERROR: `x` does not live long enough } fn good() -> impl Future<Output = u8> { async { let x = 5; borrow_x(&x).await } } #}
透過將引數移動到該 async 區塊,我們延長了引數的生命週期,使其匹配 good 回傳的 Future。
async move
async 區塊(block) 與閉包(closure)可以使用 move 關鍵字,行為更類似一般的閉包。一個 async move block 會取得它引用到的變數之所有權,這些變數就可以活過(outlive)當前的作用範圍(scope),但也就得與放棄其他程式碼共享這些變數的好處。
# #![allow(unused_variables)] #fn main() { /// `async` block: /// /// Multiple different `async` blocks can access the same local variable /// so long as they're executed within the variable's scope async fn blocks() { let my_string = "foo".to_string(); let future_one = async { // ... println!("{}", my_string); }; let future_two = async { // ... println!("{}", my_string); }; // Run both futures to completion, printing "foo" twice: let ((), ()) = futures::join!(future_one, future_two); } /// `async move` block: /// /// Only one `async move` block can access the same captured variable, since /// captures are moved into the `Future` generated by the `async move` block. /// However, this allows the `Future` to outlive the original scope of the /// variable: fn move_block() -> impl Future<Output = ()> { let my_string = "foo".to_string(); async move { // ... println!("{}", my_string); } } #}
在多執行緒的執行器上 .await
請注意,當使用多執行緒的 Future 執行器時,因為 .await 可能導致環境切換至新執行緒,讓 Future 可能在不同執行緒間移動,所以任何用在 async 裡的變數都必須能在執行緒間傳輸。
這代表使用 Rc、&RefCell 或其他沒有實作 Send trait 的型別,包含沒實作 Sync trait 引用型別,都不安全。
(警告:只要不在呼叫 .await 的作用域裡,這些型別還是可以使用)
同樣地,在 .await 之間持有傳統非 future 的鎖不是個好主意,它可能會造成執行緒池(threadpool)完全鎖上:一個任務拿走了鎖,並且 .await 將控制權轉移至執行器,讓其他任務嘗試取得鎖,然後就造成死鎖(deadlock)。我們可以使用 futures::lock 而不是 std::sync 中的 Mutex 從而避免這件事。
Pinning
想要輪詢 future 前,必須先用一個特殊的型別 Pin<T> 固定住 future。如果你閱讀了前一節 揭秘:執行 Future 與任務對 Future trait 的解釋,你會在 Future:poll 方法的定義上 self: Pin<&mut Self> 認出 Pin,但它到底代表什麼,為什麼需要它?
為什麼要 Pinning
Pinning 達成了確保一個物件永遠不移動。我們需要憶起 async/.await 的運作原理,才能理解為什麼需要釘住一個物件。試想以下的程式碼:
# #![allow(unused_variables)] #fn main() { let fut_one = ...; let fut_two = ...; async move { fut_one.await; fut_two.await; } #}
神秘面紗之下,其實建立了一個實作了 Future 的匿名型別,它的 poll 方法如下:
# #![allow(unused_variables)] #fn main() { // The `Future` type generated by our `async { ... }` block struct AsyncFuture { fut_one: FutOne, fut_two: FutTwo, state: State, } // List of states our `async` block can be in enum State { AwaitingFutOne, AwaitingFutTwo, Done, } impl Future for AsyncFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { loop { match self.state { State::AwaitingFutOne => match self.fut_one.poll(..) { Poll::Ready(()) => self.state = State::AwaitingFutTwo, Poll::Pending => return Poll::Pending, } State::AwaitingFutTwo => match self.fut_two.poll(..) { Poll::Ready(()) => self.state = State::Done, Poll::Pending => return Poll::Pending, } State::Done => return Poll::Ready(()), } } } } #}
第一次呼叫 poll 時,會輪詢 fut_one。若 fut_one 不能完成,AsyncFuture::poll 會直接返回。接下來的 Future 的 poll 呼叫則從前一次結束的地方繼續執行。這個過程會持續到整個 future 有辦法成功完成。
不過,如果 async 區塊(block)使用到引用(reference)會如何?舉例來說:
# #![allow(unused_variables)] #fn main() { async { let mut x = [0; 128]; let read_into_buf_fut = read_into_buf(&mut x); read_into_buf_fut.await; println!("{:?}", x); } #}
這會編譯成怎樣的結構體(struct)呢?
# #![allow(unused_variables)] #fn main() { struct ReadIntoBuf<'a> { buf: &'a mut [u8], // points to `x` below } struct AsyncFuture { x: [u8; 128], read_into_buf_fut: ReadIntoBuf<'what_lifetime?>, } #}
這裡,ReadIntoBuf future 持有一個指向其他結構體 x 的引用。但是,若 AsyncFuture 移動了(moved),x 的記憶體位址也會隨之移動,並讓儲存在 read_into_buf_fut.buf 的指標失效。
將 future 固定在一個特定的記憶體位址可以避免這種問題發生,讓在 async 區塊裡建立值的引用更安全。
如何使用 Pinning
Pin 型別封裝了指標型別,保證這些在指標背後的值不被移動。例如 Pin<& mut>、Pin<&t>、Pin<Box<T>> 全都保證 T 不被移動。
大多數的型別不會有移不移動的問題。這些型別實作了 Unpin trait。指向 Unpin 型別的指標可任意移入 Pin 或從 Pin 取出。舉例來說,u8 是 Unpin 的,因此 Pin<&mut T> 會表現得像普通的 &mut T。
有些與 future 互動的函式會要求 future 必須為 Unpin。若想要在要求 Unpin 型別的函式使用一個非 Unpin 的 Future 或 Stream,你要先透過 Box::pin(可建立 Pin<Box<T>>)或 pin_utils::pin_mut! (可建立 Pin<&mut T>)來固定住(pin)這個值。Pin<Box<Fut>> 與 Pin<&mut Fut> 就都能作為 future 使用,且都實作了 Unpin。
以下是範例:
# #![allow(unused_variables)] #fn main() { use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io // A function which takes a `Future` that implements `Unpin`. fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... } let fut = async { ... }; execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait // Pinning with `Box`: let fut = async { ... }; let fut = Box::pin(fut); execute_unpin_future(fut); // OK // Pinning with `pin_mut!`: let fut = async { ... }; pin_mut!(fut); execute_unpin_future(fut); // OK #}
Stream Trait
Stream trait 類似於 Future,不同的是在完成前可以產生多個值,就如同標準函式庫的 Iterator trait:
# #![allow(unused_variables)] #fn main() { trait Stream { /// The type of the value yielded by the stream. type Item; /// Attempt to resolve the next item in the stream. /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value /// is ready, and `Poll::Ready(None)` if the stream has completed. fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; } #}
常見的 Stream 範例是 futures 模組裡的通道(channel)型別的 Receiver。每次有值從 Sender 發送端發送,它就會產生 Some(val);或是當整個 Sender 被捨棄(dropped)且所有等待中的訊息都被接收到,Stream 就會產生 None:
# #![allow(unused_variables)] #fn main() { async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next` is similar to `Iterator::next`, but returns a // type that implements `Future<Output = Option<T>>`. assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } #}
迭代與並行
和同步的 Iterator 一樣,迭代並處理在 Steam 裡的值有許多作法。有許多組合子(combinator)風格的方法,例如 map、filter 和 fold,和它們遇錯就提前退出的表親 try_map、try_filter 以及 try_fold。
不幸的是 for 迴圈無法配合 Stream 使用,但命令式風格程式碼 while let 與 next/try_next 可以使用:
# #![allow(unused_variables)] #fn main() { async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { use futures::stream::StreamExt; // for `next` let mut sum = 0; while let Some(item) = stream.next().await { sum += item; } sum } async fn sum_with_try_next( mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>, ) -> Result<i32, io::Error> { use futures::stream::TryStreamExt; // for `try_next` let mut sum = 0; while let Some(item) = stream.try_next().await? { sum += item; } Ok(sum) } #}
不過,如果一次只處理一個元素,那就錯失善用並行的良機,畢竟並行是寫非同步程式碼的初衷嘛。想要並行處理 steam 上多個元素,請用 for_each_concurrent 與 try_for_each_concurrent 方法:
# #![allow(unused_variables)] #fn main() { async fn jump_around( mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>, ) -> Result<(), io::Error> { use futures::stream::TryStreamExt; // for `try_for_each_concurrent` const MAX_CONCURRENT_JUMPERS: usize = 100; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } #}
同時執行多個 Future
至此,我們大部分都是使用 .await 來執行 future,並在完成特定的 Future 前,阻塞當前的任務。不過,真實的非同步應用程式通常需要並行處理多個不同的操作。
本章涵蓋一些同時執行多個非同步操作的方法:
join!:等待所有 future 完成select!:等待多個 future 中的其中一個完成- Spawning:建立一個最上層的任務,並在當前環境中執行該 future 到完成
FuturesUnordered:一群會產出每個 subfuture 結果的 future
join!
futures::join 巨集能夠在等待多個不同的 future 完成的同時,並行執行這些 future。
當執行多個非同步操作時,很容易就寫出簡單的循序 .await:
# #![allow(unused_variables)] #fn main() { async fn get_book_and_music() -> (Book, Music) { let book = get_book().await; let music = get_music().await; (book, music) } #}
然而,因為 get_music 不會在 get_book 完成後自動嘗試開始,這將比我們需要的更慢。在部分語言中,futurue 在哪都能執行到完成,所以兩個操作可透過第一次呼叫 async fn 來開始 future,並在之後等待它們兩者。
# #![allow(unused_variables)] #fn main() { // WRONG -- don't do this async fn get_book_and_music() -> (Book, Music) { let book_future = get_book(); let music_future = get_music(); (book_future.await, music_future.await) } #}
不過,Rust 的 future 在直接正面地 .await 之前不會做任何事。這代表這兩段程式碼小片段將不會並行執行,而是循序執行 book_future 與 music_future。想正確並行執行兩個 future,請愛用 futures::join!:
# #![allow(unused_variables)] #fn main() { use futures::join; async fn get_book_and_music() -> (Book, Music) { let book_fut = get_book(); let music_fut = get_music(); join!(book_fut, music_fut) } #}
join! 的返回值是一個帶有每個傳入的 Future 的輸出的元組(tuple)。
try_join!
若 future 回傳的是 Result,請考慮使用 try_join! 而非 join!。由於 join! 會在所有 subfuture 完成就完成自己,甚至有 subfuture 回傳 Err 時還是會繼續處理其他 future。
和 join! 不同的事,當任意一個 subfuture 回傳錯誤時,try_join! 就會立刻完成。
# #![allow(unused_variables)] #fn main() { use futures::try_join; async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) } async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) } async fn get_book_and_music() -> Result<(Book, Music), String> { let book_fut = get_book(); let music_fut = get_music(); try_join!(book_fut, music_fut) } #}
請注意,傳入 try_join! 的 future 需要有同樣的錯誤型別。可以考慮使用 futures::future::TryFutureExt 的 .map_err(|e| ...) 和 err_into() 函式來統一這些錯誤型別:
# #![allow(unused_variables)] #fn main() { use futures::{ future::TryFutureExt, try_join, }; async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) } async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) } async fn get_book_and_music() -> Result<(Book, Music), String> { let book_fut = get_book().map_err(|()| "Unable to get book".to_string()); let music_fut = get_music(); try_join!(book_fut, music_fut) } #}
select!
futures::select 巨集可以同時執行多個 future,並在任何一個 future 完成是迅速回應使用者。
# #![allow(unused_variables)] #fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } #}
上面這個函式將並行執行 t1 與 t2。當 t1 或 t2 其中一個完成了,對應的處理程序就會呼叫 println!,這個函式就不會完成剩下的任務,而會直接結束。
select 基本的語法是 <pattern> = <expression> => <code>,你想要從多少個 future 中做 select 就重複多少次。
default => ... 與 complete => ...
select 也支援 default 與 complete 的流程分支。
default 分支會在所有被 select 的 future 都尚未完成時。一個包含 default 分支的 select 總是會立刻返回,因為開始執行時不會有任何 future 準備就緒。
complete 分支則負責處理這個狀況:當所有被 select 的 future 都完成且無法再有更多進展時。這有個不斷循環 select 的小撇步。
# #![allow(unused_variables)] #fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } #}
與 Unpin 及 FusedFuture 的互動
你可能會發現,第一個例子中我們會對兩個 async fn 回傳的 future 呼叫 .fuse(),且用了 pin_mut 來固定它們。這兩個都是必要操作,因為在 select 中的 future 必須實作 Unpin trait 與 FusedFuture trait。
Unpin 必要的原因是用在 select 的 future 不是直接取其值,而是取其可變引用(mutable reference)。由於沒有取走所有權,尚未完成的 future 可以在 select 之後再度使用。
FusedFuturue 列為必須是因為當一個 future 完成時,select 就不該再度輪詢它。FusedFuture 的原理是追蹤其 future 是否已完成。實作了 FusedFuture 使得 select 可以用在迴圈中,只輪詢尚未完成的 future。這可以從上面的範例中看到,a_fut 或 b_fut 將會在第二次迴圈迭代中完成,由於 future::ready 有實作 FusedFuture,所以可以告知 select 不要再輪詢它。
注意,stream 也有對應的 FusedStream trait。有實作這個 trait 或使用 .fuse() 封裝的 stream,都會從它們的組合子 .next() / .try_next() 產生 FusedFuture。
# #![allow(unused_variables)] #fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } #}
在 select 迴圈中使用 Fuse 與 FuturesUnordered 的並行任務
有個難以發現但很方便的函式是 Fuse::terminated(),它可以建立一個空的且已經終止的 future,並在往後填上實際需要執行的 future。
Fuse::termianted() 在遇到需要在 select 迴圈內執行一個任務,但這任務卻在在該迴圈內才產生的情況,顯得十分方便順手。
請注意使用 .select_next_some() 函式。這個函式可以與 select 配合,只執行會從 steam 產生 Some(_) 的分支,而忽略產生 None 的分支。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}
當同一個 future 的多個副本需要同時執行,請使用 FuturesUnordered 型別。接下來的範例和上一個很相似,但會執行每個 run_on_new_num_fut 的副本到其完成,而非當新的產生就中止舊的。它同時會印出 run_on_new_num_fut 的返回值。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}