Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use std::{
time::Duration,
};

use tokio::task::JoinHandle;
use tokio::{sync::mpsc, task::JoinHandle};

use crate::{
app::{screens::CurrentScreen, App},
input::{event::InputEvent, mapper::InputMapper},
input::{event::InputEvent, handle::InputHandle},
terminal::{handle::TerminalHandle, messages::TerminalFrame, TerminalError},
};

Expand Down Expand Up @@ -136,8 +136,12 @@ async fn input_handling(
Ok(ControlFlow::Continue(()))
}

pub async fn run_app(mut app: App, terminal_handle: TerminalHandle) -> color_eyre::Result<()> {
let mut input_mapper = InputMapper::default();
pub async fn run_app(
mut app: App,
terminal_handle: TerminalHandle,
input_handle: InputHandle,
mut app_input_rx: mpsc::Receiver<InputEvent>,
) -> color_eyre::Result<()> {
let mut loading = TerminalLoadingIndicator::new(terminal_handle.clone());

loop {
Expand All @@ -148,14 +152,15 @@ pub async fn run_app(mut app: App, terminal_handle: TerminalHandle) -> color_eyr
.await
.map_err(terminal_error)?;

if let Some(terminal_event) = terminal_handle.read_event().await.map_err(terminal_error)? {
let input = input_mapper.map_terminal_event(terminal_event, &app.input_context());
if let Some(input) = input {
match app_input_rx.recv().await {
Some(input) => {
match input_handling(&mut app, input, &terminal_handle, &mut loading).await? {
ControlFlow::Continue(()) => {}
ControlFlow::Break(()) => return Ok(()),
}
input_handle.update_context(app.input_context()).await.ok();
}
None => return Ok(()), // InputActor stopped
}
}
}
Expand Down
264 changes: 264 additions & 0 deletions src/input/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use std::time::Duration;

use tokio::sync::mpsc;

use crate::{
input::{
context::InputContext,
event::{InputEvent, TerminalEvent},
handle::InputHandle,
mapper::InputMapper,
messages::InputMessage,
},
terminal::handle::TerminalHandle,
};

pub const DEFAULT_INPUT_CHANNEL_SIZE: usize = 32;
const EVENT_POLL_TIMEOUT: Duration = Duration::from_millis(50);

pub struct InputActor {
rx: mpsc::Receiver<InputMessage>,
terminal_handle: TerminalHandle,
mapper: InputMapper,
context: InputContext,
subscriber: Option<mpsc::Sender<InputEvent>>,
}

impl InputActor {
pub fn spawn(terminal_handle: TerminalHandle, initial_context: InputContext) -> InputHandle {
let (tx, rx) = mpsc::channel(DEFAULT_INPUT_CHANNEL_SIZE);
tracing::debug!(
channel_size = DEFAULT_INPUT_CHANNEL_SIZE,
"spawning input actor"
);
tokio::spawn(
Self {
rx,
terminal_handle,
mapper: InputMapper::default(),
context: initial_context,
subscriber: None,
}
.run(),
);
InputHandle::new(tx)
}

pub async fn run(mut self) {
tracing::info!("input actor started");

// Spawn a dedicated pump subtask that polls the terminal and buffers raw
// events. A dedicated subtask is used so that an in-flight poll_event
// future is never abandoned mid-read when a control message wins the
// select! race — abandoning it would silently discard the event because
// the TerminalActor already consumed it from the OS queue.
let (event_tx, mut event_rx) = mpsc::channel::<TerminalEvent>(8);
let terminal_handle = self.terminal_handle.clone();
tokio::spawn(async move {
loop {
match terminal_handle.poll_event(EVENT_POLL_TIMEOUT).await {
Ok(Some(event)) => {
tracing::debug!(?event, "terminal event captured by input pump");
if event_tx.send(event).await.is_err() {
break; // InputActor stopped; pump exits cleanly.
}
}
Ok(None) => {} // poll timeout with no event — retry
Err(err) => {
tracing::warn!(error = %err, "terminal poll error; input pump stopping");
break;
}
}
}
tracing::debug!("input event pump stopped");
});

loop {
tokio::select! {
msg = self.rx.recv() => match msg {
Some(InputMessage::SubscribeApp { tx }) => {
tracing::debug!("app subscribed to input events");
self.subscriber = Some(tx);
}
Some(InputMessage::UpdateContext { context }) => {
tracing::debug!(?context, "input context updated");
self.context = context;
}
Some(InputMessage::Shutdown) | None => {
tracing::info!("input actor stopping");
break;
}
},
event = event_rx.recv() => match event {
Some(terminal_event) => {
tracing::debug!(?terminal_event, "terminal event received by input actor");
if let Some(input_event) =
self.mapper.map_terminal_event(terminal_event, &self.context)
{
tracing::debug!(?input_event, "input event mapped and delivering");
self.deliver(input_event).await;
} else {
tracing::debug!("terminal event discarded (no mapping for current context)");
}
}
None => {
// Pump exited (terminal closed or error) — stop the actor.
tracing::warn!("input event pump closed; stopping input actor");
break;
}
},
}
}

tracing::info!("input actor stopped");
}

async fn deliver(&mut self, event: InputEvent) {
let Some(tx) = &self.subscriber else {
return;
};
if tx.send(event).await.is_err() {
tracing::warn!("input subscriber channel closed; clearing subscriber");
self.subscriber = None;
}
}
}

#[cfg(test)]
mod tests {
use ratatui::crossterm::event::KeyCode;

use crate::{
app::screens::CurrentScreen,
input::{
context::InputContext,
event::{InputEvent, KeyInput, TerminalEvent},
},
terminal::{actor::TerminalActor, session::MockTerminalSessionApi},
};

use super::*;

fn mailing_list_context() -> InputContext {
InputContext::new(CurrentScreen::MailingListSelection)
}

fn details_context() -> InputContext {
InputContext::new(CurrentScreen::PatchsetDetails)
}

fn spawn_test_actor(
session: MockTerminalSessionApi,
context: InputContext,
) -> (InputHandle, TerminalHandle) {
let terminal_handle = TerminalActor::spawn(Box::new(session));
let input_handle = InputActor::spawn(terminal_handle.clone(), context);
(input_handle, terminal_handle)
}

#[tokio::test]
async fn key_event_in_mailing_list_context_delivers_navigate_down() {
let mut session = MockTerminalSessionApi::new();
// First poll returns the key; subsequent polls time-out (return None).
session
.expect_poll_event()
.times(1)
.returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Down)))));
session.expect_poll_event().returning(|_| Ok(None));

