//! Tokio-based event loop. //! //! A blocking task reads crossterm events and forwards them onto //! an `mpsc` channel as `AppEvent`s. The main loop awaits events, //! feeds them to `App::update`, enacts any returned `Action`s, //! and redraws the terminal. DSL execution is dispatched onto //! the database worker (see `db::Database`), and its result is //! posted back as a new `AppEvent`. Future async work (snapshot //! capture, auto-save) joins the same event channel as //! additional producers. use std::io; use std::time::Duration; use anyhow::{Context, Result}; use crossterm::event::{Event as CtEvent, EventStream}; use crossterm::execute; use crossterm::terminal::{ EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode, }; use futures_util::StreamExt; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use crate::action::Action; use crate::app::App; use crate::cli::Args; use crate::db::{ DataResult, Database, DbError, DeleteResult, InsertResult, TableDescription, UpdateResult, }; use crate::dsl::Command; use crate::event::AppEvent; use crate::project::open_or_create; use crate::theme::Theme; use crate::ui; const EVENT_CHANNEL_CAPACITY: usize = 64; const SHUTDOWN_GRACE: Duration = Duration::from_millis(100); /// Run the application until a `Quit` action is enacted or the /// terminal closes. pub async fn run(args: Args) -> Result<()> { let project = open_or_create(args.project_path.as_deref(), args.data_dir.as_deref()) .context("open or create project")?; let db_path = project.db_path(); let display_name = project.display_name().to_string(); let project_path = project.path().to_path_buf(); let persistence = crate::persistence::Persistence::new(project_path.clone()); // Capture whether the .db file existed BEFORE we open it — // sqlite creates it on connect, so this is the only honest // signal that we need to rebuild from text (ADR-0015 §7). let db_existed = db_path.exists(); let database = Database::open_with_persistence(db_path.as_path(), persistence) .context("open database")?; if !db_existed && let Err(e) = database .rebuild_from_text(project_path.clone(), None) .await { // The terminal is still in cooked mode here (we haven't // entered the alternate screen yet), so writing to // stderr lands directly in the user's shell. Drop the // project to release the lock first. drop(project); if matches!( e, DbError::PersistenceFatal { .. } | DbError::RebuildRowFailed { .. } ) { eprintln!("rdbms-playground: {}", e.friendly_message()); return Ok(()); } return Err(anyhow::anyhow!(e.friendly_message())).context("rebuild from text"); } let mut terminal = setup_terminal().context("setup terminal")?; let result = run_loop( &mut terminal, args.theme, database, display_name, project_path, ) .await; if let Err(e) = teardown_terminal(&mut terminal) { // Teardown failures should not mask the primary error. warn!(error = %e, "terminal teardown failed"); } // `project` (and the lock it holds) is dropped here, releasing // the lock file *after* the terminal has been restored. drop(project); // ADR-0015 §8: a fatal persistence failure makes its // banner visible above the shell prompt by writing to // stderr after the alternate screen has been left. if let Ok(Some(banner)) = &result { eprintln!("{banner}"); } result.map(|_| ()) } async fn run_loop( terminal: &mut Terminal>, theme: Theme, database: Database, project_display_name: String, project_path: std::path::PathBuf, ) -> Result> { let (event_tx, mut event_rx) = mpsc::channel::(EVENT_CHANNEL_CAPACITY); let reader_handle = spawn_event_reader(event_tx.clone()); let mut app = App::new(); app.project_name = Some(project_display_name); // Seed the table list with whatever the database currently // shows. For a fresh in-memory DB this is empty, but doing // it explicitly means file-backed databases (track 2) will // show their tables on launch without changes here. seed_initial_tables(&database, &event_tx).await; terminal .draw(|f| ui::render(&mut app, &theme, f)) .context("initial draw")?; info!("entering main event loop"); while let Some(event) = event_rx.recv().await { let actions = app.update(event); let mut should_quit = false; for action in actions { match action { Action::Quit => { debug!("quit action received"); should_quit = true; } Action::ExecuteDsl { command, source } => { spawn_dsl_dispatch(database.clone(), event_tx.clone(), command, source); } Action::PrepareRebuild => { spawn_prepare_rebuild(project_path.clone(), event_tx.clone()); } Action::Rebuild { source } => { spawn_rebuild( database.clone(), project_path.clone(), event_tx.clone(), source, ); } } } terminal .draw(|f| ui::render(&mut app, &theme, f)) .context("redraw")?; if should_quit { break; } } let _ = tokio::time::timeout(SHUTDOWN_GRACE, reader_handle).await; info!("event loop exited"); Ok(app.fatal_message.clone()) } async fn seed_initial_tables(database: &Database, event_tx: &mpsc::Sender) { match database.list_tables().await { Ok(tables) => { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } Err(e) => { error!(error = %e, "failed to seed initial table list"); } } } /// Read `project.yaml` + `data/` to compute the rebuild /// summary that the confirmation modal shows. Runs off the /// event loop so the brief I/O doesn't stall input handling /// even on slow filesystems. fn spawn_prepare_rebuild( project_path: std::path::PathBuf, event_tx: mpsc::Sender, ) { tokio::spawn(async move { let summary = match summarize_project(&project_path) { Ok(s) => s, Err(e) => format!("(could not read project sources: {e})"), }; let _ = event_tx.send(AppEvent::RebuildPrepared { summary }).await; }); } fn summarize_project(project_path: &std::path::Path) -> Result { let yaml_path = project_path.join(crate::project::PROJECT_YAML); let yaml = std::fs::read_to_string(&yaml_path).map_err(|e| e.to_string())?; let snapshot = crate::persistence::parse_schema(&yaml).map_err(|e| e.to_string())?; let table_count = snapshot.tables.len(); let data_dir = project_path.join(crate::project::DATA_DIR); let mut row_count: usize = 0; for table in &snapshot.tables { let csv_path = data_dir.join(format!("{}.csv", table.name)); let Ok(body) = std::fs::read_to_string(&csv_path) else { continue; }; // Header line + one line per row (per Iteration 2's // "no CSV when empty" rule, this is exact). row_count += body.lines().count().saturating_sub(1); } Ok(format!( "{table_count} table{} and {row_count} row{} will be reconstructed; \ the existing playground.db will be replaced", if table_count == 1 { "" } else { "s" }, if row_count == 1 { "" } else { "s" }, )) } /// Spawn the actual rebuild and forward the typed outcome /// back as an `AppEvent`. fn spawn_rebuild( database: Database, project_path: std::path::PathBuf, event_tx: mpsc::Sender, source: String, ) { tokio::spawn(async move { match database .rebuild_from_text(project_path.clone(), Some(source)) .await { Ok(()) => { let summary = summarize_project(&project_path) .unwrap_or_else(|_| "rebuild complete".to_string()); let _ = event_tx .send(AppEvent::RebuildSucceeded { summary }) .await; // Refresh the table list so the items panel // reflects whatever the rebuild produced. if let Ok(tables) = database.list_tables().await { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } } Err(DbError::PersistenceFatal { operation, path, message, }) => { let _ = event_tx .send(AppEvent::PersistenceFatal { operation: operation.to_string(), path, message, }) .await; } Err(other) => { let _ = event_tx .send(AppEvent::RebuildFailed { error: other.friendly_message(), }) .await; } } }); } /// Spawn a task that runs a DSL command against the database /// and forwards the result back as an `AppEvent`. fn spawn_dsl_dispatch( database: Database, event_tx: mpsc::Sender, command: Command, source: String, ) { tokio::spawn(async move { let outcome = execute_command_typed(&database, command.clone(), source).await; let event = match outcome { Ok(CommandOutcome::Schema(description)) => AppEvent::DslSucceeded { command: command.clone(), description, }, Ok(CommandOutcome::Query(data)) => AppEvent::DslDataSucceeded { command: command.clone(), data, }, Ok(CommandOutcome::Insert(result)) => AppEvent::DslInsertSucceeded { command: command.clone(), result, }, Ok(CommandOutcome::Update(result)) => AppEvent::DslUpdateSucceeded { command: command.clone(), result, }, Ok(CommandOutcome::Delete(result)) => AppEvent::DslDeleteSucceeded { command: command.clone(), result, }, Err(DbError::PersistenceFatal { operation, path, message, }) => AppEvent::PersistenceFatal { operation: operation.to_string(), path, message, }, Err(error) => AppEvent::DslFailed { command: command.clone(), error: error.friendly_message(), }, }; if event_tx.send(event).await.is_err() { return; } // Refresh the table list after every DDL operation so // the items panel reflects reality. A failed list_tables // here is logged but not surfaced to the user — they // already saw the primary outcome. match database.list_tables().await { Ok(tables) => { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } Err(e) => warn!(error = %e, "post-list_tables failed"), } }); } enum CommandOutcome { Schema(Option), Query(DataResult), Insert(InsertResult), Update(UpdateResult), Delete(DeleteResult), } /// Execute a parsed user command and return either a typed /// `CommandOutcome` or the raw `DbError`. Keeping the typed /// error here lets us distinguish persistence-fatal failures /// from ordinary user errors at dispatch time (ADR-0015 §8). async fn execute_command_typed( database: &Database, command: Command, source: String, ) -> Result { let src = Some(source); match command { Command::CreateTable { name, columns, primary_key, } => database .create_table(name, columns, primary_key, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropTable { name } => database .drop_table(name, src) .await .map(|()| CommandOutcome::Schema(None)), Command::AddColumn { table, column, ty } => database .add_column(table, column, ty, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::AddRelationship { name, parent_table, parent_column, child_table, child_column, on_delete, on_update, create_fk, } => database .add_relationship( name, parent_table, parent_column, child_table, child_column, on_delete, on_update, create_fk, src, ) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropRelationship { selector } => database .drop_relationship(selector, src) .await .map(CommandOutcome::Schema), Command::ShowTable { name } => database .describe_table(name, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::Insert { table, columns, values, } => database .insert(table, columns, values, src) .await .map(CommandOutcome::Insert), Command::Update { table, assignments, filter, } => database .update(table, assignments, filter, src) .await .map(CommandOutcome::Update), Command::Delete { table, filter } => database .delete(table, filter, src) .await .map(CommandOutcome::Delete), Command::ShowData { name } => database .query_data(name, src) .await .map(CommandOutcome::Query), } } fn spawn_event_reader(tx: mpsc::Sender) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut stream = EventStream::new(); while let Some(maybe_event) = stream.next().await { match maybe_event { Ok(CtEvent::Key(key)) => { if tx.send(AppEvent::Key(key)).await.is_err() { break; } } Ok(CtEvent::Resize(cols, rows)) => { if tx.send(AppEvent::Resize { cols, rows }).await.is_err() { break; } } Ok(_) => { // Ignore other event kinds (paste, focus, mouse) for now. } Err(e) => { error!(error = %e, "crossterm event stream error"); break; } } } debug!("event reader exiting"); }) } fn setup_terminal() -> Result>> { enable_raw_mode().context("enable raw mode")?; let mut stdout = io::stdout(); // Mouse capture is intentionally NOT enabled: it would prevent the // host terminal's native text selection (the cost of capturing every // mouse event), which we don't currently use for anything in-app. // If we ever want click-to-select panes or scroll wheel handling, // we'll need a different strategy than blanket capture. execute!(stdout, EnterAlternateScreen).context("enter alternate screen")?; let backend = CrosstermBackend::new(stdout); let terminal = Terminal::new(backend).context("construct terminal")?; Ok(terminal) } fn teardown_terminal( terminal: &mut Terminal>, ) -> Result<()> { disable_raw_mode().context("disable raw mode")?; execute!(terminal.backend_mut(), LeaveAlternateScreen) .context("leave alternate screen")?; terminal.show_cursor().context("show cursor")?; Ok(()) }