select!

futures::select 巨集可以同時執行多個 future,並在任何一個 future 完成是迅速回應使用者。


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
#}

上面這個函式將並行執行 t1t2。當 t1t2 其中一個完成了,對應的處理程序就會呼叫 println!,這個函式就不會完成剩下的任務,而會直接結束。

select 基本的語法是 <pattern> = <expression> => <code>,你想要從多少個 future 中做 select 就重複多少次。

default => ...complete => ...

select 也支援 defaultcomplete 的流程分支。

default 分支會在所有被 select 的 future 都尚未完成時。一個包含 default 分支的 select 總是會立刻返回,因為開始執行時不會有任何 future 準備就緒。

complete 分支則負責處理這個狀況:當所有被 select 的 future 都完成且無法再有更多進展時。這有個不斷循環 select 的小撇步。


# #![allow(unused_variables)]
#fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
#}

UnpinFusedFuture 的互動

你可能會發現,第一個例子中我們會對兩個 async fn 回傳的 future 呼叫 .fuse(),且用了 pin_mut 來固定它們。這兩個都是必要操作,因為在 select 中的 future 必須實作 Unpin trait 與 FusedFuture trait。

Unpin 必要的原因是用在 select 的 future 不是直接取其值,而是取其可變引用(mutable reference)。由於沒有取走所有權,尚未完成的 future 可以在 select 之後再度使用。

FusedFuturue 列為必須是因為當一個 future 完成時,select 就不該再度輪詢它。FusedFuture 的原理是追蹤其 future 是否已完成。實作了 FusedFuture 使得 select 可以用在迴圈中,只輪詢尚未完成的 future。這可以從上面的範例中看到,a_futb_fut 將會在第二次迴圈迭代中完成,由於 future::ready 有實作 FusedFuture,所以可以告知 select 不要再輪詢它。

注意,stream 也有對應的 FusedStream trait。有實作這個 trait 或使用 .fuse() 封裝的 stream,都會從它們的組合子 .next() / .try_next() 產生 FusedFuture


# #![allow(unused_variables)]
#fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
#}

select 迴圈中使用 FuseFuturesUnordered 的並行任務

有個難以發現但很方便的函式是 Fuse::terminated(),它可以建立一個空的且已經終止的 future,並在往後填上實際需要執行的 future。

Fuse::termianted() 在遇到需要在 select 迴圈內執行一個任務,但這任務卻在在該迴圈內才產生的情況,顯得十分方便順手。

請注意使用 .select_next_some() 函式。這個函式可以與 select 配合,只執行會從 steam 產生 Some(_) 的分支,而忽略產生 None 的分支。


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
#}

當同一個 future 的多個副本需要同時執行,請使用 FuturesUnordered 型別。接下來的範例和上一個很相似,但會執行每個 run_on_new_num_fut 的副本到其完成,而非當新的產生就中止舊的。它同時會印出 run_on_new_num_fut 的返回值。


# #![allow(unused_variables)]
#fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

#}