let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context());
let (sub_tx, mut sub_rx) = mpsc::channel::<InputEvent>(8);

input_handle.subscribe_app(sub_tx).await.unwrap();

let received = sub_rx.recv().await;
assert_eq!(received, Some(InputEvent::NavigateDown));
}

#[tokio::test]
async fn updating_context_to_details_maps_escape_to_back() {
let mut session = MockTerminalSessionApi::new();
// First call returns None so the actor can process the context update
// before the key arrives. Second call returns the Esc key.
// Remaining calls time-out.
session.expect_poll_event().times(1).returning(|_| Ok(None));
session
.expect_poll_event()
.times(1)
.returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Esc)))));
session.expect_poll_event().returning(|_| Ok(None));

let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context());
let (sub_tx, mut sub_rx) = mpsc::channel::<InputEvent>(8);

input_handle.subscribe_app(sub_tx).await.unwrap();
// In MailingListSelection, Esc maps to Quit. Switch to PatchsetDetails
// so Esc maps to Back instead.
input_handle
.update_context(details_context())
.await
.unwrap();

let received = sub_rx.recv().await;
assert_eq!(received, Some(InputEvent::Back));
}

#[tokio::test]
async fn unmapped_terminal_event_is_discarded_without_error() {
let mut session = MockTerminalSessionApi::new();
// F6 has no mapping in any screen — it should be silently dropped.
session
.expect_poll_event()
.times(1)
.returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::F(6))))));
// Second event — Down — is delivered so we can wait for it to confirm
// the actor kept running after discarding the unmapped event.
session
.expect_poll_event()
.times(1)
.returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Down)))));
session.expect_poll_event().returning(|_| Ok(None));

