迭代與並行
和同步的 Iterator
一樣,迭代並處理在 Steam
裡的值有許多作法。有許多組合子(combinator)風格的方法,例如 map
、filter
和 fold
,和它們遇錯就提前退出的表親 try_map
、try_filter
以及 try_fold
。
不幸的是 for
迴圈無法配合 Stream
使用,但命令式風格程式碼 while let
與 next
/try_next
可以使用:
# #![allow(unused_variables)] #fn main() { async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { use futures::stream::StreamExt; // for `next` let mut sum = 0; while let Some(item) = stream.next().await { sum += item; } sum } async fn sum_with_try_next( mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>, ) -> Result<i32, io::Error> { use futures::stream::TryStreamExt; // for `try_next` let mut sum = 0; while let Some(item) = stream.try_next().await? { sum += item; } Ok(sum) } #}
不過,如果一次只處理一個元素,那就錯失善用並行的良機,畢竟並行是寫非同步程式碼的初衷嘛。想要並行處理 steam 上多個元素,請用 for_each_concurrent
與 try_for_each_concurrent
方法:
# #![allow(unused_variables)] #fn main() { async fn jump_around( mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>, ) -> Result<(), io::Error> { use futures::stream::TryStreamExt; // for `try_for_each_concurrent` const MAX_CONCURRENT_JUMPERS: usize = 100; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } #}