//! Tokio-based event loop. //! //! A blocking task reads crossterm events and forwards them onto //! an `mpsc` channel as `AppEvent`s. The main loop awaits events, //! feeds them to `App::update`, enacts any returned `Action`s, //! and redraws the terminal. DSL execution is dispatched onto //! the database worker (see `db::Database`), and its result is //! posted back as a new `AppEvent`. Future async work (snapshot //! capture, auto-save) joins the same event channel as //! additional producers. use std::io; use std::path::PathBuf; use std::time::Duration; use anyhow::{Context, Result}; use crossterm::event::{Event as CtEvent, EventStream}; use crossterm::execute; use crossterm::terminal::{ EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode, }; use futures_util::StreamExt; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use tokio::sync::mpsc; use tracing::{debug, error, info, warn}; use crate::action::Action; use crate::app::App; use crate::cli::Args; use crate::db::{ AddColumnResult, ChangeColumnTypeResult, DataResult, Database, DbError, DeleteResult, DropColumnResult, InsertResult, QueryPlan, TableDescription, UpdateResult, }; use crate::dsl::{Command, ColumnSpec}; use crate::dsl::walker::Severity; use crate::event::AppEvent; use crate::project::{ Project, ProjectKind, copy_project, list_projects, open_or_create, projects_dir, read_last_project, resolve_data_root, safely_delete_temp_project, write_last_project, }; use crate::theme::Theme; use crate::ui; const EVENT_CHANNEL_CAPACITY: usize = 64; const SHUTDOWN_GRACE: Duration = Duration::from_millis(100); /// Quiet interval before the input-validity indicator /// reappears once typing stops (ADR-0027 §3). const INDICATOR_DEBOUNCE: Duration = Duration::from_millis(1000); /// The input-validity indicator's debounce state machine /// (ADR-0027 §3, step E). 
/// /// A keystroke hides the indicator and arms the debounce; the /// event loop time-boxes `recv` while `is_armed`, so once /// typing has paused for `INDICATOR_DEBOUNCE` it calls `settle` /// with the freshly computed verdict. The `tokio` timer and the /// terminal live in the loop — this owns only the decision /// logic, keeping the debounce contract unit-testable without /// an event loop. `App::input_indicator` mirrors `visible` for /// the renderer. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] struct IndicatorDebounce { /// What the indicator currently shows; `None` is hidden. visible: Option, /// True while a recompute is owed — only then does the /// event loop time-box `recv`. armed: bool, } impl IndicatorDebounce { /// The indicator value the renderer should mirror. const fn visible(self) -> Option { self.visible } /// Whether the event loop should time-box `recv` with the /// debounce window. const fn is_armed(self) -> bool { self.armed } /// Record a processed event. A keystroke hides the /// indicator and (re)arms the debounce; any other event /// leaves the indicator and the armed state untouched (a /// background result arriving mid-typing must not cancel /// the pending recompute). const fn note_event(&mut self, is_key: bool) { if is_key { self.visible = None; self.armed = true; } } /// The debounce window elapsed — typing has paused. Show /// `verdict` and disarm. const fn settle(&mut self, verdict: Option) { self.visible = verdict; self.armed = false; } } /// Run the application until a `Quit` action is enacted or the /// terminal closes. pub async fn run(args: Args) -> Result<()> { // Resolve data root explicitly so run_loop can refer back // to it for `new` (creates a temp) and `load` (lists // projects). We can't easily recover this from the // Project alone, so we keep it ourselves. 
let data_root = resolve_data_root(args.data_dir.as_deref()) .context("resolve data root")?; // Resolve the initial project path: --resume reads it from // /last_project; otherwise an explicit positional // arg, falling back to a fresh auto-named temp. // // ADR-0015 §7: --resume errors out cleanly when the path is // missing or the recorded project no longer exists. We // surface those failures to stderr before booting the // terminal so the message lands directly in the user's // shell. let initial_path: Option = if args.resume { match read_last_project(&data_root) .context("read last_project")? { Some(p) if p.exists() => Some(p), Some(p) => { eprintln!( "rdbms-playground: {}", crate::t!( "project.resume_recorded_missing", path = p.display(), ), ); return Ok(()); } None => { eprintln!( "rdbms-playground: {}", crate::t!( "project.resume_no_previous", data_root = data_root.display(), ), ); return Ok(()); } } } else { args.project_path.clone() }; let project = open_or_create(initial_path.as_deref(), Some(data_root.as_path())) .context("open or create project")?; // Run any pending project.yaml migrations before the // database opens (so the rebuild path only ever sees the // latest schema). The registry is empty in v1; future // versions register their migrators here. A migration // that runs is recorded in tracing and leaves a // `project.yaml.v.bak` breadcrumb on disk; that's // sufficient v1 UX and lets us defer dedicated event // plumbing until a real migrator demands it. 
let migrate_registry = crate::persistence::migrations::MigratorRegistry::production(); let migration_outcome = crate::persistence::migrations::ensure_project_yaml_migrated( project.path(), &migrate_registry, ) .context("migrate project.yaml")?; if let Some(from) = migration_outcome.migrated_from { info!( from_version = from, to_version = migrate_registry.latest_version(), "migrated project.yaml", ); } // Record the just-opened project as the `--resume` target — // but only if it is worth resuming. A freshly-created empty // temp project is NOT recorded: it is auto-deleted on quit // while still empty (ADR-0015), and a `last_project` // pointing at a since-deleted directory makes the next // `--resume` fail confusingly. A named project, or a temp // that already carries content, is recorded. A temp that // *gains* content this session is recorded on quit instead. // Write failures are non-fatal. if !project.is_unmodified_temp() && let Err(e) = write_last_project(&data_root, project.path()) { warn!(error = %e, "could not update last_project"); } let db_path = project.db_path(); let display_name = project.display_name().to_string(); let project_path = project.path().to_path_buf(); let project_is_temp = matches!(project.kind(), ProjectKind::Temp); let persistence = crate::persistence::Persistence::new(project_path.clone()); // Capture whether the .db file existed BEFORE we open it — // sqlite creates it on connect, so this is the only honest // signal that we need to rebuild from text (ADR-0015 §7). let db_existed = db_path.exists(); // Undo is on unless `--no-undo` (ADR-0006 Amendment 1). 
let undo_enabled = !args.no_undo; let database = Database::open_with_persistence_and_undo(db_path.as_path(), persistence, undo_enabled) .context("open database")?; let mut initial_events: Vec = Vec::new(); if !db_existed { match database.rebuild_from_text(project_path.clone(), None).await { Ok(()) => { // Surface the silent rebuild as a system note // *only* when there was something material to // reconstruct. A fresh-launch temp project // hits this branch with zero tables and zero // rows — the rebuild ran, but nothing // reconstructable existed, so there's nothing // worth telling the user. The explicit // `rebuild` command still always reports its // summary (see spawn_rebuild) since the user // asked. if project_has_content(&project_path) { let summary = summarize_project(&project_path).unwrap_or_else(|_| { "rebuilt playground.db from project.yaml + data/".to_string() }); initial_events.push(AppEvent::RebuildSucceeded { summary }); } } Err(e) => { // The terminal is still in cooked mode here // (we haven't entered the alternate screen // yet), so writing to stderr lands directly // in the user's shell. Drop the project to // release the lock first. drop(project); if matches!( e, DbError::PersistenceFatal { .. } | DbError::RebuildRowFailed { .. } ) { eprintln!("rdbms-playground: {}", e.friendly_message()); return Ok(()); } return Err(anyhow::anyhow!(e.friendly_message())).context("rebuild from text"); } } } let mut terminal = setup_terminal().context("setup terminal")?; let result = run_loop( &mut terminal, args.theme, Session { project: Some(project), database: Some(database), data_root, }, display_name, project_is_temp, initial_events, undo_enabled, ) .await; if let Err(e) = teardown_terminal(&mut terminal) { // Teardown failures should not mask the primary error. 
warn!(error = %e, "terminal teardown failed"); } // ADR-0015 §8: a fatal persistence failure makes its // banner visible above the shell prompt by writing to // stderr after the alternate screen has been left. if let Ok(Some(banner)) = &result { eprintln!("{banner}"); } result.map(|_| ()) } /// Mutable state owned by `run_loop` that survives project /// switches: the live `Project` (with its lock), the live /// `Database` (with its worker), and the active data root /// for resolving relative paths in `save as` / `new` / load /// picker listings. /// /// `project` and `database` are wrapped in `Option` so a /// project switch can `take()` the old (dropping its lock /// and worker) before opening the new — required for the /// "switch to my own current project" case where the new /// open would otherwise see a self-held lock. struct Session { project: Option, database: Option, data_root: std::path::PathBuf, } impl Session { const fn project(&self) -> &Project { match self.project.as_ref() { Some(p) => p, None => panic!("project always set during run_loop"), } } const fn database(&self) -> &Database { match self.database.as_ref() { Some(d) => d, None => panic!("database always set during run_loop"), } } } async fn run_loop( terminal: &mut Terminal>, theme: Theme, mut session: Session, project_display_name: String, project_is_temp: bool, initial_events: Vec, undo_enabled: bool, ) -> Result> { let (event_tx, mut event_rx) = mpsc::channel::(EVENT_CHANNEL_CAPACITY); let reader_handle = spawn_event_reader(event_tx.clone()); let mut app = App::new(); app.project_name = Some(project_display_name); app.project_is_temp = project_is_temp; app.undo_enabled = undo_enabled; // Seed the in-memory navigable history from the // initial project's history.log (I2-persist, ADR-0015 // §12). Subsequent project switches re-seed via the // `ProjectSwitched` event payload. 
app.seed_history(read_history_seed(session.project().path())); // Send any startup events (e.g., the system-message form // of "rebuilt from text on missing .db") so they're // dispatched through the normal event path and end up in // the output panel before the user types anything. for event in initial_events { let _ = event_tx.send(event).await; } // Seed the table list with whatever the database currently // shows. For a fresh in-memory DB this is empty, but doing // it explicitly means file-backed databases (track 2) will // show their tables on launch without changes here. seed_initial_tables(session.database(), &event_tx).await; terminal .draw(|f| ui::render(&mut app, &theme, f)) .context("initial draw")?; info!("entering main event loop"); // ADR-0027 §3: the validity indicator is debounced — hidden // on every keystroke, recomputed and shown once typing has // paused for `INDICATOR_DEBOUNCE`. `recv` is time-boxed only // while the debounce is armed, so an idle session still does // no wake-ups. See `IndicatorDebounce` for the decision // logic; `app.input_indicator` mirrors it for the renderer. let mut debounce = IndicatorDebounce::default(); loop { let event = if debounce.is_armed() { match tokio::time::timeout(INDICATOR_DEBOUNCE, event_rx.recv()).await { Ok(Some(event)) => event, Ok(None) => break, Err(_elapsed) => { // Typing has been quiet for the debounce // interval — settle the indicator. 
debounce.settle(app.input_validity_verdict()); app.input_indicator = debounce.visible(); terminal .draw(|f| ui::render(&mut app, &theme, f)) .context("redraw")?; continue; } } } else { match event_rx.recv().await { Some(event) => event, None => break, } }; let is_key = matches!(event, AppEvent::Key(_)); let actions = app.update(event); let mut should_quit = false; for action in actions { match action { Action::Quit => { debug!("quit action received"); should_quit = true; } Action::ExecuteDsl { command, source } => { spawn_dsl_dispatch( session.database().clone(), event_tx.clone(), command, source, ); } Action::JournalFailure { source } => { // ADR-0034 §1/§4: record a failed command as an // `err` record. Best-effort — a failure to record // a failure must never escalate a user error into // a fatal, so the result is logged and ignored. if let Err(e) = crate::persistence::Persistence::new( session.project().path().to_path_buf(), ) .append_history_failure(&source) { tracing::warn!(error = %e, "failed to journal err record (ignored)"); } } Action::PrepareRebuild => { spawn_prepare_rebuild( session.project().path().to_path_buf(), event_tx.clone(), ); } Action::Rebuild { source } => { spawn_rebuild( session.database().clone(), session.project().path().to_path_buf(), event_tx.clone(), source, ); } Action::OpenLoadPicker => { let entries: Vec = list_projects(&session.data_root) .into_iter() .map(|p| crate::app::LoadPickerEntry { display_name: p.display_name, modified: p.modified, path: p.path, is_temp: matches!(p.kind, ProjectKind::Temp), }) .collect(); let _ = event_tx.send(AppEvent::LoadPickerReady { entries }).await; } Action::LoadProject { path, source } => { handle_project_switch( &mut session, SwitchRequest::Load { path }, source, &event_tx, undo_enabled, ) .await; } Action::SaveAs { target, source } => { handle_project_switch( &mut session, SwitchRequest::SaveAs { target }, source, &event_tx, undo_enabled, ) .await; } Action::NewProject { source } => { 
handle_project_switch( &mut session, SwitchRequest::NewTemp, source, &event_tx, undo_enabled, ) .await; } Action::Export { target, source } => { spawn_export( session.project().path().to_path_buf(), directory_basename(session.project().path()), session.data_root.clone(), target, source, event_tx.clone(), ); } Action::Import { zip_path, as_target, source, } => { handle_project_switch( &mut session, SwitchRequest::Import { zip_path: std::path::PathBuf::from(&zip_path), as_target, }, source, &event_tx, undo_enabled, ) .await; } Action::PrepareUndo => { spawn_prepare_undo(session.database().clone(), event_tx.clone(), false); } Action::PrepareRedo => { spawn_prepare_undo(session.database().clone(), event_tx.clone(), true); } Action::Undo => { spawn_undo(session.database().clone(), event_tx.clone(), false); } Action::Redo => { spawn_undo(session.database().clone(), event_tx.clone(), true); } Action::Replay { path } => { spawn_replay( session.database().clone(), session.project().path().to_path_buf(), path, event_tx.clone(), ); } } } // A keystroke hides the indicator and re-arms the // debounce (ADR-0027 §3) — it reappears once typing // pauses; non-key events leave it untouched. debounce.note_event(is_key); app.input_indicator = debounce.visible(); terminal .draw(|f| ui::render(&mut app, &theme, f)) .context("redraw")?; if should_quit { break; } } let _ = tokio::time::timeout(SHUTDOWN_GRACE, reader_handle).await; // Decide the active project's fate on quit, before dropping // it. An unmodified empty temp is auto-deleted — same rule // as on project switch (see perform_switch). Anything else — // a named project, or a temp that *gained* content this // session — is recorded as the `--resume` target: this is // the point at which a launch-temp the user filled with // content finally gets remembered (the `run()` startup // write skipped it while it was still empty). The two are // mutually exclusive (one needs an unmodified temp, the // other anything else). 
let project_at_quit = session.project.as_ref(); let cleanup_on_quit: Option = project_at_quit .and_then(|p| p.is_unmodified_temp().then(|| p.path().to_path_buf())); let resume_target_on_quit: Option = project_at_quit .filter(|p| !p.is_unmodified_temp()) .map(|p| p.path().to_path_buf()); let _ = session.database.take(); let _ = session.project.take(); if let Some(target) = resume_target_on_quit && let Err(e) = write_last_project(&session.data_root, &target) { tracing::warn!(error = %e, "could not update last_project on quit"); } if let Some(stale) = cleanup_on_quit { match safely_delete_temp_project(&stale, &session.data_root) { Ok(()) => tracing::info!( path = %stale.display(), "cleaned up unmodified temp project on quit", ), Err(e) => tracing::warn!( path = %stale.display(), error = %e, "did not clean up unmodified temp project on quit", ), } } info!("event loop exited"); Ok(app.fatal_message.clone()) } /// What kind of project switch the user requested. enum SwitchRequest { /// `load` (picker or browse-to-path) — open an existing /// project at `path`. Load { path: std::path::PathBuf }, /// `save as` — copy the current project to a new /// location, then open that copy. `target` is a name /// (resolved under `/projects/`) or an /// absolute path. SaveAs { target: String }, /// `new` — close current, create a fresh auto-named temp. NewTemp, /// `import` — extract a zip into a new project under the /// data root's projects dir, then switch to it. The /// destination basename is taken from the zip's /// top-level folder by default (`as_target` is `None`), /// or from the user-supplied override; collisions /// auto-suffix `-NN` (ADR-0015 §11 amendment). Import { zip_path: std::path::PathBuf, as_target: Option, }, } /// Common project-switch path. 
Drops the current project + /// database (releasing the lock and stopping the worker), /// opens the new one, runs a rebuild if the .db is missing, /// appends history.log, and sends a `ProjectSwitched` event /// so App refreshes its display. /// /// Errors are surfaced as `ProjectSwitchFailed` (non-fatal): /// the current project remains active. async fn handle_project_switch( session: &mut Session, req: SwitchRequest, source: String, event_tx: &mpsc::Sender, undo_enabled: bool, ) { match perform_switch(session, req, source, undo_enabled).await { Ok((display_name, is_temp)) => { let history_entries = read_history_seed(session.project().path()); let _ = event_tx .send(AppEvent::ProjectSwitched { display_name, is_temp, history_entries, }) .await; if let Ok(tables) = session.database().list_tables().await { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } refresh_schema_cache(session.database(), event_tx).await; } Err(e) => { let _ = event_tx .send(AppEvent::ProjectSwitchFailed { error: e }) .await; } } } /// Read the most-recent `HISTORY_HYDRATION_CAP` source lines /// out of the project's `history.log` for input-history /// seeding. Failures are logged and swallowed — an empty /// hydration is the right fallback when the file is unreadable. fn read_history_seed(project_path: &std::path::Path) -> Vec { let p = crate::persistence::Persistence::new(project_path.to_path_buf()); match p.read_recent_history(HISTORY_HYDRATION_CAP) { Ok(entries) => entries, Err(e) => { tracing::warn!(error = %e, "history hydration failed; starting empty"); Vec::new() } } } /// Maximum number of `history.log` entries to seed the /// in-memory navigable history with on project open. Matches /// the in-memory cap (`app::HISTORY_CAPACITY`) per ADR-0015 /// §12: "latest N entries, where N is the same in-memory /// cap as today." 
const HISTORY_HYDRATION_CAP: usize = 1000; async fn perform_switch( session: &mut Session, req: SwitchRequest, source: String, undo_enabled: bool, ) -> Result<(String, bool), String> { use crate::persistence::Persistence; // For SaveAs we need a resolved target path up front // (so the existence check happens before we drop the // current project). For NewTemp we'll let create_temp // pick the path. For Load it's the user-supplied path. // For Import we inspect the zip and resolve the target // (auto-suffixing on collision per ADR-0015 §11 // amendment) before touching anything else. let resolved_target: Option = match &req { SwitchRequest::Load { path } => { if !path.exists() { return Err(crate::t!( "project.load_path_missing", path = path.display() )); } Some(path.clone()) } SwitchRequest::SaveAs { target } => { let p = resolve_save_target(target, &session.data_root); if p.exists() { return Err(crate::t!( "project.saveas_target_exists", path = p.display() )); } Some(p) } SwitchRequest::NewTemp => None, SwitchRequest::Import { zip_path, as_target } => { if !zip_path.exists() { return Err(crate::t!( "project.import_zip_missing", path = zip_path.display() )); } // Validate the zip up front so we don't drop the // current project for an unimportable file. let inspection = crate::archive::inspect_zip(zip_path) .map_err(|e| e.to_string())?; let resolved = resolve_import_destination( as_target.as_deref(), &inspection.top_folder, &session.data_root, )?; Some(resolved) } }; // For SaveAs: copy current project to the target while // the source is still on disk (auto-save guarantees its // state matches the in-memory db). if let SwitchRequest::SaveAs { .. } = &req { let src = session.project().path().to_path_buf(); let dst = resolved_target.as_ref().expect("SaveAs has resolved target"); copy_project(&src, dst).map_err(|e| e.to_string())?; } // For Import: extract the zip into the resolved target. 
// We do this *before* dropping the current project so // a failure here leaves the user where they were. if let SwitchRequest::Import { zip_path, .. } = &req { let dst = resolved_target.as_ref().expect("Import has resolved target"); let inspection = crate::archive::inspect_zip(zip_path) .map_err(|e| e.to_string())?; crate::archive::extract_into(zip_path, dst, &inspection.top_folder) .map_err(|e| e.to_string())?; } // Capture cleanup info from the OUTGOING project before // we drop it: if it was an unmodified empty temp, we // delete its directory after the switch so the data dir // doesn't accumulate empty scratch projects. let outgoing_cleanup_path: Option = session.project.as_ref().and_then(|p| { p.is_unmodified_temp().then(|| p.path().to_path_buf()) }); // Drop current project + database BEFORE opening the new // ones, releasing the old lock and stopping the old // worker. Required for the "load my own current project" // case (otherwise the new open would see a self-held // lock on this PID). let _ = session.database.take(); let _ = session.project.take(); // The outgoing project's lock is now released; it's // safe to remove its directory if it was unmodified. // The safely_delete_temp_project helper layers multiple // guards (containment under data root, [temp] marker, // contents allowlist, no symlinks) so a bug elsewhere // can't escalate into deleting the wrong directory. if let Some(stale) = outgoing_cleanup_path { match safely_delete_temp_project(&stale, &session.data_root) { Ok(()) => tracing::info!( path = %stale.display(), "cleaned up unmodified temp project on switch", ), Err(e) => tracing::warn!( path = %stale.display(), error = %e, "did not clean up unmodified temp project on switch", ), } } // Open the destination project. Load / SaveAs / Import // all open a path that already has the on-disk skeleton // (either because it pre-existed or because we just put // it there); NewTemp asks the project module for a fresh // auto-named one. 
let new_project = match &req { SwitchRequest::Load { .. } | SwitchRequest::SaveAs { .. } | SwitchRequest::Import { .. } => { let path = resolved_target.expect("Load/SaveAs/Import have resolved target"); Project::open(&path).map_err(|e| e.to_string())? } SwitchRequest::NewTemp => { Project::create_temp(&session.data_root).map_err(|e| e.to_string())? } }; let new_path = new_project.path().to_path_buf(); // Run any pending project.yaml migrations before the // database opens. Same registry as `run()`. A failed // migration aborts the switch (the old project has // already been dropped — user lands in a "no project" // state momentarily, but the next user action will // surface the error and they can retry). let migrate_registry = crate::persistence::migrations::MigratorRegistry::production(); crate::persistence::migrations::ensure_project_yaml_migrated( new_project.path(), &migrate_registry, ) .map_err(|e| e.to_string())?; // Open the new database (rebuild from text if .db is // missing — applies to NewTemp's just-created project, // and to Load when the user opened a project whose .db // had been deleted). let db_path = new_project.db_path(); let db_existed = db_path.exists(); let persistence = Persistence::new(new_path.clone()); let new_database = Database::open_with_persistence_and_undo(&db_path, persistence, undo_enabled) .map_err(|e| e.to_string())?; if !db_existed && let Err(e) = new_database.rebuild_from_text(new_path.clone(), None).await { return Err(e.friendly_message()); } let display_name = new_project.display_name().to_string(); let is_temp = matches!(new_project.kind(), ProjectKind::Temp); // Worth recording as the resume target? A switch to a fresh // empty temp (`new`) is not — see the gate in `run()`. let new_worth_recording = !new_project.is_unmodified_temp(); session.project = Some(new_project); session.database = Some(new_database); // Append the user-issued command to the destination's // history.log. 
The worker's persistence is wired but not // directly addressable from here, so we use a fresh // Persistence handle for this single line. let _ = Persistence::new(new_path.clone()).append_history(&source); // Update the resume pointer so the next `--resume` launch // reopens the project we just switched to — unless it is a // fresh empty temp (a `new` command), which must not be // recorded (see the gate in `run()`). Write failures are // non-fatal. if new_worth_recording && let Err(e) = write_last_project(&session.data_root, &new_path) { tracing::warn!(error = %e, "could not update last_project after switch"); } Ok((display_name, is_temp)) } /// Resolve the destination directory for an `import`: /// /// - **No `as `:** use the zip's top-level folder name /// under `/projects/`. Auto-suffix `-NN` on /// collision (ADR-0015 §11 amendment). /// - **`as `:** under `/projects/`, /// auto-suffix on collision. /// - **`as `:** use the path verbatim. Refuse /// if it already exists (no auto-suffix on absolute paths — /// we don't second-guess what the user typed). fn resolve_import_destination( as_target: Option<&str>, zip_top_folder: &str, data_root: &std::path::Path, ) -> Result { if let Some(t) = as_target { let p = std::path::Path::new(t); if p.is_absolute() { if p.exists() { return Err(format!( "`{}` already exists; pick a different target", p.display(), )); } return Ok(p.to_path_buf()); } } let basename: &str = as_target.unwrap_or(zip_top_folder); let parent = projects_dir(data_root); std::fs::create_dir_all(&parent).map_err(|e| e.to_string())?; let (resolved, _) = crate::archive::resolve_import_target(&parent, basename).map_err(|e| e.to_string())?; Ok(resolved) } /// Spawn a blocking task to write an export zip and forward /// the outcome via the event channel. /// /// The current project's auto-save semantics mean /// `` already reflects every successful command, /// so the export reads from disk without coordinating with the /// db worker. 
The `history.log` entry for this command is /// appended directly here (we already hold the project path /// and don't need to wait for the export to finish before /// recording the user-issued command). fn spawn_export( project_path: std::path::PathBuf, project_name: String, data_root: std::path::PathBuf, target: Option, source: String, event_tx: mpsc::Sender, ) { let _ = crate::persistence::Persistence::new(project_path.clone()).append_history(&source); tokio::spawn(async move { let outcome = tokio::task::spawn_blocking(move || { do_export(&project_path, &project_name, &data_root, target.as_deref()) }) .await; let event = match outcome { Ok(Ok(path)) => AppEvent::ExportSucceeded { path }, Ok(Err(e)) => AppEvent::ExportFailed { error: e }, Err(join_err) => AppEvent::ExportFailed { error: join_err.to_string(), }, }; let _ = event_tx.send(event).await; }); } /// Synchronous body of the export pipeline. fn do_export( project_path: &std::path::Path, project_name: &str, data_root: &std::path::Path, target: Option<&str>, ) -> Result { let final_path: std::path::PathBuf = match target { Some(t) => { let p = std::path::Path::new(t); if p.is_absolute() { p.to_path_buf() } else { data_root.join(t) } } None => { std::fs::create_dir_all(data_root).map_err(|e| e.to_string())?; let (filename, _) = crate::archive::next_export_sequence(data_root, project_name) .map_err(|e| e.to_string())?; data_root.join(filename) } }; if final_path.exists() { return Err(format!( "`{}` already exists; pick a different name or remove it first", final_path.display(), )); } crate::archive::export_project(project_path, project_name, &final_path) .map_err(|e| e.to_string())?; Ok(final_path) } /// The basename of `path` as a `String`. Falls back to the /// full display string when the path has no terminal /// component (e.g. `/`). 
fn directory_basename(path: &std::path::Path) -> String {
    path.file_name().map_or_else(
        || path.display().to_string(),
        |s| s.to_string_lossy().into_owned(),
    )
}

/// Resolve a `save as` target path against the data root.
///
/// Absolute paths pass through; relative paths join under
/// `<data_root>/projects/` per the user's stated preference
/// in ADR-0015 §1 ("named projects right alongside the temp
/// ones is the easiest workflow").
fn resolve_save_target(target: &str, data_root: &std::path::Path) -> std::path::PathBuf {
    let p = std::path::Path::new(target);
    if p.is_absolute() {
        p.to_path_buf()
    } else {
        projects_dir(data_root).join(p)
    }
}

/// Post the database's current table list as `TablesRefreshed`
/// and follow it with a schema-cache refresh. A failed table
/// listing is logged; the schema-cache refresh still runs.
async fn seed_initial_tables(database: &Database, event_tx: &mpsc::Sender<AppEvent>) {
    match database.list_tables().await {
        Ok(tables) => {
            let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
        }
        Err(e) => {
            error!(error = %e, "failed to seed initial table list");
        }
    }
    refresh_schema_cache(database, event_tx).await;
}

/// Snapshot the schema via `build_schema_cache` and post it as
/// `SchemaCacheRefreshed` (ADR-0022 §9 + stage 8d). Always
/// sends an event, even on partial failure — best-effort
/// completion is better than no completion. Called wherever
/// `TablesRefreshed` is sent today; the schema cache lives on
/// the App and feeds Tab completion for identifier slots.
async fn refresh_schema_cache(
    database: &Database,
    event_tx: &mpsc::Sender<AppEvent>,
) {
    let cache = build_schema_cache(database).await;
    let _ = event_tx.send(AppEvent::SchemaCacheRefreshed(cache)).await;
}

/// Build a `SchemaCache` snapshot from the live database.
///
/// Shared by `refresh_schema_cache` (interactive path — wraps
/// the result in a `SchemaCacheRefreshed` event) and the replay
/// path (which re-snapshots per line because the schema mutates
/// as replayed `create table` / `add column` commands run).
/// Best-effort: a failed query leaves that list empty and the
/// walker falls back to schemaless behaviour.
async fn build_schema_cache(database: &Database) -> crate::completion::SchemaCache { use crate::completion::{SchemaCache, TableColumn}; use crate::dsl::grammar::IdentSource; let mut cache = SchemaCache::default(); if let Ok(tables) = database.list_names_for(IdentSource::Tables).await { cache.tables = tables; } if let Ok(columns) = database.list_names_for(IdentSource::Columns).await { cache.columns = columns; } if let Ok(rels) = database.list_names_for(IdentSource::Relationships).await { cache.relationships = rels; } if let Ok(indexes) = database.list_names_for(IdentSource::Indexes).await { cache.indexes = indexes; } // Phase D (ADR-0024 §Phase D): per-table column metadata // with user-facing types. The walker's // `DynamicSubgrammar(column_value_list)` reads this to // unfold typed value slots per column at `insert into T // values (...)` positions. Best-effort: a `describe_table` // miss leaves that table's columns unpopulated and the // walker falls back to the schemaless value-literal list. for name in cache.tables.clone() { if let Ok(desc) = database.describe_table(name.clone(), None).await { // Per-table index names for the items panel (S2, // ADR-0025). Captured before `desc.columns` is // consumed below. let index_names: Vec = desc.indexes.iter().map(|i| i.name.clone()).collect(); let cols: Vec = desc .columns .into_iter() .filter_map(|c| { c.user_type.map(|ty| TableColumn { name: c.name, user_type: ty, // Only a column the engine reports as NOT NULL // (ADR-0033 §8.3 "declared NOT NULL"). We do // NOT treat a PK as implicitly not-null: an // `int` PK is an `INTEGER PRIMARY KEY` rowid // alias that auto-fills, so flagging it omitted // would be a false positive (SQLite reports // notnull=0 for it anyway). serial/shortid are // excluded by type in the not_null_missing pass. 
not_null: c.notnull, has_default: c.default.is_some(), }) }) .collect(); cache.table_columns.insert(name.clone(), cols); cache.table_indexes.insert(name, index_names); } } cache } /// Read `project.yaml` + `data/` to compute the rebuild /// summary that the confirmation modal shows. Runs off the /// event loop so the brief I/O doesn't stall input handling /// even on slow filesystems. fn spawn_prepare_rebuild( project_path: std::path::PathBuf, event_tx: mpsc::Sender, ) { tokio::spawn(async move { let summary = match summarize_project(&project_path) { Ok(s) => s, Err(e) => format!("(could not read project sources: {e})"), }; let _ = event_tx.send(AppEvent::RebuildPrepared { summary }).await; }); } /// Does the project at `project_path` actually have any /// schema or data? /// /// "Has content" means at least one table is declared in /// `project.yaml` OR at least one CSV row exists under /// `data/`. A brand-new auto-named temp project, having /// neither, returns `false`. Errors reading the project /// (corrupt YAML, missing dir) also return `false` — /// suppressing a misleading "0 tables reconstructed" /// message for a project we can't read is the right default. 
fn project_has_content(project_path: &std::path::Path) -> bool { let yaml_path = project_path.join(crate::project::PROJECT_YAML); let Ok(yaml) = std::fs::read_to_string(&yaml_path) else { return false; }; let Ok(snapshot) = crate::persistence::parse_schema(&yaml) else { return false; }; !snapshot.tables.is_empty() } fn summarize_project(project_path: &std::path::Path) -> Result { let yaml_path = project_path.join(crate::project::PROJECT_YAML); let yaml = std::fs::read_to_string(&yaml_path).map_err(|e| e.to_string())?; let snapshot = crate::persistence::parse_schema(&yaml).map_err(|e| e.to_string())?; let table_count = snapshot.tables.len(); let data_dir = project_path.join(crate::project::DATA_DIR); let mut row_count: usize = 0; for table in &snapshot.tables { let csv_path = data_dir.join(format!("{}.csv", table.name)); let Ok(body) = std::fs::read_to_string(&csv_path) else { continue; }; // Header line + one line per row (per Iteration 2's // "no CSV when empty" rule, this is exact). row_count += body.lines().count().saturating_sub(1); } Ok(format!( "{table_count} table{} and {row_count} row{} will be reconstructed; \ the existing playground.db will be replaced", if table_count == 1 { "" } else { "s" }, if row_count == 1 { "" } else { "s" }, )) } /// Spawn the actual rebuild and forward the typed outcome /// back as an `AppEvent`. fn spawn_rebuild( database: Database, project_path: std::path::PathBuf, event_tx: mpsc::Sender, source: String, ) { tokio::spawn(async move { match database .rebuild_from_text(project_path.clone(), Some(source)) .await { Ok(()) => { let summary = summarize_project(&project_path) .unwrap_or_else(|_| "rebuild complete".to_string()); let _ = event_tx .send(AppEvent::RebuildSucceeded { summary }) .await; // Refresh the table list so the items panel // reflects whatever the rebuild produced. 
if let Ok(tables) = database.list_tables().await { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } refresh_schema_cache(&database, &event_tx).await; } Err(DbError::PersistenceFatal { operation, path, message, }) => { let _ = event_tx .send(AppEvent::PersistenceFatal { operation: operation.to_string(), path, message, }) .await; } Err(other) => { let _ = event_tx .send(AppEvent::RebuildFailed { error: other.friendly_message(), }) .await; } } }); } /// Peek the snapshot `undo`/`redo` would restore and post /// `UndoPrepared` (open the confirmation modal) or /// `UndoUnavailable` (ADR-0006 Amendment 1). fn spawn_prepare_undo(database: Database, event_tx: mpsc::Sender, is_redo: bool) { tokio::spawn(async move { let peek = if is_redo { database.peek_redo().await } else { database.peek_undo().await }; let event = match peek { Ok(Some(meta)) => AppEvent::UndoPrepared { command: meta.command, timestamp: meta.timestamp, is_redo, }, Ok(None) => AppEvent::UndoUnavailable { is_redo }, Err(e) => AppEvent::UndoFailed { error: e.friendly_message(), is_redo, }, }; let _ = event_tx.send(event).await; }); } /// Restore the snapshot `undo`/`redo` selects, then refresh the /// table list + schema cache so the TUI shows the restored state. 
// NOTE(review): in this view the generic arguments of several
// types below appear to have been stripped (`mpsc::Sender`, and
// the `Option` returned by `user_value_for_column*`). The sender
// presumably carries `AppEvent` — confirm against the real file.
fn spawn_undo(database: Database, event_tx: mpsc::Sender, is_redo: bool) {
    tokio::spawn(async move {
        let result = if is_redo {
            database.redo().await
        } else {
            database.undo().await
        };
        match result {
            Ok(Some(meta)) => {
                let _ = event_tx
                    .send(AppEvent::UndoSucceeded {
                        command: meta.command,
                        is_redo,
                    })
                    .await;
                // A failed `list_tables` is ignored here; the schema
                // cache refresh below still runs.
                if let Ok(tables) = database.list_tables().await {
                    let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
                }
                refresh_schema_cache(&database, &event_tx).await;
            }
            // Nothing to undo / redo.
            Ok(None) => {
                let _ = event_tx.send(AppEvent::UndoUnavailable { is_redo }).await;
            }
            Err(e) => {
                let _ = event_tx
                    .send(AppEvent::UndoFailed {
                        error: e.friendly_message(),
                        is_redo,
                    })
                    .await;
            }
        }
    });
}

/// Spawn a task that runs a DSL command against the database
/// and forwards the result back as an `AppEvent`.
///
/// Each `CommandOutcome` variant maps to its own success event;
/// a persistence-fatal error maps to `PersistenceFatal`; any
/// other error maps to `DslFailed` carrying the enrichment facts
/// from `enrich_dsl_failure`. After the outcome event is
/// delivered, the table list and schema cache are refreshed.
fn spawn_dsl_dispatch(
    database: Database,
    event_tx: mpsc::Sender,
    command: Command,
    source: String,
) {
    tokio::spawn(async move {
        // Retain the source for `DslFailed` so the App can journal a
        // rejected command as `err` (ADR-0034 §1/§2).
        let source_for_journal = source.clone();
        let outcome = execute_command_typed(&database, command.clone(), source).await;
        let event = match outcome {
            Ok(CommandOutcome::Schema(description)) => AppEvent::DslSucceeded {
                command: command.clone(),
                description,
            },
            Ok(CommandOutcome::Query(data)) => AppEvent::DslDataSucceeded {
                command: command.clone(),
                data,
            },
            Ok(CommandOutcome::QueryPlan(plan)) => AppEvent::DslExplainSucceeded {
                command: command.clone(),
                plan,
            },
            Ok(CommandOutcome::Insert(result)) => AppEvent::DslInsertSucceeded {
                command: command.clone(),
                result,
            },
            Ok(CommandOutcome::Update(result)) => AppEvent::DslUpdateSucceeded {
                command: command.clone(),
                result,
            },
            Ok(CommandOutcome::Delete(result)) => AppEvent::DslDeleteSucceeded {
                command: command.clone(),
                result,
            },
            Ok(CommandOutcome::ChangeColumn(result)) => AppEvent::DslChangeColumnSucceeded {
                command: command.clone(),
                result,
            },
            Ok(CommandOutcome::AddColumn(result)) => AppEvent::DslAddColumnSucceeded {
                command: command.clone(),
                result,
            },
            Ok(CommandOutcome::DropColumn(result)) => AppEvent::DslDropColumnSucceeded {
                command: command.clone(),
                result,
            },
            Err(DbError::PersistenceFatal {
                operation,
                path,
                message,
            }) => AppEvent::PersistenceFatal {
                operation: operation.to_string(),
                path,
                message,
            },
            Err(error) => {
                // Schema-resolved enrichment per ADR-0019 §6.
                // The runtime owns DB access; the App stays
                // presentation-only.
                let facts = enrich_dsl_failure(&database, &command, &error).await;
                AppEvent::DslFailed {
                    command: command.clone(),
                    error,
                    facts,
                    source: source_for_journal,
                }
            }
        };
        // The receiver is gone: nobody is left to consume the
        // refresh events, so stop here.
        if event_tx.send(event).await.is_err() {
            return;
        }
        // Refresh the table list after every dispatched command
        // (not only DDL — the refresh is unconditional) so the
        // items panel reflects reality. A failed list_tables
        // here is logged but not surfaced to the user — they
        // already saw the primary outcome.
        match database.list_tables().await {
            Ok(tables) => {
                let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
            }
            Err(e) => warn!(error = %e, "post-list_tables failed"),
        }
        // Refresh the schema cache feeding Tab completion
        // (ADR-0022 §9). Same timing as TablesRefreshed.
        refresh_schema_cache(&database, &event_tx).await;
    });
}

/// Build schema-resolved enrichment for a DSL failure (ADR-0019 §6).
///
/// Best-effort: every lookup is independently fallible and a
/// missing piece just leaves the corresponding
/// `FailureContext` field `None`. The translator falls back to
/// catalog `{name}` placeholders for unfilled fields.
///
/// Only `DbError::Sqlite` errors are enriched; the engine
/// message is classified by a case-insensitive substring match.
/// What we resolve, by classification:
///
/// - **UNIQUE violation** (engine reports `T.col`):
///   - `table`, `column` from the engine message.
///   - `value` from the originating Command (explicit columns
///     or single-value short form, with schema lookup as a
///     last resort for natural-order multi-value INSERT).
///   - `diagnostic_table` from a pinpoint lookup of the
///     existing row(s) that conflict.
/// - **NOT NULL violation** (engine reports `T.col`): `table`
///   and `column` only — the attempted value is null by
///   definition.
/// - **CHECK violation**: `column` from the engine message,
///   `table` from the command, the attempted `value`, and the
///   column's `check_rule`.
/// - **FK INSERT/UPDATE** (child-side): outbound relationship
///   lookup picks the FK column the user set; resolves
///   `parent_table`, `parent_column`, and the attempted
///   `value`.
/// - **FK DELETE/UPDATE** (parent-side): inbound relationship
///   lookup picks a `child_table` that references this row.
/// - Anything else: `FailureContext::default()`.
pub async fn enrich_dsl_failure(
    database: &Database,
    command: &Command,
    error: &DbError,
) -> crate::friendly::FailureContext {
    let DbError::Sqlite { message, .. } = error else {
        return crate::friendly::FailureContext::default();
    };
    let lower = message.to_ascii_lowercase();
    if lower.contains("unique constraint failed") {
        enrich_unique_violation(database, command, message).await
    } else if lower.contains("not null constraint failed") {
        enrich_not_null_violation(command, message)
    } else if lower.contains("check constraint failed") {
        enrich_check_violation(database, command, message).await
    } else if lower.contains("foreign key constraint failed") {
        enrich_fk_violation(database, command).await
    } else {
        crate::friendly::FailureContext::default()
    }
}

/// Enrich a `CHECK` violation (ADR-0029 §10). The engine
/// reports `CHECK constraint failed: <column>` — the column the
/// constraint sits on, unqualified. We pair that with the
/// command's table, the value the user supplied for the column,
/// and the column's compiled `CHECK` expression so the friendly
/// error can name the rule that was broken.
async fn enrich_check_violation(
    database: &Database,
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    // Everything after the first `:` is taken as the column name.
    let Some((_, after)) = message.split_once(':') else {
        return facts;
    };
    let column = after.trim();
    let table = command.target_table();
    if column.is_empty() || table.is_empty() {
        return facts;
    }
    facts.column = Some(column.to_string());
    facts.table = Some(table.to_string());
    // The value the user supplied for the constrained column.
    facts.value = user_value_for_column_with_schema(database, command, table, column)
        .await
        .map(|v| v.to_string());
    // The rule itself — the column's compiled CHECK expression.
    if let Ok(desc) = database.describe_table(table.to_string(), None).await
        && let Some(col) = desc.columns.iter().find(|c| c.name == column)
    {
        facts.check_rule.clone_from(&col.check);
    }
    facts
}

/// Enrich a `UNIQUE` violation: table and column from the engine
/// message, the user's attempted value, and — when the value is
/// known and the lookup finds rows — a diagnostic table of the
/// existing conflicting row(s).
async fn enrich_unique_violation(
    database: &Database,
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    let Some((table, column)) = parse_qualified_target(message) else {
        return facts;
    };
    facts.table = Some(table.clone());
    facts.column = Some(column.clone());
    // Resolve the user's attempted value.
    let raw_value = user_value_for_column_with_schema(database, command, &table, &column).await;
    facts.value = raw_value.as_ref().map(ToString::to_string);
    // Pinpoint the existing conflicting row, capped per
    // ADR-0017 §7's `DIAGNOSTIC_ROW_CAP` (we use a tighter cap
    // here — a single conflicting row is the typical case for
    // UNIQUE since the constraint enforces it).
    if let Some(value) = raw_value
        && let Ok(data) = database
            .find_rows_matching(table.clone(), column.clone(), value, 5)
            .await
        && !data.rows.is_empty()
    {
        facts.diagnostic_table = Some(diagnostic_from_data_result(&data));
    }
    facts
}

/// Enrich a `NOT NULL` violation with the table and column named
/// in the engine message. `command` is accepted for signature
/// parity with the other enrichers but is not consulted.
fn enrich_not_null_violation(
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    let Some((table, column)) = parse_qualified_target(message) else {
        return facts;
    };
    facts.table = Some(table);
    facts.column = Some(column);
    // The "attempted value" for NOT NULL is by definition null —
    // surfacing it doesn't add information. Skip the value
    // resolution; the catalog headline reads "X cannot be null"
    // and stands on its own.
    let _ = command;
    facts
}

/// Enrich a foreign-key violation. The engine message is not
/// parsed (it is not even passed in); everything is derived from
/// the command and the table's relationships. Commands other
/// than INSERT / UPDATE / DELETE yield the default context.
async fn enrich_fk_violation(
    database: &Database,
    command: &Command,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    match command {
        Command::Insert { table, .. } | Command::Update { table, .. } => {
            // Child-side: outbound FK lookup. Find the FK
            // column the user is setting / updating. Use the
            // schema-aware lookup so natural-order multi-value
            // INSERT (which `user_value_for_column` alone can't
            // resolve) gets handled too.
            let Ok((outbound, _)) = database.read_relationships(table.clone()).await else {
                return facts;
            };
            facts.table = Some(table.clone());
            // The first outbound relationship whose local column
            // the user supplied a value for wins.
            for rel in outbound {
                let value = user_value_for_column_with_schema(
                    database,
                    command,
                    table,
                    &rel.local_column,
                )
                .await;
                if let Some(v) = value {
                    facts.column = Some(rel.local_column);
                    facts.parent_table = Some(rel.other_table);
                    facts.parent_column = Some(rel.other_column);
                    facts.value = Some(v.to_string());
                    break;
                }
            }
            // For UPDATE, if no outbound match was found we may
            // be in the parent-side case (updating a column
            // children reference). Check inbound as a fallback.
            // NOTE(review): this queries `read_relationships` a
            // second time for the same table; the inbound half of
            // the first result could be reused.
            if facts.parent_table.is_none()
                && matches!(command, Command::Update { .. })
                && let Ok((_, inbound)) = database.read_relationships(table.clone()).await
                && let Some(rel) = inbound.first()
            {
                facts.child_table = Some(rel.other_table.clone());
            }
        }
        Command::Delete { table, .. } => {
            // Parent-side: inbound FK lookup. Surface a child
            // table that still references the row(s) being
            // deleted.
            let Ok((_, inbound)) = database.read_relationships(table.clone()).await else {
                return facts;
            };
            facts.table = Some(table.clone());
            if let Some(rel) = inbound.first() {
                facts.child_table = Some(rel.other_table.clone());
            }
        }
        _ => {}
    }
    facts
}

/// Find the user's attempted value for `column` directly from
/// the originating Command. Handles INSERT (explicit columns,
/// single-value short form) and UPDATE (assignments). Returns
/// `None` for natural-order multi-value INSERT — that case
/// needs a schema lookup, see
/// [`user_value_for_column_with_schema`].
// NOTE(review): the return type's generic argument is missing in
// this view; it is presumably the element type of the command's
// `values` — confirm against the real file.
fn user_value_for_column(command: &Command, column: &str) -> Option {
    match command {
        Command::Insert {
            columns, values, ..
        } => {
            if let Some(cols) = columns {
                // Explicit column list: positions line up 1:1.
                let idx = cols.iter().position(|c| c == column)?;
                values.get(idx).cloned()
            } else if values.len() == 1 {
                // Single-value short form: the lone value is
                // returned whatever `column` is.
                values.first().cloned()
            } else {
                None
            }
        }
        Command::Update { assignments, .. } => assignments
            .iter()
            .find(|(c, _)| c == column)
            .map(|(_, v)| v.clone()),
        _ => None,
    }
}

/// Same as [`user_value_for_column`] but handles natural-order
/// multi-value INSERT by reading the schema to learn which
/// position belongs to which column. Mirrors `do_insert`'s
/// position-mapping rule (auto-generated columns — serial,
/// shortid — are skipped, since the user doesn't supply
/// values for them in the natural-order short form).
///
/// Returns `None` when the command carries no value for
/// `column`, or when the schema lookup fails.
async fn user_value_for_column_with_schema(
    database: &Database,
    command: &Command,
    table: &str,
    column: &str,
) -> Option {
    // Cheap path first: no database round trip needed.
    if let Some(v) = user_value_for_column(command, column) {
        return Some(v);
    }
    if let Command::Insert {
        columns: None,
        values,
        ..
    } = command
    {
        let desc = database
            .describe_table(table.to_string(), None)
            .await
            .ok()?;
        // Build the natural-order column list the same way
        // `do_insert` does: filter out serial / shortid columns
        // because the engine auto-fills them and the user's
        // positional values map onto the remainder.
        let natural_cols: Vec<&str> = desc
            .columns
            .iter()
            .filter(|c| {
                !matches!(
                    c.user_type,
                    Some(crate::dsl::Type::Serial) | Some(crate::dsl::Type::ShortId)
                )
            })
            .map(|c| c.name.as_str())
            .collect();
        let idx = natural_cols.iter().position(|c| *c == column)?;
        return values.get(idx).cloned();
    }
    None
}

