select!
futures::select
巨集可以同時執行多個 future,並在任何一個 future 完成是迅速回應使用者。
# #![allow(unused_variables)] #fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } #}
上面這個函式將並行執行 t1
與 t2
。當 t1
或 t2
其中一個完成了,對應的處理程序就會呼叫 println!
,這個函式就不會完成剩下的任務,而會直接結束。
select
基本的語法是 <pattern> = <expression> => <code>
,你想要從多少個 future 中做 select
就重複多少次。
default => ...
與 complete => ...
select
也支援 default
與 complete
的流程分支。
default
分支會在所有被 select
的 future 都尚未完成時。一個包含 default
分支的 select
總是會立刻返回,因為開始執行時不會有任何 future 準備就緒。
complete
分支則負責處理這個狀況:當所有被 select
的 future 都完成且無法再有更多進展時。這有個不斷循環 select
的小撇步。
# #![allow(unused_variables)] #fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } #}
與 Unpin
及 FusedFuture
的互動
你可能會發現,第一個例子中我們會對兩個 async fn
回傳的 future 呼叫 .fuse()
,且用了 pin_mut
來固定它們。這兩個都是必要操作,因為在 select
中的 future 必須實作 Unpin
trait 與 FusedFuture
trait。
Unpin
必要的原因是用在 select
的 future 不是直接取其值,而是取其可變引用(mutable reference)。由於沒有取走所有權,尚未完成的 future 可以在 select
之後再度使用。
FusedFuturue
列為必須是因為當一個 future 完成時,select
就不該再度輪詢它。FusedFuture
的原理是追蹤其 future 是否已完成。實作了 FusedFuture
使得 select
可以用在迴圈中,只輪詢尚未完成的 future。這可以從上面的範例中看到,a_fut
或 b_fut
將會在第二次迴圈迭代中完成,由於 future::ready
有實作 FusedFuture
,所以可以告知 select
不要再輪詢它。
注意,stream 也有對應的 FusedStream
trait。有實作這個 trait 或使用 .fuse()
封裝的 stream,都會從它們的組合子 .next()
/ .try_next()
產生 FusedFuture
。
# #![allow(unused_variables)] #fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } #}
在 select
迴圈中使用 Fuse
與 FuturesUnordered
的並行任務
有個難以發現但很方便的函式是 Fuse::terminated()
,它可以建立一個空的且已經終止的 future,並在往後填上實際需要執行的 future。
Fuse::termianted()
在遇到需要在 select
迴圈內執行一個任務,但這任務卻在在該迴圈內才產生的情況,顯得十分方便順手。
請注意使用 .select_next_some()
函式。這個函式可以與 select
配合,只執行會從 steam 產生 Some(_)
的分支,而忽略產生 None
的分支。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}
當同一個 future 的多個副本需要同時執行,請使用 FuturesUnordered
型別。接下來的範例和上一個很相似,但會執行每個 run_on_new_num_fut
的副本到其完成,而非當新的產生就中止舊的。它同時會印出 run_on_new_num_fut
的返回值。
# #![allow(unused_variables)] #fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } #}