Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

oneshotwatchbroadcast

本集目標

認識另外三種 channel,並學會判斷該用哪一個。

正文

上一集的 mpsc 是「多發送、單接收」。Tokio 還有三種 channel,各自適合不同的狀況。最基本的不同之處是發送端接收端各有幾個。

oneshot:一個值,一次

oneshot 是「一個發送端、一個接收端、只送一個值」。最適合「背景算一個結果,算好送回來」這種一次性的回傳。

extern crate tokio;

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<i32>();

    tokio::spawn(async move {
        // 算好一個結果,送回去(send 只能用一次,而且不用 .await)
        tx.send(42).expect("接收端不見了");
    });

    // rx 本身就是一個 Future,.await 它就拿到那個值
    let result = rx.await.expect("發送端不見了");
    println!("拿到結果:{}", result);
}

注意 oneshot 的接收端 rx 本身就是一個 Future,直接 rx.await 即可。

watch:只關心「最新狀態」

watch 是「一個發送端、多個接收端,但接收端只看得到最新的值」。它不是排隊收每一則訊息,而是像一個「公告欄」:發送端隨時更新上面的內容,接收端只關心「現在公告欄上寫什麼」。中間錯過的舊值不會補給你。

這最適合用來廣播如「目前設定是什麼」的狀態。

extern crate tokio;

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("啟動中");

    tokio::spawn(async move {
        tx.send("執行中").expect("沒有接收端");
        tx.send("完成").expect("沒有接收端");
    });

    // changed().await 等到值有更新,borrow() 讀目前最新的值
    while rx.changed().await.is_ok() {
        println!("最新狀態:{}", *rx.borrow());
    }
}

broadcast:把事件送給所有訂閱者

broadcast 是「多發送、多接收,而且每個接收端都有自己的接收進度」。和 watch 不同,它不是只給最新值,而是會把每則訊息送給所有目前訂閱的接收端。適合「一則事件要通知所有訂閱者」的場景。

extern crate tokio;

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<i32>(16);
    let mut rx2 = tx.subscribe(); // 多開一個接收端

    tx.send(1).expect("沒有接收端");
    tx.send(2).expect("沒有接收端");

    // rx1 和 rx2 都會收到 1 和 2
    println!("rx1 收到:{}", rx1.recv().await.unwrap());
    println!("rx1 收到:{}", rx1.recv().await.unwrap());
    println!("rx2 收到:{}", rx2.recv().await.unwrap());
    println!("rx2 收到:{}", rx2.recv().await.unwrap());
}

不過 broadcast 不是無限容量的歷史紀錄。建立 channel 時給的 16 是容量;如果某個接收端太久沒收,落後超過容量,舊訊息會被丟掉。這時 recv().await 會回傳 Lagged(n),告訴你漏掉了幾則:

match rx.recv().await {
    Ok(value) => println!("收到:{}", value),
    Err(broadcast::error::RecvError::Lagged(n)) => {
        println!("太慢了,漏掉 {} 則訊息", n);
    }
    Err(broadcast::error::RecvError::Closed) => {
        println!("所有發送端都關閉了");
    }
}

所以比較精準地說:broadcast 會把訊息廣播給所有接收端,但每個接收端要自己跟上;如果跟不上,就會收到 Lagged,而不是保證永遠拿得到每一則舊訊息。

重點整理

  • 不同的 channel「發送端 / 接收端數量」也不一樣。
  • oneshot:單送單收、只送一個值一次,接收端本身是 Futurerx.await),適合回傳結果。
  • watch:單送多收、只看得到最新值,適合廣播目前狀態;用 .changed().await + .borrow()
  • broadcast:多送多收、把事件通知所有訂閱者;每個接收端有自己的進度,但落後超過容量時會收到 Lagged
  • 對照上一集的 mpsc(多送單收、收每一則、queue)。