執行器與系統輸入輸出

在前面的章節 Future Trait,我們探討了一個在 socket 上非同步讀取資料的範例:


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

這個 future 會從一個 socket 上讀取可用的資料,倘若無任何可用資料,它會將控制權轉至執行器,並要求 future 的任務應在 socket 有可讀資料時能被喚醒。然而,這個範例對 Socket 型別的實作並不夠清楚,具體來說,我們無法得知 set_readable_callback 函式如何運作。當 socket 有可讀資料時,我們要怎樣安排呼叫一次 lw.wake() ?有個作法是讓一個執行緒不斷檢查 socket 是否可讀,並適時呼叫 wake()。不過,這方法需要分別對每個阻塞 IO 的 future 建立執行緒,效率低落,絕對會大幅降低咱們非同步程式碼的效率。

一般來說,這問題的解法通常透過整合系統原生的阻塞性 IO 元件(IO-aware system blocking primitive)來實踐,例如 Linux 上的 epoll、FreeBSD 與 macOS 的 kqueue、Windows 上的 IOCP,以及 Fuchsia 上的 port(這些都在跨平台 Rust 模組 mio 中實作介面出來)。這些原生元件都允許一個執行緒被多個非同步 IO 事件阻塞,並在任一事件完成時返回。實際上,這些 API 長相如下:


# #![allow(unused_variables)]
#fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { ... }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { ... }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
#}

Future 的執行器可以使用這些元件來提供非同步 IO 物件,例如 socket 可以配置特定 IO 事件發生時的回呼函式(callback)。在上面 SocketRead 範例的情況下,Socket::set_readable_callback 函式的虛擬碼可能如下:


# #![allow(unused_variables)]
#fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
#}

現在,我們只需要有一個執行器執行緒來接收與發送任何 IO 事件到合適的 Waker,這個 Waker 會喚醒對應的任務,讓執行器在回去檢查更多 IO 事件之前,能驅動更多任務(然後一直循環下去⋯)。