let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context());
let (sub_tx, mut sub_rx) = mpsc::channel::<InputEvent>(8);
input_handle.subscribe_app(sub_tx).await.unwrap();

// Only NavigateDown arrives; the F6 event is silently discarded.
let received = sub_rx.recv().await;
assert_eq!(received, Some(InputEvent::NavigateDown));
}

#[tokio::test]
async fn shutdown_closes_subscriber_channel() {
let mut session = MockTerminalSessionApi::new();
session.expect_poll_event().returning(|_| Ok(None));

let (input_handle, _terminal_handle) = spawn_test_actor(session, mailing_list_context());
let (sub_tx, mut sub_rx) = mpsc::channel::<InputEvent>(8);

input_handle.subscribe_app(sub_tx).await.unwrap();
input_handle.shutdown().await.unwrap();

// When the actor stops it drops the subscriber Sender, closing the
// channel. recv() returns None once all senders are gone.
assert!(sub_rx.recv().await.is_none());
}

#[tokio::test]
async fn popup_open_context_maps_escape_to_close_popup() {
let mut session = MockTerminalSessionApi::new();
session
.expect_poll_event()
.times(1)
.returning(|_| Ok(Some(TerminalEvent::Key(KeyInput::press(KeyCode::Esc)))));
session.expect_poll_event().returning(|_| Ok(None));

let context = details_context().with_popup_open(true);
let (input_handle, _terminal_handle) = spawn_test_actor(session, context);
let (sub_tx, mut sub_rx) = mpsc::channel::<InputEvent>(8);
input_handle.subscribe_app(sub_tx).await.unwrap();

let received = sub_rx.recv().await;
assert_eq!(received, Some(InputEvent::ClosePopup));
}
}
9 changes: 9 additions & 0 deletions src/input/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use thiserror::Error;

/// Failures at the Input actor boundary.
#[derive(Debug, Error)]
pub enum InputError {
/// The Input actor's receive channel is closed; the actor has stopped.
#[error("input actor closed")]
Closed,
}
49 changes: 49 additions & 0 deletions src/input/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use tokio::sync::mpsc;

use crate::input::{
context::InputContext, errors::InputError, event::InputEvent, messages::InputMessage,
};

/// Cloneable handle to the Input actor.
///
/// All methods are fire-and-forget sends; the actor processes them
/// asynchronously in its own task.
#[derive(Clone)]
pub struct InputHandle {
tx: mpsc::Sender<InputMessage>,
}

impl InputHandle {
pub fn new(tx: mpsc::Sender<InputMessage>) -> Self {
Self { tx }
}

/// Registers `tx` as the destination for translated [`InputEvent`] values.
///
/// The actor delivers one `InputEvent` per raw terminal event that maps to
/// a semantic action under the current [`InputContext`].
pub async fn subscribe_app(&self, tx: mpsc::Sender<InputEvent>) -> Result<(), InputError> {
self.send(InputMessage::SubscribeApp { tx }).await
}

/// Replaces the current mapping context with `context`.
///
/// Should be called by the App after every state mutation that changes
/// screen, popup visibility, or edit mode.
pub async fn update_context(&self, context: InputContext) -> Result<(), InputError> {
self.send(InputMessage::UpdateContext { context }).await
}

/// Requests the actor to stop its event loop.
///
/// Dropping all clones of the handle achieves the same effect because the
/// actor's receive channel closes when its last sender is gone.
#[allow(dead_code)]
pub async fn shutdown(&self) -> Result<(), InputError> {
self.send(InputMessage::Shutdown).await
}

async fn send(&self, message: InputMessage) -> Result<(), InputError> {
self.tx.send(message).await.map_err(|_| InputError::Closed)
}
}
14 changes: 14 additions & 0 deletions src/input/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use tokio::sync::mpsc;

use crate::input::{context::InputContext, event::InputEvent};

/// Messages understood by the Input actor.
pub enum InputMessage {
/// Registers the App as the single consumer of [`InputEvent`] values.
SubscribeApp { tx: mpsc::Sender<InputEvent> },
/// Updates the mapping context so the actor can translate the next
/// terminal event with current application state.
UpdateContext { context: InputContext },
/// Requests a clean shutdown of the actor's event loop.
Shutdown,
}
Loading