/// Render a `DataResult` as a `DiagnosticTable` for the
/// friendly-error layer's bordered renderer (ADR-0019 §7,
/// reusing ADR-0017 §7's renderer).
fn diagnostic_from_data_result( data: &DataResult, ) -> crate::friendly::DiagnosticTable { use crate::output_render::{numeric_alignment_for, Alignment}; let alignments: Vec = data .column_types .iter() .map(|t| { t.map_or(Alignment::Left, numeric_alignment_for) }) .collect(); let rows: Vec> = data .rows .iter() .map(|r| { r.iter() .map(|c| c.clone().unwrap_or_else(|| "NULL".to_string())) .collect() }) .collect(); crate::friendly::DiagnosticTable { headers: data.columns.clone(), rows, alignments, } } /// Extract `(table, column)` from a qualified target like /// `"UNIQUE constraint failed: T.col"`. Mirrors the helper in /// `friendly::translate` (the translator does its own parse as /// a fallback when enrichment didn't run). fn parse_qualified_target(message: &str) -> Option<(String, String)> { let after = message.split_once(':').map(|(_, r)| r.trim())?; let first = after.split(',').next()?.trim(); let mut parts = first.splitn(2, '.'); let table = parts.next()?.trim(); let column = parts.next()?.trim(); if table.is_empty() || column.is_empty() { None } else { Some((table.to_string(), column.to_string())) } } enum CommandOutcome { Schema(Option), Query(DataResult), QueryPlan(QueryPlan), Insert(InsertResult), Update(UpdateResult), Delete(DeleteResult), ChangeColumn(ChangeColumnTypeResult), AddColumn(AddColumnResult), DropColumn(DropColumnResult), } /// Spawn a task that reads a script file and dispatches each /// non-blank, non-comment line through the same `execute_command_typed` /// pipeline as interactive input. /// /// `path` is the literal user-typed argument; relative paths are /// resolved against `project_root`. On the first per-line failure /// the task posts `AppEvent::ReplayFailed` carrying the line /// number and stops — previously dispatched commands stay applied /// (no rollback). On clean completion the task posts /// `AppEvent::ReplayCompleted` with the count of commands that /// were actually run (blanks/comments excluded). 
/// /// The replay invocation itself is NOT written to `history.log` /// (that happens at the App level, where Replay is dispatched as /// `Action::Replay` rather than `Action::ExecuteDsl`); only the /// individual sub-commands land there, since they are what /// actually mutate state. fn spawn_replay( database: Database, project_root: PathBuf, path: String, event_tx: mpsc::Sender, ) { tokio::spawn(async move { let events = run_replay(&database, &project_root, &path).await; for event in events { if event_tx.send(event).await.is_err() { return; } } // Refresh the table list once at the end — every command // dispatched through `execute_command_typed` may have // altered the schema, but we don't want to flicker the // panel mid-replay (and the table list is a derived view // anyway). match database.list_tables().await { Ok(tables) => { let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await; } Err(e) => warn!(error = %e, "post-replay list_tables failed"), } refresh_schema_cache(&database, &event_tx).await; }); } /// Inner replay loop, separated from the spawn wrapper so tests /// can exercise the file-iteration / parse / dispatch logic /// without an mpsc channel and a spawned task. /// /// Returns the `AppEvent`s that the spawn wrapper would emit, in /// order. Always returns at least one terminal event /// (`ReplayCompleted`, `ReplayFailed`, or `PersistenceFatal`). 
pub async fn run_replay( database: &Database, project_root: &std::path::Path, path: &str, ) -> Vec { let mut events: Vec = Vec::new(); let resolved = resolve_replay_path(project_root, path); let body = match tokio::fs::read_to_string(&resolved).await { Ok(b) => b, Err(e) => { events.push(AppEvent::ReplayFailed { path: path.to_string(), line_number: 0, command: String::new(), error: crate::t!( "replay.error_could_not_open", path = resolved.display(), detail = e ), }); return events; } }; let mut count: usize = 0; let mut warnings: Vec = Vec::new(); for (idx, raw) in body.lines().enumerate() { let line_number = idx + 1; let trimmed = raw.trim(); if trimmed.is_empty() || trimmed.starts_with('#') { continue; } // ADR-0034 §3: a journal record (`||`) // contributes its extracted source when `ok` and is skipped // otherwise; any other line is a bare command run verbatim. // This is what makes `replay history.log` work without // breaking hand-written `.commands` scripts. let command_text = match crate::persistence::classify_replay_line(trimmed) { crate::persistence::ReplayLine::Skip => continue, crate::persistence::ReplayLine::Run(text) => text, }; // ADR-0034 Amendment 1: classify by entry word BEFORE parsing. // App-lifecycle commands and a nested `replay` are skipped // during replay — they are session navigation, not schema/data // reconstruction (and the worker dispatch cannot run them). // Classifying by the leading word handles the modal forms // (`save as` / `load` / `new`) and any incomplete app form // uniformly: ALL such skips continue, never aborting the // replay. `import` and a nested `replay` warn, because skipping // them can leave the replayed state incomplete (imported data / // the nested file's commands are not reconstructed); the rest // skip silently. Skipping a nested `replay` also closes the // infinite-loop footgun by construction. 
let entry = command_text .split_whitespace() .next() .unwrap_or("") .to_ascii_lowercase(); if entry == "import" { warnings.push(crate::t!( "replay.skipped_import", line = line_number, command = command_text )); continue; } if entry == "replay" { warnings.push(crate::t!( "replay.skipped_replay", line = line_number, command = command_text )); continue; } if is_app_lifecycle_entry_word(&entry) { continue; } // A schema/data write (or read, or a genuine typo). Parse with // the live schema — re-snapshotted every line because earlier // replayed commands mutate it (so Phase D typed-slot rejections // fire at parse time, matching the interactive path). A parse // failure here is a genuine malformed command (not an app // command, which was skipped above) — report it with the line // number and stop. let schema = build_schema_cache(database).await; let command = match crate::dsl::parser::parse_command_with_schema( &command_text, &schema, ) { Ok(c) => c, Err(e) => { events.push(AppEvent::ReplayFailed { path: path.to_string(), line_number, command: command_text.clone(), error: crate::t!("replay.error_parse", detail = e), }); return events; } }; // Dispatch through the same path as interactive input so // per-command persistence (history.log, project.yaml, // CSVs) fires as if the user had typed each line. The // source re-journalled is the *extracted* command, not the // raw `|ok|…` record (ADR-0034 §3). let outcome = execute_command_typed(database, command, command_text.clone()).await; match outcome { Ok(_) => { count += 1; } Err(DbError::PersistenceFatal { operation, path: pf_path, message, }) => { // Persistence-fatal escalates through the existing // fatal-banner channel; the runtime tears down on // it, so further replay progress is moot. 
events.push(AppEvent::PersistenceFatal { operation: operation.to_string(), path: pf_path, message, }); return events; } Err(e) => { events.push(AppEvent::ReplayFailed { path: path.to_string(), line_number, command: command_text.clone(), error: e.friendly_message(), }); return events; } } } events.push(AppEvent::ReplayCompleted { path: path.to_string(), count, warnings, }); events } /// True when `entry` (a lowercased leading command word) is an /// app-lifecycle command that replay skips (ADR-0034 Amendment 1). /// `import` and `replay` are handled separately by the caller /// (they warn); this is the silent-skip set. Mirrors the /// `AppCommand` entry words (see `src/completion.rs`'s /// `empty_input_offers_app_command_entry_keywords`); both must list /// the same words. A new `AppCommand` must be added here so replay /// skips it rather than aborting. `q` is intentionally absent: it is /// not a recognised command, so a `q` line is a genuine error. fn is_app_lifecycle_entry_word(entry: &str) -> bool { matches!( entry, "save" | "load" | "new" | "export" | "mode" | "messages" | "rebuild" | "help" | "quit" | "undo" | "redo" ) } /// Resolve a `replay ` argument: absolute paths pass /// through unchanged; relative paths are joined under the active /// project's root so `replay history.log` works without ceremony /// from inside any project. fn resolve_replay_path(project_root: &std::path::Path, path: &str) -> PathBuf { let p = PathBuf::from(path); if p.is_absolute() { p } else { project_root.join(p) } } /// Execute a parsed user command and return either a typed /// `CommandOutcome` or the raw `DbError`. Keeping the typed /// error here lets us distinguish persistence-fatal failures /// from ordinary user errors at dispatch time (ADR-0015 §8). 
async fn execute_command_typed( database: &Database, command: Command, source: String, ) -> Result { let src = Some(source); match command { Command::CreateTable { name, columns, primary_key, } => database .create_table(name, columns, primary_key, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropTable { name } => database .drop_table(name, src) .await .map(|()| CommandOutcome::Schema(None)), Command::AddColumn { table, column, ty, not_null, unique, default, check, } => database .add_column( table, ColumnSpec { name: column, ty, not_null, unique, default, check, }, src, ) .await .map(CommandOutcome::AddColumn), Command::DropColumn { table, column, cascade, } => database .drop_column(table, column, cascade, src) .await .map(CommandOutcome::DropColumn), Command::RenameColumn { table, old, new } => database .rename_column(table, old, new, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::ChangeColumnType { table, column, ty, mode, } => database .change_column_type(table, column, ty, mode, src) .await .map(CommandOutcome::ChangeColumn), Command::AddRelationship { name, parent_table, parent_column, child_table, child_column, on_delete, on_update, create_fk, } => database .add_relationship( name, parent_table, parent_column, child_table, child_column, on_delete, on_update, create_fk, src, ) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropRelationship { selector } => database .drop_relationship(selector, src) .await .map(CommandOutcome::Schema), Command::AddIndex { name, table, columns, } => database .add_index(name, table, columns, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropIndex { selector } => database .drop_index(selector, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::AddConstraint { table, column, constraint, } => database .add_constraint(table, column, constraint, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::DropConstraint { table, column, kind, } => database 
.drop_constraint(table, column, kind, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::ShowTable { name } => database .describe_table(name, src) .await .map(|d| CommandOutcome::Schema(Some(d))), Command::Insert { table, columns, values, } => database .insert(table, columns, values, src) .await .map(CommandOutcome::Insert), Command::Update { table, assignments, filter, } => database .update(table, assignments, filter, src) .await .map(CommandOutcome::Update), Command::Delete { table, filter } => database .delete(table, filter, src) .await .map(CommandOutcome::Delete), Command::ShowData { name, filter, limit, } => database .query_data(name, filter, limit, src) .await .map(CommandOutcome::Query), // A SQL `SELECT` (advanced mode; ADR-0030 §6, ADR-0031). // The grammar walker has already validated `sql` is in // the supported subset; the worker runs it as text. Command::Select { sql } => database .run_select(sql, src) .await .map(CommandOutcome::Query), // A SQL `INSERT` (advanced mode; ADR-0033 §1). Grammar-as- // text: the worker runs the validated `sql` and re-persists // the parsed `target_table`'s CSV. Reuses the DSL insert // outcome (affected-row count + auto-show). Command::SqlInsert { sql, target_table, listed_columns, row_source, returning, } => database .run_sql_insert(sql, src, target_table, listed_columns, row_source, returning) .await .map(CommandOutcome::Insert), // A SQL `UPDATE` (advanced mode; ADR-0033 §2). Grammar-as- // text: the worker runs the validated `sql` and re-persists // the parsed `target_table`'s CSV. Reuses the DSL update // outcome (affected-row count). Command::SqlUpdate { sql, target_table, returning, } => database .run_sql_update(sql, src, target_table, returning) .await .map(CommandOutcome::Update), // A SQL `DELETE` (advanced mode; ADR-0033 §1/§7). Grammar- // as-text: the worker runs the validated `sql`, detects FK // cascade by row-count diffing, and re-persists the target // plus every cascade-affected child. 
Reuses the DSL delete // outcome (affected-row count + per-relationship cascade // summary). Command::SqlDelete { sql, target_table, returning, } => database .run_sql_delete(sql, src, target_table, returning) .await .map(CommandOutcome::Delete), // `EXPLAIN QUERY PLAN` never executes the wrapped // statement (ADR-0028 §2), so explaining a destructive // command is safe. `src` is unused here — explain is a // diagnostic and is not written to `history.log`. Command::Explain { query } => database .explain_query_plan(*query) .await .map(CommandOutcome::QueryPlan), // `replay` is parsed as a DSL command but routed by // App::dispatch_dsl as `Action::Replay` rather than // `Action::ExecuteDsl`; it never reaches the worker // thread. Hitting this arm would mean the dispatch // routing was bypassed. Command::Replay { .. } => unreachable!( "Command::Replay is dispatched as Action::Replay; \ reaching execute_command_typed indicates a routing bug" ), // App-lifecycle commands are dispatched in App, not by // the database worker. Hitting this arm would mean the // dispatch routing was bypassed. Command::App(_) => unreachable!( "Command::App is dispatched via App::dispatch_app_command; \ reaching execute_command_typed indicates a routing bug" ), } } fn spawn_event_reader(tx: mpsc::Sender) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut stream = EventStream::new(); while let Some(maybe_event) = stream.next().await { match maybe_event { Ok(CtEvent::Key(key)) => { if tx.send(AppEvent::Key(key)).await.is_err() { break; } } Ok(CtEvent::Resize(cols, rows)) => { if tx.send(AppEvent::Resize { cols, rows }).await.is_err() { break; } } Ok(_) => { // Ignore other event kinds (paste, focus, mouse) for now. 
} Err(e) => { error!(error = %e, "crossterm event stream error"); break; } } } debug!("event reader exiting"); }) } fn setup_terminal() -> Result>> { enable_raw_mode().context("enable raw mode")?; let mut stdout = io::stdout(); // Mouse capture is intentionally NOT enabled: it would prevent the // host terminal's native text selection (the cost of capturing every // mouse event), which we don't currently use for anything in-app. // If we ever want click-to-select panes or scroll wheel handling, // we'll need a different strategy than blanket capture. execute!(stdout, EnterAlternateScreen).context("enter alternate screen")?; let backend = CrosstermBackend::new(stdout); let terminal = Terminal::new(backend).context("construct terminal")?; Ok(terminal) } fn teardown_terminal( terminal: &mut Terminal>, ) -> Result<()> { disable_raw_mode().context("disable raw mode")?; execute!(terminal.backend_mut(), LeaveAlternateScreen) .context("leave alternate screen")?; terminal.show_cursor().context("show cursor")?; Ok(()) } #[cfg(test)] mod tests { use super::IndicatorDebounce; use crate::dsl::walker::Severity; #[test] fn starts_hidden_and_disarmed() { let d = IndicatorDebounce::default(); assert_eq!(d.visible(), None); assert!(!d.is_armed()); } #[test] fn a_keystroke_hides_the_indicator_and_arms_the_debounce() { let mut d = IndicatorDebounce::default(); d.settle(Some(Severity::Error)); assert_eq!(d.visible(), Some(Severity::Error)); d.note_event(true); assert_eq!(d.visible(), None, "a keystroke hides the indicator"); assert!(d.is_armed(), "a keystroke arms the debounce"); } #[test] fn settling_shows_the_verdict_and_disarms() { let mut d = IndicatorDebounce::default(); d.note_event(true); assert!(d.is_armed()); d.settle(Some(Severity::Warning)); assert_eq!(d.visible(), Some(Severity::Warning)); assert!(!d.is_armed(), "a settled indicator owes no recompute"); } #[test] fn settling_a_clean_verdict_shows_nothing() { let mut d = IndicatorDebounce::default(); d.note_event(true); 
#[cfg(test)]
mod tests {
    //! Unit tests for the `IndicatorDebounce` state machine: what a
    //! keystroke, a non-key event and a settle each do to the shown
    //! verdict and to the armed flag.

    use super::IndicatorDebounce;
    use crate::dsl::walker::Severity;

    /// A debounce in its initial state.
    fn fresh() -> IndicatorDebounce {
        IndicatorDebounce::default()
    }

    /// A debounce that has seen exactly one keystroke.
    fn after_keystroke() -> IndicatorDebounce {
        let mut debounce = fresh();
        debounce.note_event(true);
        debounce
    }

    #[test]
    fn starts_hidden_and_disarmed() {
        let debounce = fresh();
        assert_eq!(debounce.visible(), None);
        assert!(!debounce.is_armed());
    }

    #[test]
    fn a_keystroke_hides_the_indicator_and_arms_the_debounce() {
        let mut debounce = fresh();
        debounce.settle(Some(Severity::Error));
        assert_eq!(debounce.visible(), Some(Severity::Error));

        debounce.note_event(true);
        assert_eq!(debounce.visible(), None, "a keystroke hides the indicator");
        assert!(debounce.is_armed(), "a keystroke arms the debounce");
    }

    #[test]
    fn settling_shows_the_verdict_and_disarms() {
        let mut debounce = after_keystroke();
        assert!(debounce.is_armed());

        debounce.settle(Some(Severity::Warning));
        assert_eq!(debounce.visible(), Some(Severity::Warning));
        assert!(!debounce.is_armed(), "a settled indicator owes no recompute");
    }

    #[test]
    fn settling_a_clean_verdict_shows_nothing() {
        let mut debounce = after_keystroke();

        debounce.settle(None);
        assert_eq!(debounce.visible(), None);
        assert!(!debounce.is_armed());
    }

    #[test]
    fn a_non_key_event_does_not_disturb_a_shown_indicator() {
        let mut debounce = fresh();
        debounce.settle(Some(Severity::Error));

        debounce.note_event(false);
        assert_eq!(
            debounce.visible(),
            Some(Severity::Error),
            "a non-key event leaves the indicator shown",
        );
        assert!(
            !debounce.is_armed(),
            "a non-key event does not arm the debounce"
        );
    }

    #[test]
    fn a_non_key_event_while_armed_keeps_the_debounce_armed() {
        // Background traffic (a DSL result, a tables refresh) that
        // lands while the user is typing must not cancel the
        // recompute that is still owed.
        let mut debounce = after_keystroke();
        assert!(debounce.is_armed());

        debounce.note_event(false);
        assert!(
            debounce.is_armed(),
            "the owed recompute survives a non-key event"
        );
        assert_eq!(debounce.visible(), None, "and the indicator stays hidden");
    }

    #[test]
    fn typing_resumes_after_a_settle() {
        // Two full rounds: type → settle → type again → settle.
        let mut debounce = after_keystroke();
        debounce.settle(Some(Severity::Warning));
        assert_eq!(debounce.visible(), Some(Severity::Warning));

        debounce.note_event(true);
        assert_eq!(
            debounce.visible(),
            None,
            "new typing hides the indicator again"
        );
        assert!(debounce.is_armed());

        debounce.settle(None);
        assert_eq!(debounce.visible(), None);
        assert!(!debounce.is_armed());
    }
}