Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Stream

本集目標

認識 Stream——async 版的 Iterator,以及怎麼走訪它。

正文

Streamasync 版的 Iterator

第 6 章的 Iterator 是「一連串值,要一個一個取」。但它的 .next()同步的——呼叫就馬上給你下一個值(或 None)。

Stream 是它的 async 版本:一樣是一連串值要一個一個取,但下一個值可能要等(例如等網路送來下一筆資料、等計時器、等使用者輸入)。所以 Stream.next() 回傳的是一個 Future,你要 .next().await 才拿得到下一個值。

對照記就很好懂:

  • iterator.next() → 回傳 Option<Item>(同步、馬上給)。
  • stream.next().await → 回傳 Option<Item>(要 .await、可能等一下)。

兩者都用「None 代表結束」。

走訪一個 Stream

Iterator 可以用 for 走訪,但 Stream 不行(for 沒辦法 .await)。Stream 的標準走訪寫法是 while let Some(x) = stream.next().await——一個一個取,取到 None 就停:

extern crate tokio;
extern crate tokio_stream;

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // 從一個 Vec 做出最簡單的 stream
    let mut stream = tokio_stream::iter(vec![1, 2, 3]);

    // 一個一個取值,取到 None 為止
    while let Some(value) = stream.next().await {
        println!("收到 {}", value);
    }
}

Stream 不在標準庫裡

有件事要特別說明:和 Future 不同,Stream 目前不在標準庫裡,它定義在社群專案(futures)裡,Tokio 生態則提供 tokio_stream。要用 nextmapfilter 這些方法,得引入對應的擴充 trait StreamExt

extern crate tokio;
extern crate tokio_stream;

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // 和 Iterator 一樣可以串接 map / filter 這些工具
    let mut stream = tokio_stream::iter(1..=5)
        .map(|x| x * 2)
        .filter(|x| x % 3 == 0);

    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
}

你會發現 mapfilter 這些方法跟第 6 章的 Iterator 幾乎一模一樣——因為 Stream 本來就是 Iteratorasync 翻版。學過 IteratorStream 對你來說只是多了 .await

實務上 Stream 很適合表達「源源不絕、會陸續到來的資料」——例如一個一個進來的網路連線、資料庫查詢的逐筆結果或定時觸發的事件。tokio_stream 提供了一整套處理它們的工具。

重點整理

  • Streamasync 版的 Iterator:一連串值一個一個取,但下一個值可能要等,所以是 .next().await
  • 對照:iterator.next() 同步回 Optionstream.next().await.await 才回 Option;都用 None 表示結束。
  • 走訪用 while let Some(x) = stream.next().awaitStream 不能用 for)。
  • Stream 不在標準庫,定義在 futures;用 tokio_stream::StreamExt 取得 nextmapfilter 等方法(用法和 Iterator 幾乎一樣)。