diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 2521c57..d7a6125 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -13,11 +13,11 @@ use std::{ time::Duration, }; -use tokio::task::JoinHandle; +use tokio::{sync::mpsc, task::JoinHandle}; use crate::{ app::{screens::CurrentScreen, App}, - input::{event::InputEvent, mapper::InputMapper}, + input::{event::InputEvent, handle::InputHandle}, terminal::{handle::TerminalHandle, messages::TerminalFrame, TerminalError}, }; @@ -136,8 +136,12 @@ async fn input_handling( Ok(ControlFlow::Continue(())) } -pub async fn run_app(mut app: App, terminal_handle: TerminalHandle) -> color_eyre::Result<()> { - let mut input_mapper = InputMapper::default(); +pub async fn run_app( + mut app: App, + terminal_handle: TerminalHandle, + input_handle: InputHandle, + mut app_input_rx: mpsc::Receiver, +) -> color_eyre::Result<()> { let mut loading = TerminalLoadingIndicator::new(terminal_handle.clone()); loop { @@ -148,14 +152,15 @@ pub async fn run_app(mut app: App, terminal_handle: TerminalHandle) -> color_eyr .await .map_err(terminal_error)?; - if let Some(terminal_event) = terminal_handle.read_event().await.map_err(terminal_error)? { - let input = input_mapper.map_terminal_event(terminal_event, &app.input_context()); - if let Some(input) = input { + match app_input_rx.recv().await { + Some(input) => { match input_handling(&mut app, input, &terminal_handle, &mut loading).await? { ControlFlow::Continue(()) => {} ControlFlow::Break(()) => return Ok(()), } + input_handle.update_context(app.input_context()).await.ok(); } + None => return Ok(()), // InputActor stopped } } } diff --git a/src/input/actor.rs b/src/input/actor.rs new file mode 100644 index 0000000..5e865ea --- /dev/null +++ b/src/input/actor.rs @@ -0,0 +1,264 @@ +use std::time::Duration; + +use tokio::sync::mpsc; + +use crate::{ + input::{ + context::InputContext, + event::{InputEvent, TerminalEvent}, + handle::InputHandle, + mapper::InputMapper, + messages::InputMessage, + }, + terminal::handle::TerminalHandle, +}; + +pub const DEFAULT_INPUT_CHANNEL_SIZE: usize = 32; +const EVENT_POLL_TIMEOUT: Duration = Duration::from_millis(50); + +pub struct InputActor { + rx: mpsc::Receiver, + terminal_handle: TerminalHandle, + mapper: InputMapper, + context: InputContext, + subscriber: Option>, +} + +impl InputActor { + pub fn spawn(terminal_handle: TerminalHandle, initial_context: InputContext) -> InputHandle { + let (tx, rx) = mpsc::channel(DEFAULT_INPUT_CHANNEL_SIZE); + tracing::debug!( + channel_size = DEFAULT_INPUT_CHANNEL_SIZE, + "spawning input actor" + ); + tokio::spawn( + Self { + rx, + terminal_handle, + mapper: InputMapper::default(), + context: initial_context, + subscriber: None, + } + .run(), + ); + InputHandle::new(tx) + } + + pub async fn run(mut self) { + tracing::info!("input actor started"); + + // Spawn a dedicated pump subtask that polls the terminal and buffers raw + // events. A dedicated subtask is used so that an in-flight poll_event + // future is never abandoned mid-read when a control message wins the + // select! race — abandoning it would silently discard the event because + // the TerminalActor already consumed it from the OS queue. + let (event_tx, mut event_rx) = mpsc::channel::(8); + let terminal_handle = self.terminal_handle.clone(); + tokio::spawn(async move { + loop { + match terminal_handle.poll_event(EVENT_POLL_TIMEOUT).await { + Ok(Some(event)) => { + tracing::debug!(?event, "terminal event captured by input pump"); + if event_tx.send(event).await.is_err() { + break; // InputActor stopped; pump exits cleanly. + } + } + Ok(None) => {} // poll timeout with no event — retry + Err(err) => { + tracing::warn!(error = %err, "terminal poll error; input pump stopping"); + break; + } + } + } + tracing::debug!("input event pump stopped"); + }); + + loop { + tokio::select! { + msg = self.rx.recv() => match msg { + Some(InputMessage::SubscribeApp { tx }) => { + tracing::debug!("app subscribed to input events"); + self.subscriber = Some(tx); + } + Some(InputMessage::UpdateContext { context }) => { + tracing::debug!(?context, "input context updated"); + self.context = context; + } + Some(InputMessage::Shutdown) | None => { + tracing::info!("input actor stopping"); + break; + } + }, + event = event_rx.recv() => match event { + Some(terminal_event) => { + tracing::debug!(?terminal_event, "terminal event received by input actor"); + if let Some(input_event) = + self.mapper.map_terminal_event(terminal_event, &self.context) + { + tracing::debug!(?input_event, "input event mapped and delivering"); + self.deliver(input_event).await; + } else { + tracing::debug!("terminal event discarded (no mapping for current context)"); + } + } + None => { + // Pump exited (terminal closed or error) — stop the actor. + tracing::warn!("input event pump closed; stopping input actor"); + break; + } + }, + } + } + + tracing::info!("input actor stopped"); + } + + async fn deliver(&mut self, event: InputEvent) { + let Some(tx) = &self.subscriber else { + return; + }; + if tx.send(event).await.is_err() { + tracing::warn!("input subscriber channel closed; clearing subscriber"); + self.subscriber = None; + } + } +} + +#[cfg(test)] +mod tests { + use ratatui::crossterm::event::KeyCode; + + use crate::{ + app::screens::CurrentScreen, + input::{ + context::InputContext, + event::{InputEvent, KeyInput, TerminalEvent}, + }, + terminal::{actor::TerminalActor, session::MockTerminalSessionApi}, + }; + + use super::*; + + fn mailing_list_context() -> InputContext { + InputContext::new(CurrentScreen::MailingListSelection) + } + + fn details_context() -> InputContext { + InputContext::new(CurrentScreen::PatchsetDetails) + } + + fn spawn_test_actor( + session: MockTerminalSessionApi, + context: InputContext, + ) -> (InputHandle, TerminalHandle) { + let terminal_handle = TerminalActor::spawn(Box::new(session)); + let input_handle = InputActor::spawn(terminal_handle.clone(), context); + (input_handle, terminal_handle) + } + + #[tokio::test] + async fn key_event_in_mailing_list_context_delivers_navigate_down() { + let mut session = MockTerminalSessionApi::new(); + // First poll returns the key; subsequent polls time-out (return None). + session + .expect_poll_event() + .times(1) + .returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Down))))); + session.expect_poll_event().returning(|_| Ok(None)); + + let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context()); + let (sub_tx, mut sub_rx) = mpsc::channel::(8); + + input_handle.subscribe_app(sub_tx).await.unwrap(); + + let received = sub_rx.recv().await; + assert_eq!(received, Some(InputEvent::NavigateDown)); + } + + #[tokio::test] + async fn updating_context_to_details_maps_escape_to_back() { + let mut session = MockTerminalSessionApi::new(); + // First call returns None so the actor can process the context update + // before the key arrives. Second call returns the Esc key. + // Remaining calls time-out. + session.expect_poll_event().times(1).returning(|_| Ok(None)); + session + .expect_poll_event() + .times(1) + .returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Esc))))); + session.expect_poll_event().returning(|_| Ok(None)); + + let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context()); + let (sub_tx, mut sub_rx) = mpsc::channel::(8); + + input_handle.subscribe_app(sub_tx).await.unwrap(); + // In MailingListSelection, Esc maps to Quit. Switch to PatchsetDetails + // so Esc maps to Back instead. + input_handle + .update_context(details_context()) + .await + .unwrap(); + + let received = sub_rx.recv().await; + assert_eq!(received, Some(InputEvent::Back)); + } + + #[tokio::test] + async fn unmapped_terminal_event_is_discarded_without_error() { + let mut session = MockTerminalSessionApi::new(); + // F6 has no mapping in any screen — it should be silently dropped. + session + .expect_poll_event() + .times(1) + .returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::F(6)))))); + // Second event — Down — is delivered so we can wait for it to confirm + // the actor kept running after discarding the unmapped event. + session + .expect_poll_event() + .times(1) + .returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Down))))); + session.expect_poll_event().returning(|_| Ok(None)); + + let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context()); + let (sub_tx, mut sub_rx) = mpsc::channel::(8); + input_handle.subscribe_app(sub_tx).await.unwrap(); + + // Only NavigateDown arrives; the F6 event is silently discarded. + let received = sub_rx.recv().await; + assert_eq!(received, Some(InputEvent::NavigateDown)); + } + + #[tokio::test] + async fn shutdown_closes_subscriber_channel() { + let mut session = MockTerminalSessionApi::new(); + session.expect_poll_event().returning(|_| Ok(None)); + + let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context()); + let (sub_tx, mut sub_rx) = mpsc::channel::(8); + + input_handle.subscribe_app(sub_tx).await.unwrap(); + input_handle.shutdown().await.unwrap(); + + // When the actor stops it drops the subscriber Sender, closing the + // channel. recv() returns None once all senders are gone. + assert!(sub_rx.recv().await.is_none()); + } + + #[tokio::test] + async fn popup_open_context_maps_escape_to_close_popup() { + let mut session = MockTerminalSessionApi::new(); + session + .expect_poll_event() + .times(1) + .returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Esc))))); + session.expect_poll_event().returning(|_| Ok(None)); + + let context = details_context().with_popup_open(true); + let (input_handle, _terminal_handle) = spawn_test_actor(session, context); + let (sub_tx, mut sub_rx) = mpsc::channel::(8); + input_handle.subscribe_app(sub_tx).await.unwrap(); + + let received = sub_rx.recv().await; + assert_eq!(received, Some(InputEvent::ClosePopup)); + } +} diff --git a/src/input/errors.rs b/src/input/errors.rs new file mode 100644 index 0000000..d551cd1 --- /dev/null +++ b/src/input/errors.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +/// Failures at the Input actor boundary. +#[derive(Debug, Error)] +pub enum InputError { + /// The Input actor's receive channel is closed; the actor has stopped. + #[error("input actor closed")] + Closed, +} diff --git a/src/input/handle.rs b/src/input/handle.rs new file mode 100644 index 0000000..7eb73de --- /dev/null +++ b/src/input/handle.rs @@ -0,0 +1,49 @@ +use tokio::sync::mpsc; + +use crate::input::{ + context::InputContext, errors::InputError, event::InputEvent, messages::InputMessage, +}; + +/// Cloneable handle to the Input actor. +/// +/// All methods are fire-and-forget sends; the actor processes them +/// asynchronously in its own task. +#[derive(Clone)] +pub struct InputHandle { + tx: mpsc::Sender, +} + +impl InputHandle { + pub fn new(tx: mpsc::Sender) -> Self { + Self { tx } + } + + /// Registers `tx` as the destination for translated [`InputEvent`] values. + /// + /// The actor delivers one `InputEvent` per raw terminal event that maps to + /// a semantic action under the current [`InputContext`]. + pub async fn subscribe_app(&self, tx: mpsc::Sender) -> Result<(), InputError> { + self.send(InputMessage::SubscribeApp { tx }).await + } + + /// Replaces the current mapping context with `context`. + /// + /// Should be called by the App after every state mutation that changes + /// screen, popup visibility, or edit mode. + pub async fn update_context(&self, context: InputContext) -> Result<(), InputError> { + self.send(InputMessage::UpdateContext { context }).await + } + + /// Requests the actor to stop its event loop. + /// + /// Dropping all clones of the handle achieves the same effect because the + /// actor's receive channel closes when its last sender is gone. + #[allow(dead_code)] + pub async fn shutdown(&self) -> Result<(), InputError> { + self.send(InputMessage::Shutdown).await + } + + async fn send(&self, message: InputMessage) -> Result<(), InputError> { + self.tx.send(message).await.map_err(|_| InputError::Closed) + } +} diff --git a/src/input/messages.rs b/src/input/messages.rs new file mode 100644 index 0000000..b496e28 --- /dev/null +++ b/src/input/messages.rs @@ -0,0 +1,14 @@ +use tokio::sync::mpsc; + +use crate::input::{context::InputContext, event::InputEvent}; + +/// Messages understood by the Input actor. +pub enum InputMessage { + /// Registers the App as the single consumer of [`InputEvent`] values. + SubscribeApp { tx: mpsc::Sender }, + /// Updates the mapping context so the actor can translate the next + /// terminal event with current application state. + UpdateContext { context: InputContext }, + /// Requests a clean shutdown of the actor's event loop. + Shutdown, +} diff --git a/src/input/mod.rs b/src/input/mod.rs index 7d7d3a5..ceee4aa 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,13 +1,17 @@ //! Protocol boundary between terminal input and application intent. //! -//! Phase 9 uses a pull loop: `handler::run_app` reads raw -//! [`event::TerminalEvent`] values through the terminal actor, then -//! [`mapper::InputMapper`] translates them using [`context::InputContext`] -//! before handlers consume semantic [`event::InputEvent`] values. Phase 10 -//! will introduce an `InputActor` that broadcasts terminal events instead of -//! this direct pull loop. +//! The Input actor mediates between the Terminal actor (producer of raw +//! [`event::TerminalEvent`] values) and the App (consumer of semantic +//! [`event::InputEvent`] values). [`mapper::InputMapper`] translates events +//! using the current [`context::InputContext`], which the App updates after +//! every state mutation. [`handle::InputHandle`] is the cloneable public +//! interface to the actor. +pub mod actor; pub mod bindings; pub mod context; +pub mod errors; pub mod event; +pub mod handle; pub mod mapper; +pub mod messages; diff --git a/src/main.rs b/src/main.rs index a46ca8e..0dbd02d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ use infrastructure::{ shell::OsShell, terminal::init, }; +use input::{actor::InputActor, event::InputEvent}; use lore::{ application::{actor::LoreApiActor, cache::CacheTtl, service::LoreService}, infrastructure::{ @@ -38,6 +39,7 @@ use render::{actor::RenderActor, ShellRenderService}; use render_prefs::PatchRenderer; use std::{ops::ControlFlow, sync::Arc}; use terminal::{actor::TerminalActor, session::CrosstermTerminalSession}; +use tokio::sync::mpsc; use tracing::{event, Level}; /// Verifies required and optional external binaries before the TUI runs. @@ -174,7 +176,14 @@ async fn main() -> color_eyre::Result<()> { bail!("patch-hub cannot be executed because some dependencies are missing, check logs for more information"); } - run_app(app, terminal_handle.clone()).await?; + let (app_input_tx, app_input_rx) = mpsc::channel::(64); + let input_handle = InputActor::spawn(terminal_handle.clone(), app.input_context()); + input_handle + .subscribe_app(app_input_tx) + .await + .map_err(|e| eyre!("{e}"))?; + + run_app(app, terminal_handle.clone(), input_handle, app_input_rx).await?; terminal_handle .shutdown() .await diff --git a/src/terminal/handle.rs b/src/terminal/handle.rs index 83973bb..f9d42e5 100644 --- a/src/terminal/handle.rs +++ b/src/terminal/handle.rs @@ -26,12 +26,17 @@ impl TerminalHandle { .await } + /// Reads the next raw terminal event, blocking until one arrives. + /// + /// The runtime event loop now receives input through [`InputHandle`] rather + /// than calling this directly. This method is kept for potential future use + /// in interactive sub-flows. + #[allow(dead_code)] pub async fn read_event(&self) -> TerminalResult> { self.request_result(|reply| TerminalMessage::ReadEvent { reply }) .await } - #[allow(dead_code)] // Reserved for Phase 10 non-blocking input polling. pub async fn poll_event(&self, timeout: Duration) -> TerminalResult> { self.request_result(|reply| TerminalMessage::PollEvent { timeout, reply }) .await diff --git a/src/terminal/mod.rs b/src/terminal/mod.rs index 3d14168..48bb521 100644 --- a/src/terminal/mod.rs +++ b/src/terminal/mod.rs @@ -1,11 +1,11 @@ //! Actor boundary for the Ratatui/Crossterm terminal session. //! -//! Phase 9 makes this module the single owner of terminal session operations. -//! The runtime pull loop in `handler::run_app` draws through -//! [`handle::TerminalHandle::draw`] and reads input through -//! [`handle::TerminalHandle::read_event`]. Phase 10 will introduce an -//! `InputActor` that broadcasts terminal events instead of the current -//! direct pull loop. +//! This module is the single owner of terminal session operations. The runtime +//! draws frames through [`handle::TerminalHandle::draw`]. Raw terminal events +//! are delivered to the [`crate::input`] actor via +//! [`handle::TerminalHandle::poll_event`]; the Input actor translates them into +//! semantic [`crate::input::event::InputEvent`] values before forwarding them +//! to the App. pub mod actor; pub mod errors;