Files
rdbms-playground/src/runtime.rs
T
claude@clouddev1 4cd574b909 feat: persist & restore per-project input mode (#14)
The input mode always started in simple; a learner who quit in advanced
had to re-toggle every launch. Store the mode per-project in project.yaml
(project.mode:, optional, default simple) and restore it on every open.

Mode is live UI state, not schema: the worker stamps the current mode
into project.yaml on every write, so a later command rewrites the live
value rather than clobbering it — no db round-trip needed. The mode is
persisted on unload (quit + project switch) so the mode you leave a
project in is always what reopens; the `mode` command also persists
immediately. A switch saves the outgoing mode, then restores the
incoming project's stored mode.

New --mode simple|advanced CLI flag (precedence --mode > stored >
simple; combines with --resume). A teacher can ship a project that
opens in advanced mode and export it to students (the mode travels in
the zip).

ADR-0015 Amendment 1; ADR-0003 note; help banner; requirements L1b.
2026-06-02 06:47:34 +00:00

3438 lines
134 KiB
Rust

//! Tokio-based event loop.
//!
//! A blocking task reads crossterm events and forwards them onto
//! an `mpsc` channel as `AppEvent`s. The main loop awaits events,
//! feeds them to `App::update`, enacts any returned `Action`s,
//! and redraws the terminal. DSL execution is dispatched onto
//! the database worker (see `db::Database`), and its result is
//! posted back as a new `AppEvent`. Future async work (snapshot
//! capture, auto-save) joins the same event channel as
//! additional producers.
use std::io;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context, Result};
use crossterm::event::{Event as CtEvent, EventStream};
use crossterm::execute;
use crossterm::terminal::{
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
};
use futures_util::StreamExt;
use ratatui::Terminal;
use ratatui::backend::CrosstermBackend;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::action::Action;
use crate::app::App;
use crate::cli::Args;
use crate::db::{
AddColumnResult, ChangeColumnTypeResult, CreateIndexOutcome, CreateOutcome, DataResult,
Database, DbError, DeleteResult, DropColumnResult, DropIndexOutcome, DropOutcome, InsertResult,
QueryPlan, TableDescription, UpdateResult,
};
use crate::dsl::command::{
Constraint, ConstraintKind, IndexSelector, RelationshipSelector, TableConstraint,
};
use crate::dsl::{AlterTableAction, ChangeColumnMode, Command, ColumnSpec};
use crate::dsl::walker::Severity;
use crate::event::AppEvent;
use crate::project::{
Project, ProjectKind, copy_project, list_projects, open_or_create, projects_dir,
read_last_project, resolve_data_root, safely_delete_temp_project, write_last_project,
};
use crate::theme::Theme;
use crate::ui;
/// Capacity of the bounded `mpsc` channel that carries `AppEvent`s
/// into the main loop (see `run_loop`).
const EVENT_CHANNEL_CAPACITY: usize = 64;
/// Upper bound on how long `run_loop` waits on the event-reader
/// task's handle after the main loop has exited.
const SHUTDOWN_GRACE: Duration = Duration::from_millis(100);
/// Quiet interval before the input-validity indicator
/// reappears once typing stops (ADR-0027 §3).
const INDICATOR_DEBOUNCE: Duration = Duration::from_millis(1000);
/// Decision logic for debouncing the input-validity indicator
/// (ADR-0027 §3, step E).
///
/// Every keystroke hides the indicator and arms the debounce.
/// While `is_armed` holds, the event loop puts a time limit of
/// `INDICATOR_DEBOUNCE` on `recv`; when that limit expires the
/// loop calls `settle` with the freshly computed verdict. The
/// `tokio` timer and the terminal stay in the loop — this type
/// only decides, which keeps the debounce contract unit-testable
/// without an event loop. `App::input_indicator` mirrors
/// `visible` for the renderer.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct IndicatorDebounce {
    /// Severity currently on display; `None` means hidden.
    visible: Option<Severity>,
    /// Set while a recompute is still owed — the only state in
    /// which the event loop time-boxes `recv`.
    armed: bool,
}

impl IndicatorDebounce {
    /// The value the renderer should show right now.
    const fn visible(self) -> Option<Severity> {
        self.visible
    }

    /// True when the event loop must time-box `recv` with the
    /// debounce window.
    const fn is_armed(self) -> bool {
        self.armed
    }

    /// Account for one processed event.
    ///
    /// Non-key events change nothing: a background result that
    /// lands mid-typing must not cancel the pending recompute.
    /// A keystroke hides the indicator and (re)arms the debounce.
    const fn note_event(&mut self, is_key: bool) {
        if !is_key {
            return;
        }
        self.armed = true;
        self.visible = None;
    }

    /// Typing has paused for the whole debounce window: disarm
    /// and display `verdict`.
    const fn settle(&mut self, verdict: Option<Severity>) {
        self.armed = false;
        self.visible = verdict;
    }
}
/// Run the application until a `Quit` action is enacted or the
/// terminal closes.
///
/// Startup sequence, in order: resolve the data root and the
/// initial project path, open (or create) the project, run any
/// pending `project.yaml` migrations, record the `--resume`
/// target, resolve the startup input mode, open the database
/// (rebuilding it from text when the `.db` file was absent),
/// then enter the terminal and hand over to `run_loop`.
///
/// Some failures are reported on stderr and then returned as
/// `Ok(())` rather than as an error: a `--resume` with no usable
/// recorded project, and a startup rebuild that fails with
/// `DbError::PersistenceFatal` or `DbError::RebuildRowFailed`.
///
/// # Errors
///
/// Returns an error when resolving the data root, reading
/// `last_project`, opening/creating the project, migrating
/// `project.yaml`, opening the database, any other rebuild
/// failure, or setting up the terminal fails, and propagates an
/// error returned by `run_loop`.
pub async fn run(args: Args) -> Result<()> {
// Resolve data root explicitly so run_loop can refer back
// to it for `new` (creates a temp) and `load` (lists
// projects). We can't easily recover this from the
// Project alone, so we keep it ourselves.
let data_root = resolve_data_root(args.data_dir.as_deref())
.context("resolve data root")?;
// Resolve the initial project path: --resume reads it from
// <data-root>/last_project; otherwise an explicit positional
// arg, falling back to a fresh auto-named temp.
//
// ADR-0015 §7: --resume errors out cleanly when the path is
// missing or the recorded project no longer exists. We
// surface those failures to stderr before booting the
// terminal so the message lands directly in the user's
// shell.
let initial_path: Option<PathBuf> = if args.resume {
match read_last_project(&data_root)
.context("read last_project")?
{
Some(p) if p.exists() => Some(p),
Some(p) => {
eprintln!(
"rdbms-playground: {}",
crate::t!(
"project.resume_recorded_missing",
path = p.display(),
),
);
return Ok(());
}
None => {
eprintln!(
"rdbms-playground: {}",
crate::t!(
"project.resume_no_previous",
data_root = data_root.display(),
),
);
return Ok(());
}
}
} else {
args.project_path.clone()
};
let project = open_or_create(initial_path.as_deref(), Some(data_root.as_path()))
.context("open or create project")?;
// Run any pending project.yaml migrations before the
// database opens (so the rebuild path only ever sees the
// latest schema). The registry is empty in v1; future
// versions register their migrators here. A migration
// that runs is recorded in tracing and leaves a
// `project.yaml.v<N>.bak` breadcrumb on disk; that's
// sufficient v1 UX and lets us defer dedicated event
// plumbing until a real migrator demands it.
let migrate_registry = crate::persistence::migrations::MigratorRegistry::production();
let migration_outcome = crate::persistence::migrations::ensure_project_yaml_migrated(
project.path(),
&migrate_registry,
)
.context("migrate project.yaml")?;
if let Some(from) = migration_outcome.migrated_from {
info!(
from_version = from,
to_version = migrate_registry.latest_version(),
"migrated project.yaml",
);
}
// Record the just-opened project as the `--resume` target —
// but only if it is worth resuming. A freshly-created empty
// temp project is NOT recorded: it is auto-deleted on quit
// while still empty (ADR-0015), and a `last_project`
// pointing at a since-deleted directory makes the next
// `--resume` fail confusingly. A named project, or a temp
// that already carries content, is recorded. A temp that
// *gains* content this session is recorded on quit instead.
// Write failures are non-fatal.
if !project.is_unmodified_temp()
&& let Err(e) = write_last_project(&data_root, project.path())
{
warn!(error = %e, "could not update last_project");
}
// Copy out everything later steps need from `project`: the
// value itself is moved into the `Session` below (or dropped
// early on a rebuild failure).
let db_path = project.db_path();
let display_name = project.display_name().to_string();
let project_path = project.path().to_path_buf();
let project_is_temp = matches!(project.kind(), ProjectKind::Temp);
// Resolve the startup input mode (ADR-0015 mode-restore
// amendment, issue #14). Precedence: `--mode` flag > the
// project's stored mode > the default (`Simple`). A pre-#14
// project, or one with no `mode:` field, reads as `None` and
// falls through to the default. The resolved mode is given to
// `Persistence` so every `project.yaml` write records it, and
// set on the `App` so the first render shows the right mode.
let resolved_mode = crate::mode::Mode::resolve_startup(
args.mode,
crate::persistence::Persistence::read_stored_mode(&project_path),
);
info!(mode = %resolved_mode, "resolved startup input mode");
let persistence =
crate::persistence::Persistence::new(project_path.clone()).with_mode(resolved_mode);
// Capture whether the .db file existed BEFORE we open it —
// sqlite creates it on connect, so this is the only honest
// signal that we need to rebuild from text (ADR-0015 §7).
let db_existed = db_path.exists();
// Undo is on unless `--no-undo` (ADR-0006 Amendment 1).
let undo_enabled = !args.no_undo;
let database =
Database::open_with_persistence_and_undo(db_path.as_path(), persistence, undo_enabled)
.context("open database")?;
// Events queued here are handed to `run_loop`, which sends
// them through the normal event channel before the first
// user input is processed.
let mut initial_events: Vec<AppEvent> = Vec::new();
if !db_existed {
match database.rebuild_from_text(project_path.clone(), None).await {
Ok(()) => {
// Surface the silent rebuild as a system note
// *only* when there was something material to
// reconstruct. A fresh-launch temp project
// hits this branch with zero tables and zero
// rows — the rebuild ran, but nothing
// reconstructable existed, so there's nothing
// worth telling the user. The explicit
// `rebuild` command still always reports its
// summary (see spawn_rebuild) since the user
// asked.
if project_has_content(&project_path) {
let summary = summarize_project(&project_path).unwrap_or_else(|_| {
"rebuilt playground.db from project.yaml + data/".to_string()
});
initial_events.push(AppEvent::RebuildSucceeded { summary });
}
}
Err(e) => {
// The terminal is still in cooked mode here
// (we haven't entered the alternate screen
// yet), so writing to stderr lands directly
// in the user's shell. Drop the project to
// release the lock first.
drop(project);
// These two variants get only their friendly message
// on stderr and an `Ok(())` return; every other rebuild
// failure is returned as an error instead.
if matches!(
e,
DbError::PersistenceFatal { .. } | DbError::RebuildRowFailed { .. }
) {
eprintln!("rdbms-playground: {}", e.friendly_message());
return Ok(());
}
return Err(anyhow::anyhow!(e.friendly_message())).context("rebuild from text");
}
}
}
let mut terminal = setup_terminal().context("setup terminal")?;
let result = run_loop(
&mut terminal,
args.theme,
Session {
project: Some(project),
database: Some(database),
data_root,
},
display_name,
project_is_temp,
initial_events,
undo_enabled,
resolved_mode,
)
.await;
// Teardown runs whether `run_loop` succeeded or failed.
if let Err(e) = teardown_terminal(&mut terminal) {
// Teardown failures should not mask the primary error.
warn!(error = %e, "terminal teardown failed");
}
// ADR-0015 §8: a fatal persistence failure makes its
// banner visible above the shell prompt by writing to
// stderr after the alternate screen has been left.
if let Ok(Some(banner)) = &result {
eprintln!("{banner}");
}
// The optional banner has been printed; callers only need
// success or the error.
result.map(|_| ())
}
/// Mutable state owned by `run_loop` that survives project
/// switches: the live `Project` (with its lock), the live
/// `Database` (with its worker), and the active data root
/// for resolving relative paths in `save as` / `new` / load
/// picker listings.
///
/// `project` and `database` are wrapped in `Option` so a
/// project switch can `take()` the old (dropping its lock
/// and worker) before opening the new — required for the
/// "switch to my own current project" case where the new
/// open would otherwise see a self-held lock.
struct Session {
project: Option<Project>,
database: Option<Database>,
data_root: std::path::PathBuf,
}
impl Session {
const fn project(&self) -> &Project {
match self.project.as_ref() {
Some(p) => p,
None => panic!("project always set during run_loop"),
}
}
const fn database(&self) -> &Database {
match self.database.as_ref() {
Some(d) => d,
None => panic!("database always set during run_loop"),
}
}
}
/// The main event loop for one terminal session.
///
/// Spawns the terminal event reader, seeds the `App` from the
/// boot parameters (project name, temp flag, undo flag, input
/// mode, history), forwards `initial_events` through the event
/// channel, then repeatedly: receives an `AppEvent`, feeds it to
/// `App::update`, enacts every returned `Action`, and redraws.
/// The loop ends on `Action::Quit` or when the event channel
/// closes.
///
/// On exit the active project is either deleted (an unmodified
/// temp) or recorded as the `--resume` target (anything else).
///
/// Returns `app.fatal_message` — `Some(banner)` when the app
/// recorded a fatal message the caller should print after
/// leaving the alternate screen.
///
/// # Errors
///
/// Returns an error when drawing to the terminal fails.
#[allow(clippy::too_many_arguments)] // boot params; all inherent to one session
async fn run_loop(
    terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
    theme: Theme,
    mut session: Session,
    project_display_name: String,
    project_is_temp: bool,
    initial_events: Vec<AppEvent>,
    undo_enabled: bool,
    initial_mode: crate::mode::Mode,
) -> Result<Option<String>> {
    let (event_tx, mut event_rx) = mpsc::channel::<AppEvent>(EVENT_CHANNEL_CAPACITY);
    let reader_handle = spawn_event_reader(event_tx.clone());
    let mut app = App::new();
    app.project_name = Some(project_display_name);
    app.project_is_temp = project_is_temp;
    app.undo_enabled = undo_enabled;
    // Start in the resolved input mode (ADR-0015 mode-restore
    // amendment, issue #14): `--mode` > stored project mode >
    // default. `Persistence` already carries the same value, so the
    // worker records it on the next write.
    app.mode = initial_mode;
    // Seed the in-memory navigable history from the
    // initial project's history.log (I2-persist, ADR-0015
    // §12). Subsequent project switches re-seed via the
    // `ProjectSwitched` event payload.
    app.seed_history(read_history_seed(session.project().path()));
    // Send any startup events (e.g., the system-message form
    // of "rebuilt from text on missing .db") so they're
    // dispatched through the normal event path and end up in
    // the output panel before the user types anything.
    for event in initial_events {
        let _ = event_tx.send(event).await;
    }
    // Seed the table list with whatever the database currently
    // shows. For a fresh in-memory DB this is empty, but doing
    // it explicitly means file-backed databases (track 2) will
    // show their tables on launch without changes here.
    seed_initial_tables(session.database(), &event_tx).await;
    terminal
        .draw(|f| ui::render(&mut app, &theme, f))
        .context("initial draw")?;
    info!("entering main event loop");
    // ADR-0027 §3: the validity indicator is debounced — hidden
    // on every keystroke, recomputed and shown once typing has
    // paused for `INDICATOR_DEBOUNCE`. `recv` is time-boxed only
    // while the debounce is armed, so an idle session still does
    // no wake-ups. See `IndicatorDebounce` for the decision
    // logic; `app.input_indicator` mirrors it for the renderer.
    let mut debounce = IndicatorDebounce::default();
    loop {
        let event = if debounce.is_armed() {
            match tokio::time::timeout(INDICATOR_DEBOUNCE, event_rx.recv()).await {
                Ok(Some(event)) => event,
                Ok(None) => break,
                Err(_elapsed) => {
                    // Typing has been quiet for the debounce
                    // interval — settle the indicator.
                    debounce.settle(app.input_validity_verdict());
                    app.input_indicator = debounce.visible();
                    terminal
                        .draw(|f| ui::render(&mut app, &theme, f))
                        .context("redraw")?;
                    continue;
                }
            }
        } else {
            match event_rx.recv().await {
                Some(event) => event,
                None => break,
            }
        };
        let is_key = matches!(event, AppEvent::Key(_));
        let actions = app.update(event);
        let mut should_quit = false;
        for action in actions {
            match action {
                Action::Quit => {
                    debug!("quit action received");
                    // Persist the mode we're leaving in, so it is
                    // restored next time the project opens (ADR-0015
                    // mode-restore amendment, issue #14 — persist on
                    // unload). Best-effort: a write failure must not
                    // block quitting — and neither must a missing
                    // database. A project switch that failed after
                    // dropping the outgoing project can leave the
                    // session without one; `Session::database()`
                    // would panic there, so go through the `Option`.
                    if let Some(db) = session.database.as_ref()
                        && let Err(e) = db.set_mode(app.mode).await
                    {
                        tracing::warn!(error = %e, "could not persist input mode on quit");
                    }
                    should_quit = true;
                }
                Action::ExecuteDsl {
                    command,
                    source,
                    submission_mode,
                } => {
                    spawn_dsl_dispatch(
                        session.database().clone(),
                        event_tx.clone(),
                        command,
                        source,
                        submission_mode,
                    );
                }
                Action::JournalFailure { source } => {
                    // ADR-0034 §1/§4: record a failed command as an
                    // `err` record. Best-effort — a failure to record
                    // a failure must never escalate a user error into
                    // a fatal, so the result is logged and ignored.
                    if let Err(e) = crate::persistence::Persistence::new(
                        session.project().path().to_path_buf(),
                    )
                    .append_history_failure(&source)
                    {
                        tracing::warn!(error = %e, "failed to journal err record (ignored)");
                    }
                }
                Action::PrepareRebuild => {
                    spawn_prepare_rebuild(
                        session.project().path().to_path_buf(),
                        event_tx.clone(),
                    );
                }
                Action::Rebuild { source } => {
                    spawn_rebuild(
                        session.database().clone(),
                        session.project().path().to_path_buf(),
                        event_tx.clone(),
                        source,
                    );
                }
                Action::OpenLoadPicker => {
                    let entries: Vec<crate::app::LoadPickerEntry> =
                        list_projects(&session.data_root)
                            .into_iter()
                            .map(|p| crate::app::LoadPickerEntry {
                                display_name: p.display_name,
                                modified: p.modified,
                                path: p.path,
                                is_temp: matches!(p.kind, ProjectKind::Temp),
                            })
                            .collect();
                    let _ = event_tx.send(AppEvent::LoadPickerReady { entries }).await;
                }
                Action::LoadProject { path, source } => {
                    handle_project_switch(
                        &mut session,
                        SwitchRequest::Load { path },
                        source,
                        &event_tx,
                        undo_enabled,
                        app.mode,
                    )
                    .await;
                }
                Action::SaveAs { target, source } => {
                    handle_project_switch(
                        &mut session,
                        SwitchRequest::SaveAs { target },
                        source,
                        &event_tx,
                        undo_enabled,
                        app.mode,
                    )
                    .await;
                }
                Action::NewProject { source } => {
                    handle_project_switch(
                        &mut session,
                        SwitchRequest::NewTemp,
                        source,
                        &event_tx,
                        undo_enabled,
                        app.mode,
                    )
                    .await;
                }
                Action::Export { target, source } => {
                    spawn_export(
                        session.project().path().to_path_buf(),
                        directory_basename(session.project().path()),
                        session.data_root.clone(),
                        target,
                        source,
                        event_tx.clone(),
                    );
                }
                Action::Import {
                    zip_path,
                    as_target,
                    source,
                } => {
                    handle_project_switch(
                        &mut session,
                        SwitchRequest::Import {
                            zip_path: std::path::PathBuf::from(&zip_path),
                            as_target,
                        },
                        source,
                        &event_tx,
                        undo_enabled,
                        app.mode,
                    )
                    .await;
                }
                Action::PrepareUndo => {
                    spawn_prepare_undo(session.database().clone(), event_tx.clone(), false);
                }
                Action::PrepareRedo => {
                    spawn_prepare_undo(session.database().clone(), event_tx.clone(), true);
                }
                Action::Undo => {
                    spawn_undo(session.database().clone(), event_tx.clone(), false);
                }
                Action::Redo => {
                    spawn_undo(session.database().clone(), event_tx.clone(), true);
                }
                Action::Replay { path } => {
                    spawn_replay(
                        session.database().clone(),
                        session.project().path().to_path_buf(),
                        path,
                        event_tx.clone(),
                    );
                }
                Action::PersistMode(mode) => {
                    // Best-effort: the in-memory mode already changed;
                    // a failure to record it must not fatal a UI toggle
                    // (ADR-0015 mode-restore amendment, issue #14).
                    // Same reasoning as on quit: with no database
                    // installed there is nowhere to record the mode,
                    // and that must not panic either.
                    if let Some(db) = session.database.as_ref()
                        && let Err(e) = db.set_mode(mode).await
                    {
                        tracing::warn!(error = %e, "could not persist input mode");
                    }
                }
            }
        }
        // A keystroke hides the indicator and re-arms the
        // debounce (ADR-0027 §3) — it reappears once typing
        // pauses; non-key events leave it untouched.
        debounce.note_event(is_key);
        app.input_indicator = debounce.visible();
        terminal
            .draw(|f| ui::render(&mut app, &theme, f))
            .context("redraw")?;
        if should_quit {
            break;
        }
    }
    // Give the reader task a short, bounded window to finish;
    // the outcome (finished or timed out) is deliberately ignored.
    let _ = tokio::time::timeout(SHUTDOWN_GRACE, reader_handle).await;
    // Decide the active project's fate on quit, before dropping
    // it. An unmodified empty temp is auto-deleted — same rule
    // as on project switch (see perform_switch). Anything else —
    // a named project, or a temp that *gained* content this
    // session — is recorded as the `--resume` target: this is
    // the point at which a launch-temp the user filled with
    // content finally gets remembered (the `run()` startup
    // write skipped it while it was still empty). The two are
    // mutually exclusive (one needs an unmodified temp, the
    // other anything else).
    let project_at_quit = session.project.as_ref();
    let cleanup_on_quit: Option<std::path::PathBuf> = project_at_quit
        .and_then(|p| p.is_unmodified_temp().then(|| p.path().to_path_buf()));
    let resume_target_on_quit: Option<std::path::PathBuf> = project_at_quit
        .filter(|p| !p.is_unmodified_temp())
        .map(|p| p.path().to_path_buf());
    // Drop the database, then the project, before touching the
    // project directory below.
    let _ = session.database.take();
    let _ = session.project.take();
    if let Some(target) = resume_target_on_quit
        && let Err(e) = write_last_project(&session.data_root, &target)
    {
        tracing::warn!(error = %e, "could not update last_project on quit");
    }
    if let Some(stale) = cleanup_on_quit {
        match safely_delete_temp_project(&stale, &session.data_root) {
            Ok(()) => tracing::info!(
                path = %stale.display(),
                "cleaned up unmodified temp project on quit",
            ),
            Err(e) => tracing::warn!(
                path = %stale.display(),
                error = %e,
                "did not clean up unmodified temp project on quit",
            ),
        }
    }
    info!("event loop exited");
    Ok(app.fatal_message.clone())
}
/// What kind of project switch the user requested.
///
/// Built by `run_loop` from the corresponding `Action` and
/// consumed by `handle_project_switch` / `perform_switch`.
enum SwitchRequest {
/// `load` (picker or browse-to-path) — open an existing
/// project at `path`.
Load { path: std::path::PathBuf },
/// `save as` — copy the current project to a new
/// location, then open that copy. `target` is a name
/// (resolved under `<data-root>/projects/`) or an
/// absolute path.
SaveAs { target: String },
/// `new` — close current, create a fresh auto-named temp.
NewTemp,
/// `import` — extract a zip into a new project under the
/// data root's projects dir, then switch to it. The
/// destination basename is taken from the zip's
/// top-level folder by default (`as_target` is `None`),
/// or from the user-supplied override; collisions
/// auto-suffix `-NN` (ADR-0015 §11 amendment).
Import {
/// Location of the zip archive to import.
zip_path: std::path::PathBuf,
/// Optional user-supplied destination (the `as <target>` form).
as_target: Option<String>,
},
}
/// Common project-switch path. Drops the current project +
/// database (releasing the lock and stopping the worker),
/// opens the new one, runs a rebuild if the .db is missing,
/// appends history.log, and sends a `ProjectSwitched` event
/// so App refreshes its display.
///
/// Errors are surfaced as `ProjectSwitchFailed` (non-fatal):
/// the current project remains active.
async fn handle_project_switch(
session: &mut Session,
req: SwitchRequest,
source: String,
event_tx: &mpsc::Sender<AppEvent>,
undo_enabled: bool,
outgoing_mode: crate::mode::Mode,
) {
// Persist the outgoing project's mode before it is unloaded
// (ADR-0015 mode-restore amendment, issue #14 — persist on
// unload). Best-effort, and before `perform_switch` drops the
// outgoing database. The switched-to project's own stored mode
// is restored separately, via the `ProjectSwitched` event.
if let Err(e) = session.database().set_mode(outgoing_mode).await {
tracing::warn!(error = %e, "could not persist input mode on switch");
}
match perform_switch(session, req, source, undo_enabled).await {
Ok((display_name, is_temp, mode)) => {
let history_entries = read_history_seed(session.project().path());
let _ = event_tx
.send(AppEvent::ProjectSwitched {
display_name,
is_temp,
history_entries,
mode,
})
.await;
if let Ok(tables) = session.database().list_tables().await {
let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
}
refresh_schema_cache(session.database(), event_tx).await;
}
Err(e) => {
let _ = event_tx
.send(AppEvent::ProjectSwitchFailed { error: e })
.await;
}
}
}
/// Load up to `HISTORY_HYDRATION_CAP` of the newest source lines
/// from the project's `history.log`, for seeding the in-memory
/// input history.
///
/// A read failure is logged at warn level and yields an empty
/// list — starting with no history is the right fallback when
/// the file cannot be read.
fn read_history_seed(project_path: &std::path::Path) -> Vec<String> {
    crate::persistence::Persistence::new(project_path.to_path_buf())
        .read_recent_history(HISTORY_HYDRATION_CAP)
        .unwrap_or_else(|e| {
            tracing::warn!(error = %e, "history hydration failed; starting empty");
            Vec::new()
        })
}
/// Maximum number of `history.log` entries to seed the
/// in-memory navigable history with on project open. Matches
/// the in-memory cap (`app::HISTORY_CAPACITY`) per ADR-0015
/// §12: "latest N entries, where N is the same in-memory
/// cap as today."
///
/// Passed to `read_recent_history` by `read_history_seed`.
// NOTE(review): the value is written out here rather than taken
// from `app::HISTORY_CAPACITY`, so the two can drift apart —
// confirm they still agree when either one changes.
const HISTORY_HYDRATION_CAP: usize = 1000;
async fn perform_switch(
session: &mut Session,
req: SwitchRequest,
source: String,
undo_enabled: bool,
) -> Result<(String, bool, crate::mode::Mode), String> {
use crate::persistence::Persistence;
// For SaveAs we need a resolved target path up front
// (so the existence check happens before we drop the
// current project). For NewTemp we'll let create_temp
// pick the path. For Load it's the user-supplied path.
// For Import we inspect the zip and resolve the target
// (auto-suffixing on collision per ADR-0015 §11
// amendment) before touching anything else.
let resolved_target: Option<std::path::PathBuf> = match &req {
SwitchRequest::Load { path } => {
if !path.exists() {
return Err(crate::t!(
"project.load_path_missing",
path = path.display()
));
}
Some(path.clone())
}
SwitchRequest::SaveAs { target } => {
let p = resolve_save_target(target, &session.data_root);
if p.exists() {
return Err(crate::t!(
"project.saveas_target_exists",
path = p.display()
));
}
Some(p)
}
SwitchRequest::NewTemp => None,
SwitchRequest::Import { zip_path, as_target } => {
if !zip_path.exists() {
return Err(crate::t!(
"project.import_zip_missing",
path = zip_path.display()
));
}
// Validate the zip up front so we don't drop the
// current project for an unimportable file.
let inspection = crate::archive::inspect_zip(zip_path)
.map_err(|e| e.to_string())?;
let resolved = resolve_import_destination(
as_target.as_deref(),
&inspection.top_folder,
&session.data_root,
)?;
Some(resolved)
}
};
// For SaveAs: copy current project to the target while
// the source is still on disk (auto-save guarantees its
// state matches the in-memory db).
if let SwitchRequest::SaveAs { .. } = &req {
let src = session.project().path().to_path_buf();
let dst = resolved_target.as_ref().expect("SaveAs has resolved target");
copy_project(&src, dst).map_err(|e| e.to_string())?;
}
// For Import: extract the zip into the resolved target.
// We do this *before* dropping the current project so
// a failure here leaves the user where they were.
if let SwitchRequest::Import { zip_path, .. } = &req {
let dst = resolved_target.as_ref().expect("Import has resolved target");
let inspection = crate::archive::inspect_zip(zip_path)
.map_err(|e| e.to_string())?;
crate::archive::extract_into(zip_path, dst, &inspection.top_folder)
.map_err(|e| e.to_string())?;
}
// Capture cleanup info from the OUTGOING project before
// we drop it: if it was an unmodified empty temp, we
// delete its directory after the switch so the data dir
// doesn't accumulate empty scratch projects.
let outgoing_cleanup_path: Option<std::path::PathBuf> =
session.project.as_ref().and_then(|p| {
p.is_unmodified_temp().then(|| p.path().to_path_buf())
});
// Drop current project + database BEFORE opening the new
// ones, releasing the old lock and stopping the old
// worker. Required for the "load my own current project"
// case (otherwise the new open would see a self-held
// lock on this PID).
let _ = session.database.take();
let _ = session.project.take();
// The outgoing project's lock is now released; it's
// safe to remove its directory if it was unmodified.
// The safely_delete_temp_project helper layers multiple
// guards (containment under data root, [temp] marker,
// contents allowlist, no symlinks) so a bug elsewhere
// can't escalate into deleting the wrong directory.
if let Some(stale) = outgoing_cleanup_path {
match safely_delete_temp_project(&stale, &session.data_root) {
Ok(()) => tracing::info!(
path = %stale.display(),
"cleaned up unmodified temp project on switch",
),
Err(e) => tracing::warn!(
path = %stale.display(),
error = %e,
"did not clean up unmodified temp project on switch",
),
}
}
// Open the destination project. Load / SaveAs / Import
// all open a path that already has the on-disk skeleton
// (either because it pre-existed or because we just put
// it there); NewTemp asks the project module for a fresh
// auto-named one.
let new_project = match &req {
SwitchRequest::Load { .. }
| SwitchRequest::SaveAs { .. }
| SwitchRequest::Import { .. } => {
let path = resolved_target.expect("Load/SaveAs/Import have resolved target");
Project::open(&path).map_err(|e| e.to_string())?
}
SwitchRequest::NewTemp => {
Project::create_temp(&session.data_root).map_err(|e| e.to_string())?
}
};
let new_path = new_project.path().to_path_buf();
// Run any pending project.yaml migrations before the
// database opens. Same registry as `run()`. A failed
// migration aborts the switch (the old project has
// already been dropped — user lands in a "no project"
// state momentarily, but the next user action will
// surface the error and they can retry).
let migrate_registry = crate::persistence::migrations::MigratorRegistry::production();
crate::persistence::migrations::ensure_project_yaml_migrated(
new_project.path(),
&migrate_registry,
)
.map_err(|e| e.to_string())?;
// Open the new database (rebuild from text if .db is
// missing — applies to NewTemp's just-created project,
// and to Load when the user opened a project whose .db
// had been deleted).
let db_path = new_project.db_path();
let db_existed = db_path.exists();
// Restore the switched-to project's stored input mode (ADR-0015
// mode-restore amendment, issue #14): "loading triggers the mode
// switch each time." A switch uses the target's stored mode
// directly — the startup `--mode` override applies only at boot,
// not to subsequent loads. Absent/pre-#14 → default.
let restored_mode = Persistence::read_stored_mode(&new_path).unwrap_or_default();
let persistence = Persistence::new(new_path.clone()).with_mode(restored_mode);
let new_database =
Database::open_with_persistence_and_undo(&db_path, persistence, undo_enabled)
.map_err(|e| e.to_string())?;
if !db_existed
&& let Err(e) = new_database.rebuild_from_text(new_path.clone(), None).await
{
return Err(e.friendly_message());
}
let display_name = new_project.display_name().to_string();
let is_temp = matches!(new_project.kind(), ProjectKind::Temp);
// Worth recording as the resume target? A switch to a fresh
// empty temp (`new`) is not — see the gate in `run()`.
let new_worth_recording = !new_project.is_unmodified_temp();
session.project = Some(new_project);
session.database = Some(new_database);
// Append the user-issued command to the destination's
// history.log. The worker's persistence is wired but not
// directly addressable from here, so we use a fresh
// Persistence handle for this single line.
let _ = Persistence::new(new_path.clone()).append_history(&source);
// Update the resume pointer so the next `--resume` launch
// reopens the project we just switched to — unless it is a
// fresh empty temp (a `new` command), which must not be
// recorded (see the gate in `run()`). Write failures are
// non-fatal.
if new_worth_recording
&& let Err(e) = write_last_project(&session.data_root, &new_path)
{
tracing::warn!(error = %e, "could not update last_project after switch");
}
Ok((display_name, is_temp, restored_mode))
}
/// Work out which directory an `import` should extract into.
///
/// - **`as <absolute-path>`:** the path is taken verbatim and
///   refused if it already exists — no auto-suffix, because we
///   don't second-guess what the user typed.
/// - **`as <relative-name>`:** placed under
///   `<data-root>/projects/`, auto-suffixed on collision.
/// - **No `as <target>`:** the zip's top-level folder name under
///   `<data-root>/projects/`, auto-suffixed `-NN` on collision
///   (ADR-0015 §11 amendment).
///
/// For the two relative cases the projects directory is created
/// if it does not exist yet.
///
/// # Errors
///
/// Returns a message when an absolute target already exists,
/// when the projects directory cannot be created, or when the
/// archive module cannot resolve a free target name.
fn resolve_import_destination(
    as_target: Option<&str>,
    zip_top_folder: &str,
    data_root: &std::path::Path,
) -> Result<std::path::PathBuf, String> {
    // An absolute override short-circuits everything else.
    let absolute_override = as_target
        .map(std::path::Path::new)
        .filter(|candidate| candidate.is_absolute());
    if let Some(explicit) = absolute_override {
        return if explicit.exists() {
            Err(format!(
                "`{}` already exists; pick a different target",
                explicit.display(),
            ))
        } else {
            Ok(explicit.to_path_buf())
        };
    }

    let projects_root = projects_dir(data_root);
    std::fs::create_dir_all(&projects_root).map_err(|e| e.to_string())?;
    // A relative override wins over the zip's own folder name.
    let folder_name: &str = as_target.unwrap_or(zip_top_folder);
    crate::archive::resolve_import_target(&projects_root, folder_name)
        .map(|(destination, _)| destination)
        .map_err(|e| e.to_string())
}
/// Spawn a blocking task to write an export zip and forward
/// the outcome via the event channel.
///
/// The current project's auto-save semantics mean
/// `<project_path>` already reflects every successful command,
/// so the export reads from disk without coordinating with the
/// db worker. The `history.log` entry for this command is
/// appended directly here (we already hold the project path
/// and don't need to wait for the export to finish before
/// recording the user-issued command).
fn spawn_export(
project_path: std::path::PathBuf,
project_name: String,
data_root: std::path::PathBuf,
target: Option<String>,
source: String,
event_tx: mpsc::Sender<AppEvent>,
) {
let _ = crate::persistence::Persistence::new(project_path.clone()).append_history(&source);
tokio::spawn(async move {
let outcome = tokio::task::spawn_blocking(move || {
do_export(&project_path, &project_name, &data_root, target.as_deref())
})
.await;
let event = match outcome {
Ok(Ok(path)) => AppEvent::ExportSucceeded { path },
Ok(Err(e)) => AppEvent::ExportFailed { error: e },
Err(join_err) => AppEvent::ExportFailed {
error: join_err.to_string(),
},
};
let _ = event_tx.send(event).await;
});
}
/// Blocking half of the export pipeline: pick the output path, refuse to
/// overwrite, then write the archive.
///
/// Destination rules:
/// - an absolute `target` is used as given;
/// - a relative `target` is placed under `data_root`;
/// - with no `target`, `data_root` is created if needed and the file name
///   comes from `crate::archive::next_export_sequence`.
///
/// Returns the path that was written. Every failure is reported as its
/// display string, including the "already exists" refusal.
fn do_export(
    project_path: &std::path::Path,
    project_name: &str,
    data_root: &std::path::Path,
    target: Option<&str>,
) -> Result<std::path::PathBuf, String> {
    let destination = match target.map(std::path::Path::new) {
        Some(requested) if requested.is_absolute() => requested.to_path_buf(),
        Some(requested) => data_root.join(requested),
        None => {
            std::fs::create_dir_all(data_root).map_err(|e| e.to_string())?;
            let (generated_name, _) =
                crate::archive::next_export_sequence(data_root, project_name)
                    .map_err(|e| e.to_string())?;
            data_root.join(generated_name)
        }
    };

    // Never clobber an existing file; the user must choose another name.
    if destination.exists() {
        return Err(format!(
            "`{}` already exists; pick a different name or remove it first",
            destination.display(),
        ));
    }

    crate::archive::export_project(project_path, project_name, &destination)
        .map_err(|e| e.to_string())?;
    Ok(destination)
}
/// Final component of `path`, converted lossily to a `String`.
///
/// When the path has no final component (for example `/`, or a path
/// ending in `..`), the whole path's display form is returned instead.
fn directory_basename(path: &std::path::Path) -> String {
    match path.file_name() {
        Some(component) => component.to_string_lossy().into_owned(),
        None => path.display().to_string(),
    }
}
/// Turn a `save as` target into a concrete path.
///
/// An absolute target is returned unchanged. A relative one is placed
/// under `<data-root>/projects/`, following the preference recorded in
/// ADR-0015 §1 ("named projects right alongside the temp ones is the
/// easiest workflow").
fn resolve_save_target(target: &str, data_root: &std::path::Path) -> std::path::PathBuf {
    let requested = std::path::Path::new(target);
    if requested.is_absolute() {
        return requested.to_path_buf();
    }
    projects_dir(data_root).join(requested)
}
/// Post the initial table list and schema cache to the event channel.
///
/// A failure to list tables is logged and otherwise ignored; the schema
/// cache refresh runs either way.
async fn seed_initial_tables(database: &Database, event_tx: &mpsc::Sender<AppEvent>) {
    match database
        .list_tables()
        .await
        .map(AppEvent::TablesRefreshed)
    {
        Ok(refreshed) => {
            let _ = event_tx.send(refreshed).await;
        }
        Err(e) => {
            error!(error = %e, "failed to seed initial table list");
        }
    }
    refresh_schema_cache(database, event_tx).await;
}
/// Snapshot the schema via `build_schema_cache` and post it as
/// `SchemaCacheRefreshed` (ADR-0022 §9 + stage 8d).
///
/// The snapshot builder is best-effort and cannot fail, so an event is
/// always sent — a partially filled cache is better than no completion
/// at all. Called wherever `TablesRefreshed` is sent; the cache lives on
/// the App and feeds Tab completion for identifier slots.
async fn refresh_schema_cache(
    database: &Database,
    event_tx: &mpsc::Sender<AppEvent>,
) {
    let refreshed = AppEvent::SchemaCacheRefreshed(build_schema_cache(database).await);
    let _ = event_tx.send(refreshed).await;
}
/// Take a `SchemaCache` snapshot of the live database.
///
/// Used by `refresh_schema_cache` (interactive path, which wraps the
/// snapshot in a `SchemaCacheRefreshed` event) and by the replay path
/// (which re-snapshots per line, since replayed `create table` /
/// `add column` commands keep changing the schema).
///
/// Everything here is best-effort: a name query that fails leaves that
/// list as it was in the default cache, and a table whose description
/// cannot be read simply gets no per-table entries — the walker then
/// falls back to schemaless behaviour.
async fn build_schema_cache(database: &Database) -> crate::completion::SchemaCache {
    use crate::completion::{IndexEntry, SchemaCache, TableColumn};
    use crate::dsl::grammar::IdentSource;

    let mut cache = SchemaCache::default();

    // Flat identifier lists, one query per identifier kind.
    if let Ok(names) = database.list_names_for(IdentSource::Tables).await {
        cache.tables = names;
    }
    if let Ok(names) = database.list_names_for(IdentSource::Columns).await {
        cache.columns = names;
    }
    if let Ok(names) = database.list_names_for(IdentSource::Relationships).await {
        cache.relationships = names;
    }
    if let Ok(names) = database.list_names_for(IdentSource::Indexes).await {
        cache.indexes = names;
    }

    // Phase D (ADR-0024 §Phase D): per-table column metadata with
    // user-facing types. The walker's
    // `DynamicSubgrammar(column_value_list)` reads it to unfold typed
    // value slots per column at `insert into T values (...)` positions.
    let table_names = cache.tables.clone();
    for table_name in table_names {
        let Ok(description) = database.describe_table(table_name.clone(), None).await else {
            // Undescribable table: leave it without per-table entries.
            continue;
        };

        // Per-table indexes for the items panel (S2, ADR-0025), carrying
        // uniqueness so a UNIQUE index can be marked (ADR-0035 §4d).
        // Collected first, before the columns are moved out below.
        let index_entries: Vec<IndexEntry> = description
            .indexes
            .iter()
            .map(|index| IndexEntry {
                name: index.name.clone(),
                unique: index.unique,
            })
            .collect();

        // Columns without a user-facing type are skipped.
        let typed_columns: Vec<TableColumn> = description
            .columns
            .into_iter()
            .filter_map(|column| {
                let user_type = column.user_type?;
                Some(TableColumn {
                    name: column.name,
                    user_type,
                    // Only what the engine reports as NOT NULL (ADR-0033
                    // §8.3 "declared NOT NULL"). A PK is NOT treated as
                    // implicitly not-null: an `int` PK is an `INTEGER
                    // PRIMARY KEY` rowid alias that auto-fills, so
                    // flagging it omitted would be a false positive
                    // (SQLite reports notnull=0 for it anyway).
                    // serial/shortid are excluded by type in the
                    // not_null_missing pass.
                    not_null: column.notnull,
                    has_default: column.default.is_some(),
                })
            })
            .collect();

        cache.table_columns.insert(table_name.clone(), typed_columns);
        cache.table_indexes.insert(table_name, index_entries);
    }

    cache
}
/// Read `project.yaml` + `data/` to compute the rebuild summary that the
/// confirmation modal shows, then post it as `RebuildPrepared`.
///
/// `summarize_project` performs synchronous filesystem reads, so it runs
/// on tokio's blocking pool (`spawn_blocking`, as `spawn_export` does)
/// rather than inside the async task itself. That keeps the I/O off the
/// async worker threads, so it cannot stall input handling even on slow
/// filesystems.
///
/// A `RebuildPrepared` event is always sent: if the project sources
/// cannot be read — or the blocking task itself fails to complete — the
/// summary carries a parenthesised explanation instead.
fn spawn_prepare_rebuild(
    project_path: std::path::PathBuf,
    event_tx: mpsc::Sender<AppEvent>,
) {
    tokio::spawn(async move {
        let computed =
            tokio::task::spawn_blocking(move || summarize_project(&project_path)).await;
        let summary = match computed {
            Ok(Ok(text)) => text,
            Ok(Err(e)) => format!("(could not read project sources: {e})"),
            // The blocking task panicked or was cancelled; still open the
            // modal with an explanation rather than sending nothing.
            Err(join_err) => format!("(could not read project sources: {join_err})"),
        };
        let _ = event_tx.send(AppEvent::RebuildPrepared { summary }).await;
    });
}
/// Does the project at `project_path` declare any schema?
///
/// "Has content" means `project.yaml` can be read and parsed, and it
/// declares at least one table. A brand-new auto-named temp project
/// declares none and yields `false`. A project that cannot be read
/// (missing file, corrupt YAML) also yields `false` — suppressing a
/// misleading "0 tables reconstructed" message for an unreadable project
/// is the right default.
fn project_has_content(project_path: &std::path::Path) -> bool {
    let yaml = match std::fs::read_to_string(project_path.join(crate::project::PROJECT_YAML)) {
        Ok(text) => text,
        Err(_) => return false,
    };
    crate::persistence::parse_schema(&yaml).is_ok_and(|snapshot| !snapshot.tables.is_empty())
}
/// Describe what a rebuild of the project at `project_path` would
/// reconstruct: the number of tables declared in `project.yaml` and the
/// number of data rows found in their CSV files.
///
/// Fails (with the error's display string) only when `project.yaml`
/// cannot be read or parsed. A table whose `<name>.csv` cannot be read
/// contributes zero rows.
fn summarize_project(project_path: &std::path::Path) -> Result<String, String> {
    let yaml = std::fs::read_to_string(project_path.join(crate::project::PROJECT_YAML))
        .map_err(|e| e.to_string())?;
    let snapshot = crate::persistence::parse_schema(&yaml).map_err(|e| e.to_string())?;

    let data_dir = project_path.join(crate::project::DATA_DIR);
    let table_count = snapshot.tables.len();
    let row_count: usize = snapshot
        .tables
        .iter()
        .filter_map(|table| {
            std::fs::read_to_string(data_dir.join(format!("{}.csv", table.name))).ok()
        })
        // Header line + one line per row (per Iteration 2's "no CSV when
        // empty" rule, this is exact).
        .map(|body| body.lines().count().saturating_sub(1))
        .sum();

    let plural = |count: usize| if count == 1 { "" } else { "s" };
    Ok(format!(
        "{table_count} table{} and {row_count} row{} will be reconstructed; \
         the existing playground.db will be replaced",
        plural(table_count),
        plural(row_count),
    ))
}
/// Run the rebuild on a spawned task and report the typed outcome as an
/// `AppEvent`.
///
/// - Failure with `DbError::PersistenceFatal` → `PersistenceFatal`.
/// - Any other failure → `RebuildFailed` with the friendly message.
/// - Success → `RebuildSucceeded` (with a fresh project summary, or the
///   text "rebuild complete" if the summary cannot be computed), followed
///   by a table-list refresh and a schema-cache refresh.
fn spawn_rebuild(
    database: Database,
    project_path: std::path::PathBuf,
    event_tx: mpsc::Sender<AppEvent>,
    source: String,
) {
    tokio::spawn(async move {
        let outcome = database
            .rebuild_from_text(project_path.clone(), Some(source))
            .await;

        // Failure paths first: translate the error and stop.
        if let Err(failure) = outcome {
            let event = match failure {
                DbError::PersistenceFatal {
                    operation,
                    path,
                    message,
                } => AppEvent::PersistenceFatal {
                    operation: operation.to_string(),
                    path,
                    message,
                },
                other => AppEvent::RebuildFailed {
                    error: other.friendly_message(),
                },
            };
            let _ = event_tx.send(event).await;
            return;
        }

        let summary = match summarize_project(&project_path) {
            Ok(text) => text,
            Err(_) => "rebuild complete".to_string(),
        };
        let _ = event_tx
            .send(AppEvent::RebuildSucceeded { summary })
            .await;

        // Refresh the table list so the items panel reflects whatever
        // the rebuild produced.
        if let Ok(tables) = database.list_tables().await {
            let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
        }
        refresh_schema_cache(&database, &event_tx).await;
    });
}
/// Look at the snapshot an `undo` (or `redo`, when `is_redo`) would
/// restore, without restoring it, and post the result (ADR-0006
/// Amendment 1):
///
/// - a snapshot exists → `UndoPrepared` (opens the confirmation modal);
/// - nothing to restore → `UndoUnavailable`;
/// - the peek failed → `UndoFailed` with the friendly message.
fn spawn_prepare_undo(database: Database, event_tx: mpsc::Sender<AppEvent>, is_redo: bool) {
    tokio::spawn(async move {
        let peeked = if is_redo {
            database.peek_redo().await
        } else {
            database.peek_undo().await
        };
        let event = match peeked {
            Ok(found) => found.map_or(AppEvent::UndoUnavailable { is_redo }, |meta| {
                AppEvent::UndoPrepared {
                    command: meta.command,
                    timestamp: meta.timestamp,
                    is_redo,
                }
            }),
            Err(e) => AppEvent::UndoFailed {
                error: e.friendly_message(),
                is_redo,
            },
        };
        let _ = event_tx.send(event).await;
    });
}
/// Perform the `undo` (or `redo`, when `is_redo`) and post the outcome.
///
/// On success `UndoSucceeded` is sent, followed by a table-list refresh
/// and a schema-cache refresh so the TUI shows the restored state. When
/// there is nothing to restore `UndoUnavailable` is sent; on error
/// `UndoFailed` carries the friendly message. No refresh happens in the
/// latter two cases.
fn spawn_undo(database: Database, event_tx: mpsc::Sender<AppEvent>, is_redo: bool) {
    tokio::spawn(async move {
        let restored = if is_redo {
            database.redo().await
        } else {
            database.undo().await
        };

        // Decide the event and whether the state actually changed.
        let (event, state_changed) = match restored {
            Ok(Some(meta)) => (
                AppEvent::UndoSucceeded {
                    command: meta.command,
                    is_redo,
                },
                true,
            ),
            Ok(None) => (AppEvent::UndoUnavailable { is_redo }, false),
            Err(e) => (
                AppEvent::UndoFailed {
                    error: e.friendly_message(),
                    is_redo,
                },
                false,
            ),
        };
        let _ = event_tx.send(event).await;

        if state_changed {
            if let Ok(tables) = database.list_tables().await {
                let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
            }
            refresh_schema_cache(&database, &event_tx).await;
        }
    });
}
/// Spawn a task that runs a DSL command against the database
/// and forwards the result back as an `AppEvent`.
///
/// Order of operations inside the task (the order matters):
///
/// 1. `collect_echo_lookups` captures pre-execution facts for the
///    teaching echo — some of them are unrecoverable once the command
///    has run.
/// 2. `echo_for` computes the pre-execution echo for the non-Schema arms.
/// 3. `execute_command_typed` runs the command.
/// 4. The outcome is mapped to exactly one `AppEvent` and sent. If the
///    receiver is gone the task stops here.
/// 5. The table list and the schema cache are refreshed.
///
/// `source` is the text the user typed; a copy is kept so a failed
/// command can still be journaled. `submission_mode` is the effective
/// input mode at submission time and gates every echo.
fn spawn_dsl_dispatch(
database: Database,
event_tx: mpsc::Sender<AppEvent>,
command: Command,
source: String,
submission_mode: crate::app::EffectiveMode,
) {
tokio::spawn(async move {
// Retain the source for `DslFailed` so the App can journal a
// rejected command as `err` (ADR-0034 §1/§2).
let source_for_journal = source.clone();
// ADR-0038: the DSL → SQL teaching echo fires for a DSL-form
// command submitted in an advanced effective mode (ADR-0037).
// `replay` bypasses this spawn (it calls `execute_command_typed`
// directly), so replayed lines never echo.
//
// Two echo sources converge here. The Bucket A pre-execution
// path (`echo_for` → `command_to_sql`) handles every echo that is
// a pure function of the `Command` — wired into the non-Schema
// arms below (Update / Delete / AddColumn / DropColumn /
// ChangeColumn). The Schema arm uses `build_schema_echo`, which
// subsumes the Bucket A pure-Command schema cases *and* adds the
// Bucket B resolved-name cases that need the post-exec
// description (`add index` / `add relationship`) or a pre-exec
// lookup (`drop index` / `drop relationship` — the dropped thing
// is gone after execution, hence `collect_echo_lookups` runs
// first).
let lookups = collect_echo_lookups(&database, &command, submission_mode).await;
let echo = crate::echo::echo_for(&command, submission_mode);
let outcome = execute_command_typed(&database, command.clone(), source).await;
let event = match outcome {
Ok(CommandOutcome::Schema(description)) => {
let schema_echo = build_schema_echo(
&command,
submission_mode,
description.as_ref(),
&lookups,
);
AppEvent::DslSucceeded {
command: command.clone(),
description,
echo: schema_echo,
}
}
Ok(CommandOutcome::SchemaSkipped(description)) => AppEvent::DslCreateSkipped {
command: command.clone(),
description,
},
Ok(CommandOutcome::SchemaDropSkipped) => AppEvent::DslDropSkipped {
command: command.clone(),
},
Ok(CommandOutcome::SchemaDropIndexSkipped) => AppEvent::DslDropIndexSkipped {
command: command.clone(),
},
Ok(CommandOutcome::SchemaCreateIndexSkipped(name)) => {
AppEvent::DslCreateIndexSkipped {
command: command.clone(),
name,
}
}
Ok(CommandOutcome::Query(data)) => {
// ADR-0038: `show data` is the only DSL-form query that
// echoes; its limited form orders by the table's primary
// key, which is not on the Command — so the echo is built
// post-execution from the schema (handoff §5). A
// SQL-entered `SELECT` (also a Query outcome) has no echo.
let echo = build_show_data_echo(&database, &command, submission_mode).await;
AppEvent::DslDataSucceeded {
command: command.clone(),
data,
echo,
}
}
Ok(CommandOutcome::QueryPlan(plan)) => AppEvent::DslExplainSucceeded {
command: command.clone(),
plan,
},
Ok(CommandOutcome::Insert(result)) => AppEvent::DslInsertSucceeded {
command: command.clone(),
result,
},
Ok(CommandOutcome::Update(result)) => AppEvent::DslUpdateSucceeded {
command: command.clone(),
result,
echo,
},
Ok(CommandOutcome::Delete(result)) => AppEvent::DslDeleteSucceeded {
command: command.clone(),
result,
echo,
},
Ok(CommandOutcome::ChangeColumn(result)) => {
// ADR-0038 §6 category 3 caveat: `--dont-convert` skips
// the client-side layer entirely, so the headline echo
// (`ALTER TABLE … SET DATA TYPE …`) is the *nearest*
// SQL but not equivalent. The caveat only makes sense
// next to the echo, so gate on advanced mode (the line
// references "the line above").
let dont_convert_caveat = submission_mode.is_advanced()
&& matches!(
&command,
Command::ChangeColumnType {
mode: ChangeColumnMode::DontConvert,
..
}
);
AppEvent::DslChangeColumnSucceeded {
command: command.clone(),
result,
echo,
dont_convert_caveat,
}
}
Ok(CommandOutcome::AddColumn(result)) => AppEvent::DslAddColumnSucceeded {
command: command.clone(),
result,
echo,
},
Ok(CommandOutcome::DropColumn(result)) => {
// `drop column --cascade` is the only DropColumn shape
// whose echo needs the execution result (the names of
// the covering indexes the rebuild removed — Bucket B
// category 2, ADR-0038 §7 Slice 2b). Non-cascade falls
// through to the pre-execution `echo` from `echo_for`.
let cascade_echo = build_drop_column_cascade_echo(
&command,
submission_mode,
&result,
);
AppEvent::DslDropColumnSucceeded {
command: command.clone(),
result,
echo: cascade_echo.or(echo),
}
}
// A fatal persistence error gets its own event rather than the
// generic `DslFailed` below.
Err(DbError::PersistenceFatal {
operation,
path,
message,
}) => AppEvent::PersistenceFatal {
operation: operation.to_string(),
path,
message,
},
Err(error) => {
// Schema-resolved enrichment per ADR-0019 §6.
// The runtime owns DB access; the App stays
// presentation-only.
let facts = enrich_dsl_failure(&database, &command, &error).await;
AppEvent::DslFailed {
command: command.clone(),
error,
facts,
source: source_for_journal,
}
}
};
// If the receiving side of the channel is gone there is nobody
// left to refresh for; stop without the follow-up queries.
if event_tx.send(event).await.is_err() {
return;
}
// Refresh the table list after every dispatched command (not
// only DDL, and also after failures) so the items panel
// reflects reality. A failed list_tables here is logged but
// not surfaced to the user — they already saw the primary
// outcome.
match database.list_tables().await {
Ok(tables) => {
let _ = event_tx.send(AppEvent::TablesRefreshed(tables)).await;
}
Err(e) => warn!(error = %e, "post-list_tables failed"),
}
// Refresh the schema cache feeding Tab completion
// (ADR-0022 §9). Same timing as TablesRefreshed.
refresh_schema_cache(&database, &event_tx).await;
});
}
/// Produce the DSL → SQL teaching echo for `show data` (ADR-0038).
///
/// `show data` is the one Bucket A row whose echo needs schema knowledge
/// that is not on the `Command`: the limited form (`show data T limit n`)
/// orders by the table's primary key for a stable "first n" (the worker's
/// `build_query_data_sql`). So for a limited query — and only then — the
/// primary-key columns are resolved with `describe_table` and handed to
/// [`crate::echo::echo_for_query`].
///
/// Returns `None` in simple mode without touching the database. For any
/// command that is not a limited `ShowData`, and when the describe call
/// fails, an empty primary-key list is passed on, so the echo simply has
/// no `ORDER BY` — exactly as the worker does.
async fn build_show_data_echo(
    database: &Database,
    command: &Command,
    submission_mode: crate::app::EffectiveMode,
) -> Option<Vec<String>> {
    if !submission_mode.is_advanced() {
        return None;
    }

    // Only a limited query needs the primary key; everything else skips
    // the lookup so the common case stays round-trip-free.
    let limited_table = match command {
        Command::ShowData {
            name,
            limit: Some(_),
            ..
        } => Some(name),
        _ => None,
    };

    let primary_key = match limited_table {
        Some(name) => match database.describe_table(name.clone(), None).await {
            Ok(desc) => desc
                .columns
                .iter()
                .filter(|c| c.primary_key)
                .map(|c| c.name.clone())
                .collect::<Vec<_>>(),
            Err(_) => Vec::new(),
        },
        None => Vec::new(),
    };

    crate::echo::echo_for_query(command, submission_mode, &primary_key)
}
/// Pre-execution lookups captured for the teaching echo (ADR-0038 §7
/// Bucket B).
///
/// Filled by `collect_echo_lookups` before the command runs and read by
/// `build_schema_echo` after it has run.
///
/// Two classes of echo need information that the `Command` alone doesn't
/// carry and that may not be recoverable from the post-execution
/// `description`:
///
/// - **Drops** of resolved-name things (`drop index` positional,
///   `drop relationship`): the thing is *gone* post-execution, so the
///   runtime resolves the name (and for `drop relationship Named`, the
///   owning child table) **before** calling the worker.
/// - **`add relationship --create-fk`**: the multi-line echo (category
///   2, Slice 2b) emits an `ADD COLUMN` line *only when the child column
///   was newly created*; the runtime resolves both the pre-state
///   (existed?) and the new column type (from the parent's PK via
///   `Type::fk_target_type`) up front, so the post-exec builder is a
///   pure formatter.
///
/// Empty (`None` on each field, which is what the derived `Default`
/// yields) in simple mode, or when the command does not need a lookup,
/// or when the lookup didn't find anything (defensive — the executor
/// will then refuse with its own error and the echo simply doesn't
/// fire).
#[derive(Default)]
struct EchoLookups {
/// For `Command::DropIndex { IndexSelector::Columns }` — the resolved
/// index name. The positional form `drop index on T(cols)` reaches
/// this; the SQL `DROP INDEX <name>` is `Command::SqlDropIndex` and
/// is already SQL (no echo).
drop_index_name: Option<String>,
/// For `Command::DropRelationship` — `(resolved name, child_table)`,
/// in that tuple order.
/// For `Endpoints`, name is resolved + child_table is from the
/// command (captured here uniformly so the post-exec builder uses one
/// shape). For `Named`, name is from the command + child_table is
/// resolved via a scan of user tables (small schemas — fine for a
/// teaching playground).
drop_relationship: Option<(String, String)>,
/// For `Command::AddRelationship { create_fk: true, .. }` — the
/// type of the child column the `--create-fk` flag will create, *if*
/// the column did not already exist (`Some(ty)` → newly created →
/// multi-line echo; `None` → already existed → single-line echo).
/// The type is derived from the parent's PK column type via
/// `Type::fk_target_type` (ADR-0011: `serial → int`, `shortid →
/// text`, others identity). The outer `Option` is `None` for
/// not-applicable commands (not a `--create-fk` add, or simple mode,
/// or a pre-execution lookup failed); the inner option encodes the
/// existed-vs-created distinction.
add_rel_create_fk_new_column_type: Option<Option<crate::dsl::types::Type>>,
}
/// Resolve drop-target names and `--create-fk` pre-state **before**
/// execution, for the Bucket B echoes that need them (ADR-0038 §7).
/// Best-effort: an unresolved lookup yields `None` and the echo for that
/// command silently doesn't fire — the executor's own error path
/// surfaces any real problem.
///
/// Returns an all-`None` `EchoLookups` without querying the database
/// when `submission_mode` is not advanced, and for every command other
/// than the four shapes matched below (positional `drop index`,
/// `drop relationship` by endpoints or by name, and
/// `add relationship --create-fk`).
async fn collect_echo_lookups(
database: &Database,
command: &Command,
submission_mode: crate::app::EffectiveMode,
) -> EchoLookups {
let mut out = EchoLookups::default();
// No echo in simple mode, so skip every lookup.
if !submission_mode.is_advanced() {
return out;
}
match command {
// Positional `drop index`: find the index on `table` whose column
// list equals the one in the command and remember its name.
Command::DropIndex {
selector: IndexSelector::Columns { table, columns },
} => {
if let Ok(desc) = database.describe_table(table.clone(), None).await
&& let Some(idx) = desc.indexes.iter().find(|i| i.columns == *columns)
{
out.drop_index_name = Some(idx.name.clone());
}
}
// `drop relationship` by endpoints: the child table is on the
// command; the name comes from the child's matching outbound
// relationship.
Command::DropRelationship {
selector:
RelationshipSelector::Endpoints {
parent_table,
parent_column,
child_table,
child_column,
},
} => {
if let Ok(desc) = database.describe_table(child_table.clone(), None).await
&& let Some(rel) = desc.outbound_relationships.iter().find(|r| {
r.other_table == *parent_table
&& r.other_column == *parent_column
&& r.local_column == *child_column
})
{
out.drop_relationship = Some((rel.name.clone(), child_table.clone()));
}
}
Command::DropRelationship {
selector: RelationshipSelector::Named { name },
} => {
// The named selector doesn't carry the child table — the
// worker resolves it from the relationships metadata. Mirror
// that with a small scan of user tables. For a teaching
// playground (small schemas) this is cheap; a dedicated
// resolver API would be the next step if schemas grow.
// The scan stops at the first table that owns a relationship
// with this name.
if let Ok(tables) = database.list_tables().await {
for table in tables {
if let Ok(desc) = database.describe_table(table.clone(), None).await
&& desc.outbound_relationships.iter().any(|r| r.name == *name)
{
out.drop_relationship = Some((name.clone(), table.clone()));
break;
}
}
}
}
Command::AddRelationship {
create_fk: true,
parent_table,
parent_column,
child_table,
child_column,
..
} => {
// Two pre-state facts feed the multi-line `--create-fk` echo
// (ADR-0038 §7 Bucket B, category 2): whether the child
// column already exists (determines single- vs multi-line)
// and the parent PK column's user type (determines the
// newly-created child column's type via
// `Type::fk_target_type`). A plain `add relationship` (no
// `--create-fk`) never reaches this arm and needs neither
// fact; the `--create-fk` case needs both *before* execution
// to know whether to emit an `ADD COLUMN` line.
let parent_pk_type = database
.describe_table(parent_table.clone(), None)
.await
.ok()
.and_then(|d| {
d.columns
.iter()
.find(|c| c.name == *parent_column)
.and_then(|c| c.user_type)
});
let child_column_existed = database
.describe_table(child_table.clone(), None)
.await
.ok()
.map(|d| d.columns.iter().any(|c| c.name == *child_column));
// Only record a result when BOTH lookups succeeded; otherwise the
// field stays `None` and the echo does not fire.
if let (Some(parent_ty), Some(existed)) = (parent_pk_type, child_column_existed) {
out.add_rel_create_fk_new_column_type = Some(if existed {
None
} else {
Some(parent_ty.fk_target_type())
});
}
}
_ => {}
}
out
}
/// Build the teaching echo for a `Schema`-outcome command (ADR-0038).
///
/// Covers both the Bucket A pure-`Command` echoes (`create table`,
/// `rename column`, `add`/`drop constraint` — delegated to
/// `echo::command_to_sql`) **and** the Bucket B resolved-name echoes:
///
/// - `add index` / `add relationship` read the stored name from the
///   post-execution `description`, falling back to the name given on the
///   command;
/// - positional `drop index` / `drop relationship` read the names
///   captured before execution in `lookups`;
/// - `add relationship --create-fk` additionally consults `lookups` to
///   choose between the single-line and the multi-line form.
///
/// Returns `None` outside advanced mode, when a needed name or lookup is
/// unavailable, and for commands `echo::command_to_sql` has no SQL for.
fn build_schema_echo(
    command: &Command,
    submission_mode: crate::app::EffectiveMode,
    description: Option<&TableDescription>,
    lookups: &EchoLookups,
) -> Option<Vec<String>> {
    if !submission_mode.is_advanced() {
        return None;
    }
    match command {
        Command::AddIndex {
            name,
            table,
            columns,
        } => {
            // The post-exec description carries the new index under its
            // stored name (user-given `as N` or worker auto-generated).
            // Preferring the description even when the command has a name
            // keeps one code path and matches what the worker wrote.
            let index_name = description
                .and_then(|d| d.indexes.iter().find(|i| i.columns == *columns))
                .map(|i| i.name.clone())
                .or_else(|| name.clone())?;
            Some(vec![crate::echo::render_create_index(
                &index_name,
                table,
                columns,
            )])
        }
        Command::DropIndex {
            selector: IndexSelector::Columns { .. },
        } => {
            let index_name = lookups.drop_index_name.as_ref()?;
            Some(vec![crate::echo::render_drop_index(index_name)])
        }
        Command::AddRelationship {
            name,
            parent_table,
            parent_column,
            child_table,
            child_column,
            on_delete,
            on_update,
            create_fk,
        } => {
            // Resolve the relationship name from the parent's inbound
            // relationships (the description for AddRelationship is the
            // parent's — `database.add_relationship` returns it per
            // ADR-0013), falling back to the command's explicit `name`
            // when the description is unavailable.
            let relationship_name = description
                .and_then(|d| {
                    d.inbound_relationships.iter().find(|r| {
                        r.other_table == *child_table
                            && r.other_column == *child_column
                            && r.local_column == *parent_column
                    })
                })
                .map(|r| r.name.clone())
                .or_else(|| name.clone())?;

            // `--create-fk` is multi-line iff the child column was newly
            // created (pre-state captured before execution). When the
            // column already existed the echo collapses to the single
            // FK line — an `ADD COLUMN` would be a no-op-with-error, and
            // the catalogue is explicit: "one line if the column already
            // existed". A missing pre-state lookup means no echo at all.
            let new_column_type = if *create_fk {
                lookups.add_rel_create_fk_new_column_type?
            } else {
                None
            };

            let lines = match new_column_type {
                Some(new_ty) => crate::echo::render_add_relationship_create_fk(
                    &relationship_name,
                    parent_table,
                    parent_column,
                    child_table,
                    child_column,
                    *on_delete,
                    *on_update,
                    new_ty,
                ),
                None => vec![crate::echo::render_add_relationship(
                    &relationship_name,
                    parent_table,
                    parent_column,
                    child_table,
                    child_column,
                    *on_delete,
                    *on_update,
                )],
            };
            Some(lines)
        }
        Command::DropRelationship { .. } => {
            let (relationship_name, child_table) = lookups.drop_relationship.as_ref()?;
            Some(vec![crate::echo::render_drop_relationship(
                relationship_name,
                child_table,
            )])
        }
        // Everything else (Bucket A pure-Command, plus the no-echo Bucket
        // C variants like `Sql*` / `ShowTable`) goes through
        // `echo::command_to_sql`; its `Option<String>` is wrapped to fit
        // the multi-line `Option<Vec<String>>` payload uniformly.
        _ => crate::echo::command_to_sql(command).map(|sql| vec![sql]),
    }
}
/// Build the multi-line teaching echo for `drop column --cascade`
/// (ADR-0038 §7 Bucket B, category 2).
///
/// Returns `None` in simple mode and for anything that is not a
/// `DropColumn` with `cascade: true` — a non-cascade drop already has its
/// single-line `DROP COLUMN` echo from the pre-execution `echo_for`. The
/// names of the indexes the rebuild removed come from
/// `DropColumnResult::dropped_indexes`.
fn build_drop_column_cascade_echo(
    command: &Command,
    submission_mode: crate::app::EffectiveMode,
    result: &DropColumnResult,
) -> Option<Vec<String>> {
    if !submission_mode.is_advanced() {
        return None;
    }
    let Command::DropColumn {
        table,
        column,
        cascade: true,
    } = command
    else {
        return None;
    };
    Some(crate::echo::render_drop_column_cascade(
        table,
        column,
        &result.dropped_indexes,
    ))
}
/// Gather schema-resolved facts about a failed DSL command (ADR-0019 §6).
///
/// Best-effort throughout: each lookup may fail on its own, and a missing
/// piece just leaves the matching `FailureContext` field `None`. The
/// translator falls back to catalog `{name}` placeholders for those.
///
/// Only `DbError::Sqlite` errors are enriched; the engine message is
/// classified case-insensitively, first match wins:
///
/// - **UNIQUE violation** (engine reports `T.col`): `table`, `column`,
///   the user's attempted `value`, and a `diagnostic_table` showing the
///   existing conflicting row.
/// - **NOT NULL violation** (engine reports `T.col`): `table` and
///   `column` only.
/// - **CHECK violation**: `table`, `column`, the supplied `value`, and
///   the column's CHECK rule.
/// - **FOREIGN KEY violation**: relationship lookups resolve the parent
///   side (child-side INSERT/UPDATE) or a referencing child table
///   (parent-side DELETE/UPDATE).
/// - Anything else: `FailureContext::default()`.
pub async fn enrich_dsl_failure(
    database: &Database,
    command: &Command,
    error: &DbError,
) -> crate::friendly::FailureContext {
    let message = match error {
        DbError::Sqlite { message, .. } => message,
        _ => return crate::friendly::FailureContext::default(),
    };

    let normalized = message.to_ascii_lowercase();
    if normalized.contains("unique constraint failed") {
        return enrich_unique_violation(database, command, message).await;
    }
    if normalized.contains("not null constraint failed") {
        return enrich_not_null_violation(command, message);
    }
    if normalized.contains("check constraint failed") {
        return enrich_check_violation(database, command, message).await;
    }
    if normalized.contains("foreign key constraint failed") {
        return enrich_fk_violation(database, command).await;
    }
    crate::friendly::FailureContext::default()
}
/// Collect facts for a `CHECK` violation (ADR-0029 §10).
///
/// The engine reports `CHECK constraint failed: <column>` — the
/// unqualified column the constraint sits on. That column is paired with
/// the command's target table, the value the user supplied for it, and
/// the column's compiled `CHECK` expression, so the friendly error can
/// name the rule that was broken.
///
/// Returns an empty context when the message has no `:` separator, or
/// when either the column or the table name comes out empty.
async fn enrich_check_violation(
    database: &Database,
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();

    let column = match message.split_once(':') {
        Some((_, tail)) => tail.trim(),
        None => return facts,
    };
    let table = command.target_table();
    if column.is_empty() || table.is_empty() {
        return facts;
    }
    facts.table = Some(table.to_string());
    facts.column = Some(column.to_string());

    // The value the user supplied for the constrained column.
    facts.value = user_value_for_column_with_schema(database, command, table, column)
        .await
        .map(|v| v.to_string());

    // The rule itself — the column's compiled CHECK expression. Left
    // untouched when the table or the column cannot be found.
    let constrained_column = database
        .describe_table(table.to_string(), None)
        .await
        .ok()
        .and_then(|desc| desc.columns.into_iter().find(|c| c.name == column));
    if let Some(col) = constrained_column {
        facts.check_rule = col.check;
    }

    facts
}
/// Collect facts for a `UNIQUE` violation.
///
/// The `T.col` target is parsed from the engine message; without it an
/// empty context is returned. Otherwise the context carries the table,
/// the column, the value the user attempted (when it can be resolved),
/// and — when that value is known and the lookup finds rows — a small
/// diagnostic table showing the existing row(s) it conflicts with.
async fn enrich_unique_violation(
    database: &Database,
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    let (table, column) = match parse_qualified_target(message) {
        Some(target) => target,
        None => return facts,
    };
    facts.table = Some(table.clone());
    facts.column = Some(column.clone());

    // Resolve the user's attempted value.
    let attempted = user_value_for_column_with_schema(database, command, &table, &column).await;
    facts.value = attempted.as_ref().map(ToString::to_string);
    let Some(value) = attempted else {
        return facts;
    };

    // Pinpoint the existing conflicting row, capped per ADR-0017 §7's
    // `DIAGNOSTIC_ROW_CAP` (a tighter cap is used here — a single
    // conflicting row is the typical case for UNIQUE, since the
    // constraint enforces it).
    let matches = database.find_rows_matching(table, column, value, 5).await;
    if let Ok(data) = matches
        && !data.rows.is_empty()
    {
        facts.diagnostic_table = Some(diagnostic_from_data_result(&data));
    }
    facts
}
/// Collect facts for a `NOT NULL` violation: just the table and column
/// parsed from the engine message. If the message cannot be parsed the
/// context is left empty.
fn enrich_not_null_violation(
    command: &Command,
    message: &str,
) -> crate::friendly::FailureContext {
    // The "attempted value" for NOT NULL is by definition null, so
    // resolving it would add no information — the catalog headline reads
    // "X cannot be null" and stands on its own. `command` is therefore
    // intentionally unused.
    let _ = command;

    let mut facts = crate::friendly::FailureContext::default();
    if let Some((table, column)) = parse_qualified_target(message) {
        facts.table = Some(table);
        facts.column = Some(column);
    }
    facts
}
/// Collect failure facts for a foreign-key violation.
///
/// * `INSERT` / `UPDATE` (child side): walks the table's outbound
///   relationships and reports the first one whose local column the
///   user supplied a value for: column, parent table, parent column
///   and the value. For `UPDATE` only, when no outbound relationship
///   matched, the first inbound relationship's table is reported as
///   `child_table` (the parent-side case).
/// * `DELETE` (parent side): reports the first inbound relationship's
///   table as `child_table`.
/// * Any other command, or a failed relationship read, yields a
///   context with nothing (or only what was set so far) filled in.
///
/// Both relationship directions are taken from a single
/// `read_relationships` call per invocation, so the UPDATE fallback
/// does not cost a second worker round-trip.
async fn enrich_fk_violation(
    database: &Database,
    command: &Command,
) -> crate::friendly::FailureContext {
    let mut facts = crate::friendly::FailureContext::default();
    match command {
        Command::Insert { table, .. } | Command::Update { table, .. } => {
            // Child-side: outbound FK lookup. Find the FK
            // column the user is setting / updating. Use the
            // schema-aware lookup so natural-order multi-value
            // INSERT (which `user_value_for_column` alone can't
            // resolve) gets handled too. The inbound half is kept
            // for the UPDATE fallback below.
            let Ok((outbound, inbound)) = database.read_relationships(table.clone()).await
            else {
                return facts;
            };
            facts.table = Some(table.clone());
            for rel in outbound {
                let value = user_value_for_column_with_schema(
                    database,
                    command,
                    table,
                    &rel.local_column,
                )
                .await;
                if let Some(v) = value {
                    facts.column = Some(rel.local_column);
                    facts.parent_table = Some(rel.other_table);
                    facts.parent_column = Some(rel.other_column);
                    facts.value = Some(v.to_string());
                    break;
                }
            }
            // For UPDATE, if no outbound match was found we may
            // be in the parent-side case (updating a column
            // children reference). Check inbound as a fallback.
            if facts.parent_table.is_none()
                && matches!(command, Command::Update { .. })
                && let Some(rel) = inbound.first()
            {
                facts.child_table = Some(rel.other_table.clone());
            }
        }
        Command::Delete { table, .. } => {
            // Parent-side: inbound FK lookup. Surface a child
            // table that still references the row(s) being
            // deleted.
            let Ok((_, inbound)) = database.read_relationships(table.clone()).await else {
                return facts;
            };
            facts.table = Some(table.clone());
            if let Some(rel) = inbound.first() {
                facts.child_table = Some(rel.other_table.clone());
            }
        }
        _ => {}
    }
    facts
}
/// Look up the value the user tried to write to `column`, using only
/// the originating [`Command`] (no schema access).
///
/// Resolves:
/// * DSL `Insert` with an explicit column list (by position in that list);
/// * DSL `Insert` without a column list when exactly one value was given;
/// * DSL `Update` (first assignment naming `column`);
/// * single-row SQL `INSERT` with an explicit column list;
/// * SQL `UPDATE` (first `SET` entry naming `column`).
///
/// Returns `None` for everything else, including natural-order
/// multi-value INSERT — that case needs a schema lookup, see
/// [`user_value_for_column_with_schema`].
fn user_value_for_column(command: &Command, column: &str) -> Option<crate::dsl::Value> {
    match command {
        // Explicit column list: the value sits at the column's position.
        Command::Insert {
            columns: Some(cols),
            values,
            ..
        } => {
            let idx = cols.iter().position(|c| c == column)?;
            values.get(idx).cloned()
        }
        // Single-value short form: the lone value is the answer.
        Command::Insert {
            columns: None,
            values,
            ..
        } if values.len() == 1 => values.first().cloned(),
        Command::Update { assignments, .. } => assignments
            .iter()
            .find_map(|(c, v)| (c == column).then(|| v.clone())),
        // ADR-0036 Phase 1: a single-row literal SQL INSERT retains its
        // captured literals, so a constraint error can name the real
        // value. Explicit column list resolves here (sync, no schema);
        // the natural-order (no-column-list) form is resolved in
        // `user_value_for_column_with_schema` (it needs the schema to map
        // position → column). Multi-row is skipped (which row conflicted
        // is ambiguous) — it degrades to the neutral "that value".
        Command::SqlInsert {
            listed_columns,
            literal_rows,
            ..
        } if literal_rows.len() == 1 && !listed_columns.is_empty() => {
            let idx = listed_columns.iter().position(|c| c == column)?;
            literal_rows[0].get(idx).cloned().flatten()
        }
        // ADR-0036 Phase 2: a SQL UPDATE retains its captured `SET`
        // literals, so a constraint error can name the real value.
        // Assignments are explicitly named, so (unlike SqlInsert) there is
        // no positional/multi-row ambiguity — mirror the DSL `Update` case.
        // Only the first entry naming the column is consulted, even when
        // its captured literal is absent.
        Command::SqlUpdate { set_literals, .. } => set_literals
            .iter()
            .find(|(c, _)| c == column)
            .and_then(|(_, v)| v.clone()),
        // Natural-order multi-value INSERT and every other command.
        _ => None,
    }
}
/// Schema-aware variant of [`user_value_for_column`].
///
/// Tries the command-only lookup first. When that finds nothing, two
/// positional forms are resolved by describing `table`:
///
/// * DSL `Insert` without a column list: values map onto the table's
///   columns with serial / shortid columns skipped (mirrors
///   `do_insert`'s position-mapping rule — the user does not supply
///   values for auto-generated columns in the short form).
/// * Single-row SQL `INSERT` without a column list: values map onto
///   ALL columns in declaration order.
///
/// Returns `None` when the describe fails, the column is unknown, or
/// the command is of any other shape.
async fn user_value_for_column_with_schema(
    database: &Database,
    command: &Command,
    table: &str,
    column: &str,
) -> Option<crate::dsl::Value> {
    if let Some(direct) = user_value_for_column(command, column) {
        return Some(direct);
    }
    match command {
        Command::Insert {
            columns: None,
            values,
            ..
        } => {
            let desc = database
                .describe_table(table.to_string(), None)
                .await
                .ok()?;
            // Build the natural-order column list the same way
            // `do_insert` does: filter out serial / shortid columns
            // because the engine auto-fills them and the user's
            // positional values map onto the remainder.
            let idx = desc
                .columns
                .iter()
                .filter(|c| {
                    !matches!(
                        c.user_type,
                        Some(crate::dsl::Type::Serial | crate::dsl::Type::ShortId)
                    )
                })
                .position(|c| c.name == column)?;
            values.get(idx).cloned()
        }
        // ADR-0036 Phase 1 follow-up: a no-column-list (natural-order) SQL
        // INSERT also names the offending value. Each VALUES position maps to
        // the schema's columns in declaration order — and unlike the DSL
        // `Insert` short form above, ALL columns are mapped (advanced-mode
        // Form B auto-fills nothing, so the user supplies a value for every
        // column; `do_sql_insert` validates against the same full mapping).
        // Single-row only — which row of a multi-row insert conflicted is
        // ambiguous, so it degrades to the neutral "that value".
        Command::SqlInsert {
            listed_columns,
            literal_rows,
            ..
        } if listed_columns.is_empty() && literal_rows.len() == 1 => {
            let desc = database
                .describe_table(table.to_string(), None)
                .await
                .ok()?;
            let idx = desc.columns.iter().position(|c| c.name == column)?;
            literal_rows[0].get(idx).cloned().flatten()
        }
        _ => None,
    }
}
/// Convert a `DataResult` into the `DiagnosticTable` consumed by the
/// friendly-error layer's bordered renderer (ADR-0019 §7, reusing
/// ADR-0017 §7's renderer).
///
/// Headers are copied as-is. Each column's alignment comes from
/// `numeric_alignment_for` when its type is known and is left-aligned
/// otherwise. Absent cells are rendered as the literal text `NULL`.
fn diagnostic_from_data_result(
    data: &DataResult,
) -> crate::friendly::DiagnosticTable {
    use crate::output_render::{numeric_alignment_for, Alignment};

    let alignments: Vec<Alignment> = data
        .column_types
        .iter()
        .map(|ty| match *ty {
            Some(known) => numeric_alignment_for(known),
            None => Alignment::Left,
        })
        .collect();

    let rows: Vec<Vec<String>> = data
        .rows
        .iter()
        .map(|row| {
            row.iter()
                .map(|cell| cell.as_deref().unwrap_or("NULL").to_string())
                .collect()
        })
        .collect();

    crate::friendly::DiagnosticTable {
        headers: data.columns.clone(),
        rows,
        alignments,
    }
}
/// Pull `(table, column)` out of a message whose tail is a qualified
/// target, e.g. `"UNIQUE constraint failed: T.col"`.
///
/// Everything after the first `:` is considered; when several
/// comma-separated targets follow, only the first is used. The target
/// is split at its first `.`; both halves are trimmed. Returns `None`
/// when there is no `:`, no `.`, or either half is empty.
///
/// Mirrors the helper in `friendly::translate` (the translator does
/// its own parse as a fallback when enrichment didn't run).
fn parse_qualified_target(message: &str) -> Option<(String, String)> {
    let (_, tail) = message.split_once(':')?;
    let first_target = tail.trim().split(',').next()?.trim();
    let (table, column) = first_target.split_once('.')?;
    let (table, column) = (table.trim(), column.trim());
    if table.is_empty() || column.is_empty() {
        return None;
    }
    Some((table.to_owned(), column.to_owned()))
}
/// Typed success value of one dispatched command, as produced by
/// `execute_command_typed`. Each variant wraps the result type of the
/// worker call that ran.
enum CommandOutcome {
/// A schema-affecting (or `show table`) command completed. Carries
/// the table description returned by the worker when there is one;
/// `None` when there is nothing to show (e.g. after a table drop).
Schema(Option<TableDescription>),
/// A SQL `CREATE TABLE IF NOT EXISTS` that matched an existing
/// table — a no-op (ADR-0035 §4). Carries the existing structure
/// so the App can render it alongside the "already exists —
/// skipped" note.
SchemaSkipped(TableDescription),
/// A SQL `DROP TABLE IF EXISTS` that matched no table — a no-op
/// (ADR-0035 §4, 4c). Carries no structure (there is none); the App
/// renders the "doesn't exist — skipped" note from the command.
SchemaDropSkipped,
/// A SQL `DROP INDEX IF EXISTS` that matched no index — a no-op
/// (ADR-0035 §4d). The App renders the "doesn't exist — skipped"
/// note from the command's index name.
SchemaDropIndexSkipped,
/// A SQL `CREATE INDEX IF NOT EXISTS` that matched an existing index
/// name — a no-op (ADR-0035 §4d). Carries the resolved index name
/// (the auto-name is unknown to the command) for the "already exists
/// — skipped" note.
SchemaCreateIndexSkipped(String),
/// Rows returned by `show data` or a SQL `SELECT`.
Query(DataResult),
/// The plan returned for an `EXPLAIN QUERY PLAN` command.
QueryPlan(QueryPlan),
/// Result of a DSL or SQL insert.
Insert(InsertResult),
/// Result of a DSL or SQL update.
Update(UpdateResult),
/// Result of a DSL or SQL delete.
Delete(DeleteResult),
/// Result of a column type change (DSL `change column` or SQL
/// `ALTER COLUMN … TYPE`).
ChangeColumn(ChangeColumnTypeResult),
/// Result of adding a column (DSL or SQL `ALTER TABLE … ADD COLUMN`).
AddColumn(AddColumnResult),
/// Result of dropping a column (DSL or SQL `ALTER TABLE … DROP COLUMN`).
DropColumn(DropColumnResult),
}
/// Run a replay script on a background task.
///
/// The task calls [`run_replay`] — which reads the script and
/// dispatches each runnable line through the same
/// `execute_command_typed` pipeline as interactive input — and then
/// forwards the resulting events onto `event_tx`.
///
/// `path` is the literal user-typed argument; relative paths are
/// resolved against `project_root`. On the first per-line failure
/// the task posts `AppEvent::ReplayFailed` carrying the line
/// number and stops — previously dispatched commands stay applied
/// (no rollback). On clean completion the task posts
/// `AppEvent::ReplayCompleted` with the count of commands that
/// were actually run (blanks/comments excluded).
///
/// If the receiver has gone away while events are being forwarded,
/// the task ends without the trailing refreshes.
///
/// The replay invocation itself is NOT written to `history.log`
/// (that happens at the App level, where Replay is dispatched as
/// `Action::Replay` rather than `Action::ExecuteDsl`); only the
/// individual sub-commands land there, since they are what
/// actually mutate state.
fn spawn_replay(
    database: Database,
    project_root: PathBuf,
    path: String,
    event_tx: mpsc::Sender<AppEvent>,
) {
    tokio::spawn(async move {
        let events = run_replay(&database, &project_root, &path).await;
        for event in events {
            let delivered = event_tx.send(event).await;
            if delivered.is_err() {
                return;
            }
        }

        // Refresh the table list once at the end — every command
        // dispatched through `execute_command_typed` may have
        // altered the schema, but we don't want to flicker the
        // panel mid-replay (and the table list is a derived view
        // anyway).
        let refreshed = database
            .list_tables()
            .await
            .map(AppEvent::TablesRefreshed);
        match refreshed {
            Err(e) => warn!(error = %e, "post-replay list_tables failed"),
            Ok(event) => {
                // A closed channel here is not worth reporting.
                let _ = event_tx.send(event).await;
            }
        }

        refresh_schema_cache(&database, &event_tx).await;
    });
}
/// Inner replay loop, separated from the spawn wrapper so tests
/// can exercise the file-iteration / parse / dispatch logic
/// without an mpsc channel and a spawned task.
///
/// Returns the `AppEvent`s that the spawn wrapper would emit. The
/// result always holds exactly one terminal event:
/// * `ReplayFailed` — the script could not be read (`line_number` 0),
///   a line failed to parse, or a command failed;
/// * `PersistenceFatal` — a command hit a persistence-fatal error;
/// * `ReplayCompleted` — every runnable line succeeded; carries the
///   number of commands run and any skip warnings.
pub async fn run_replay(
    database: &Database,
    project_root: &std::path::Path,
    path: &str,
) -> Vec<AppEvent> {
    let resolved = resolve_replay_path(project_root, path);
    let body = match tokio::fs::read_to_string(&resolved).await {
        Ok(text) => text,
        Err(e) => {
            return vec![AppEvent::ReplayFailed {
                path: path.to_string(),
                line_number: 0,
                command: String::new(),
                error: crate::t!(
                    "replay.error_could_not_open",
                    path = resolved.display(),
                    detail = e
                ),
            }];
        }
    };

    let mut executed: usize = 0;
    let mut warnings: Vec<String> = Vec::new();

    for (line_number, raw) in (1usize..).zip(body.lines()) {
        // Blank lines and `#` comments are never dispatched.
        let line = raw.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }

        // ADR-0034 §3: a journal record (`<ts>|<status>|<source>`)
        // contributes its extracted source when `ok` and is skipped
        // otherwise; any other line is a bare command run verbatim.
        // This is what makes `replay history.log` work without
        // breaking hand-written `.commands` scripts.
        let crate::persistence::ReplayLine::Run(command_text) =
            crate::persistence::classify_replay_line(line)
        else {
            continue;
        };

        // ADR-0034 Amendment 1: classify by entry word BEFORE parsing.
        // App-lifecycle commands and a nested `replay` are skipped
        // during replay — they are session navigation, not schema/data
        // reconstruction (and the worker dispatch cannot run them).
        // Classifying by the leading word handles the modal forms
        // (`save as` / `load` / `new`) and any incomplete app form
        // uniformly: ALL such skips continue, never aborting the
        // replay. `import` and a nested `replay` warn, because skipping
        // them can leave the replayed state incomplete (imported data /
        // the nested file's commands are not reconstructed); the rest
        // skip silently. Skipping a nested `replay` also closes the
        // infinite-loop footgun by construction.
        let entry = command_text
            .split_whitespace()
            .next()
            .map(str::to_ascii_lowercase)
            .unwrap_or_default();
        match entry.as_str() {
            "import" => {
                warnings.push(crate::t!(
                    "replay.skipped_import",
                    line = line_number,
                    command = command_text
                ));
                continue;
            }
            "replay" => {
                warnings.push(crate::t!(
                    "replay.skipped_replay",
                    line = line_number,
                    command = command_text
                ));
                continue;
            }
            word if is_app_lifecycle_entry_word(word) => continue,
            _ => {}
        }

        // A schema/data write (or read, or a genuine typo). Parse with
        // the live schema — re-snapshotted every line because earlier
        // replayed commands mutate it (so Phase D typed-slot rejections
        // fire at parse time, matching the interactive path). A parse
        // failure here is a genuine malformed command (not an app
        // command, which was skipped above) — report it with the line
        // number and stop.
        let schema = build_schema_cache(database).await;
        let parsed = crate::dsl::parser::parse_command_with_schema(&command_text, &schema);
        let command = match parsed {
            Ok(c) => c,
            Err(e) => {
                return vec![AppEvent::ReplayFailed {
                    path: path.to_string(),
                    line_number,
                    command: command_text.clone(),
                    error: crate::t!("replay.error_parse", detail = e),
                }];
            }
        };

        // Dispatch through the same path as interactive input so
        // per-command persistence (history.log, project.yaml,
        // CSVs) fires as if the user had typed each line. The
        // source re-journalled is the *extracted* command, not the
        // raw `<ts>|ok|…` record (ADR-0034 §3).
        // Retain a clone for failure enrichment (the command is moved into
        // dispatch). ADR-0035 Amendment 1, F2 follow-up.
        let command_for_ctx = command.clone();
        let failure =
            match execute_command_typed(database, command, command_text.clone()).await {
                Ok(_) => {
                    executed += 1;
                    continue;
                }
                Err(e) => e,
            };

        // Any failure ends the replay; only the kind of terminal event
        // differs.
        let terminal = match failure {
            // Persistence-fatal escalates through the existing
            // fatal-banner channel; the runtime tears down on
            // it, so further replay progress is moot.
            DbError::PersistenceFatal {
                operation,
                path: pf_path,
                message,
            } => AppEvent::PersistenceFatal {
                operation: operation.to_string(),
                path: pf_path,
                message,
            },
            // Enrich like the interactive path (ADR-0019 §6) so a
            // replayed failing command shows the real table/column/
            // value instead of a contextless, `{name}`-leaking message
            // (ADR-0035 Amendment 1, F2 follow-up). Verbose to match
            // the prior `friendly_message()` rendering.
            e => {
                let facts = enrich_dsl_failure(database, &command_for_ctx, &e).await;
                let ctx = crate::app::App::translate_context_for(
                    &command_for_ctx,
                    facts,
                    crate::friendly::Verbosity::default(),
                );
                AppEvent::ReplayFailed {
                    path: path.to_string(),
                    line_number,
                    command: command_text.clone(),
                    error: crate::friendly::translate_error(&e, &ctx).render(),
                }
            }
        };
        return vec![terminal];
    }

    vec![AppEvent::ReplayCompleted {
        path: path.to_string(),
        count: executed,
        warnings,
    }]
}
/// Reports whether `entry` (a lowercased leading command word) names
/// an app-lifecycle command that replay skips silently (ADR-0034
/// Amendment 1). The comparison is exact, so callers must lowercase
/// first.
///
/// `import` and `replay` are handled separately by the caller
/// (they warn); this is the silent-skip set. Mirrors the
/// `AppCommand` entry words (see `src/completion.rs`'s
/// `empty_input_offers_app_command_entry_keywords`); both must list
/// the same words. A new `AppCommand` must be added here so replay
/// skips it rather than aborting. `q` is intentionally absent: it is
/// not a recognised command, so a `q` line is a genuine error.
fn is_app_lifecycle_entry_word(entry: &str) -> bool {
    const SILENT_SKIP_WORDS: [&str; 11] = [
        "save", "load", "new", "export", "mode", "messages", "rebuild", "help", "quit",
        "undo", "redo",
    ];
    SILENT_SKIP_WORDS.contains(&entry)
}
/// Turn a `replay <path>` argument into the path to open.
///
/// An absolute `path` is returned unchanged. A relative one is joined
/// under `project_root`, so `replay history.log` works without
/// ceremony from inside any project.
fn resolve_replay_path(project_root: &std::path::Path, path: &str) -> PathBuf {
    let requested = std::path::Path::new(path);
    if requested.is_relative() {
        project_root.join(requested)
    } else {
        requested.to_path_buf()
    }
}
/// Execute a parsed user command and return either a typed
/// `CommandOutcome` or the raw `DbError`. Keeping the typed
/// error here lets us distinguish persistence-fatal failures
/// from ordinary user errors at dispatch time (ADR-0015 §8).
///
/// Each `Command` variant is forwarded to exactly one `Database`
/// call and that call's success value is wrapped in the matching
/// `CommandOutcome` variant. `source` (the command text as typed)
/// is passed along as `Some(source)` to every call that accepts a
/// source argument; `explain_query_plan` is the one call that does
/// not take it.
///
/// # Errors
///
/// Returns whatever `DbError` the underlying `Database` call
/// produced, unchanged.
///
/// # Panics
///
/// Panics (via `unreachable!`) when handed `Command::Replay` or
/// `Command::App`, which are expected to be routed elsewhere
/// before reaching this function.
async fn execute_command_typed(
database: &Database,
command: Command,
source: String,
) -> Result<CommandOutcome, DbError> {
// Wrapped once; moved into whichever single worker call the match selects.
let src = Some(source);
match command {
Command::CreateTable {
name,
columns,
primary_key,
} => database
.create_table(name, columns, primary_key, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::SqlCreateTable {
name,
columns,
primary_key,
unique_constraints,
check_constraints,
foreign_keys,
if_not_exists,
} => database
.sql_create_table(
name,
columns,
primary_key,
unique_constraints,
check_constraints,
foreign_keys,
if_not_exists,
src,
)
.await
.map(|outcome| match outcome {
CreateOutcome::Created(d) => CommandOutcome::Schema(Some(d)),
CreateOutcome::Skipped(d) => CommandOutcome::SchemaSkipped(d),
}),
// A dropped table leaves no description to carry, hence `Schema(None)`.
Command::DropTable { name } => database
.drop_table(name, src)
.await
.map(|()| CommandOutcome::Schema(None)),
Command::SqlDropTable { name, if_exists } => database
.sql_drop_table(name, if_exists, src)
.await
.map(|outcome| match outcome {
DropOutcome::Dropped => CommandOutcome::Schema(None),
DropOutcome::Skipped => CommandOutcome::SchemaDropSkipped,
}),
// The DSL form carries typed default/check values only, so the
// raw-SQL spec fields are left unset.
Command::AddColumn {
table,
column,
ty,
not_null,
unique,
default,
check,
} => database
.add_column(
table,
ColumnSpec {
name: column,
ty,
not_null,
unique,
default,
check,
default_sql: None,
check_sql: None,
},
src,
)
.await
.map(CommandOutcome::AddColumn),
Command::DropColumn {
table,
column,
cascade,
} => database
.drop_column(table, column, cascade, src)
.await
.map(CommandOutcome::DropColumn),
Command::RenameColumn { table, old, new } => database
.rename_column(table, old, new, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::ChangeColumnType {
table,
column,
ty,
mode,
} => database
.change_column_type(table, column, ty, mode, src)
.await
.map(CommandOutcome::ChangeColumn),
Command::AddRelationship {
name,
parent_table,
parent_column,
child_table,
child_column,
on_delete,
on_update,
create_fk,
} => database
.add_relationship(
name,
parent_table,
parent_column,
child_table,
child_column,
on_delete,
on_update,
create_fk,
src,
)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
// `drop_relationship` already yields the optional description that
// `Schema` carries, so it is wrapped directly.
Command::DropRelationship { selector } => database
.drop_relationship(selector, src)
.await
.map(CommandOutcome::Schema),
Command::AddIndex {
name,
table,
columns,
} => database
.add_index(name, table, columns, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::DropIndex { selector } => database
.drop_index(selector, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::SqlDropIndex { name, if_exists } => database
.sql_drop_index(name, if_exists, src)
.await
.map(|outcome| match outcome {
// Auto-show the now de-indexed table (ADR-0014), unlike
// SQL DROP TABLE whose table is gone.
DropIndexOutcome::Dropped(d) => CommandOutcome::Schema(Some(d)),
DropIndexOutcome::Skipped => CommandOutcome::SchemaDropIndexSkipped,
}),
Command::SqlCreateIndex {
name,
table,
columns,
unique,
if_not_exists,
} => database
.sql_create_index(name, table, columns, unique, if_not_exists, src)
.await
.map(|outcome| match outcome {
CreateIndexOutcome::Created(d) => CommandOutcome::Schema(Some(d)),
CreateIndexOutcome::Skipped(n) => CommandOutcome::SchemaCreateIndexSkipped(n),
}),
// `ALTER TABLE` (ADR-0035 §4e) decomposes to the existing column
// executors — each already one snapshot (one undo step) and
// journalled. No new worker layer; the outcomes reuse the
// simple-mode add/drop/rename column paths.
Command::SqlAlterTable { table, action } => match action {
AlterTableAction::AddColumn(spec) => database
.add_column(table, *spec, src)
.await
.map(CommandOutcome::AddColumn),
// cascade = false: an index-covered column is refused (no SQL
// `--cascade` spelling), matching SQLite + the simple default.
AlterTableAction::DropColumn { column } => database
.drop_column(table, column, false, src)
.await
.map(CommandOutcome::DropColumn),
AlterTableAction::RenameColumn { old, new } => database
.rename_column(table, old, new, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
// `ALTER COLUMN … TYPE` reuses the simple `change column`
// executor with `ForceConversion` — the ADR-0035 §7
// advanced policy (lossy converts with a note; no force
// flag; static-refused / incompatible still refuse). The
// ChangeColumn outcome surfaces the client-side lossy note,
// shared with simple mode.
AlterTableAction::AlterColumnType { column, ty } => database
.change_column_type(table, column, ty, ChangeColumnMode::ForceConversion, src)
.await
.map(CommandOutcome::ChangeColumn),
// ADR-0035 Amendment 2: ALTER COLUMN constraint gap-fill.
// SET/DROP NOT NULL and DROP DEFAULT reuse the ADR-0029
// executors; SET DEFAULT needs the raw-SQL path (sql_expr has
// no typed Value).
AlterTableAction::SetColumnNotNull { column } => database
.add_constraint(table, column, Constraint::NotNull, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
AlterTableAction::DropColumnNotNull { column } => database
.drop_constraint(table, column, ConstraintKind::NotNull, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
AlterTableAction::SetColumnDefault { column, default_sql } => database
.set_column_default(table, column, default_sql, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
AlterTableAction::DropColumnDefault { column } => database
.drop_constraint(table, column, ConstraintKind::Default, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
// `ADD [CONSTRAINT <name>] (CHECK | UNIQUE | FOREIGN KEY)`
// (ADR-0035 §4g) — each reuses an existing low-level executor
// (the FK via the relationship machinery `add 1:n
// relationship` uses); one undo step each.
AlterTableAction::AddTableConstraint { name, constraint } => match *constraint {
TableConstraint::Check { expr_sql } => database
.alter_add_table_check(table, name, expr_sql, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
// NOTE(review): unlike CHECK and FOREIGN KEY, `name` is not
// forwarded for UNIQUE — presumably the executor has no named
// form; confirm this is intentional.
TableConstraint::Unique { columns } => database
.alter_add_unique(table, columns, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
TableConstraint::ForeignKey(fk) => database
.alter_add_foreign_key(table, name, fk, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
},
// `DROP CONSTRAINT <name>` — a named table-CHECK or a named
// FK, resolved by the executor (ADR-0035 §4g).
AlterTableAction::DropConstraint { name } => database
.alter_drop_constraint(table, name, src)
.await
.map(CommandOutcome::Schema),
// `RENAME TO <new>` — the one genuinely new low-level op
// (ADR-0035 §6, 4h): native table rename + CSV + metadata
// reconciliation, one undo step.
AlterTableAction::RenameTable { new } => database
.rename_table(table, new, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
},
Command::AddConstraint {
table,
column,
constraint,
} => database
.add_constraint(table, column, constraint, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::DropConstraint {
table,
column,
kind,
} => database
.drop_constraint(table, column, kind, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
// `show table` is a plain describe; its description is reported
// through the same `Schema` outcome the schema changes use.
Command::ShowTable { name } => database
.describe_table(name, src)
.await
.map(|d| CommandOutcome::Schema(Some(d))),
Command::Insert {
table,
columns,
values,
} => database
.insert(table, columns, values, src)
.await
.map(CommandOutcome::Insert),
Command::Update {
table,
assignments,
filter,
} => database
.update(table, assignments, filter, src)
.await
.map(CommandOutcome::Update),
Command::Delete { table, filter } => database
.delete(table, filter, src)
.await
.map(CommandOutcome::Delete),
Command::ShowData {
name,
filter,
limit,
} => database
.query_data(name, filter, limit, src)
.await
.map(CommandOutcome::Query),
// A SQL `SELECT` (advanced mode; ADR-0030 §6, ADR-0031).
// The grammar walker has already validated `sql` is in
// the supported subset; the worker runs it as text.
Command::Select { sql } => database
.run_select(sql, src)
.await
.map(CommandOutcome::Query),
// A SQL `INSERT` (advanced mode; ADR-0033 §1). Grammar-as-
// text: the worker runs the validated `sql` and re-persists
// the parsed `target_table`'s CSV. Reuses the DSL insert
// outcome (affected-row count + auto-show).
Command::SqlInsert {
sql,
target_table,
listed_columns,
row_source,
returning,
literal_rows,
} => database
.run_sql_insert_with_literals(
sql,
src,
target_table,
listed_columns,
row_source,
returning,
literal_rows,
)
.await
.map(CommandOutcome::Insert),
// A SQL `UPDATE` (advanced mode; ADR-0033 §2). Grammar-as-
// text: the worker runs the validated `sql` and re-persists
// the parsed `target_table`'s CSV. Reuses the DSL update
// outcome (affected-row count).
Command::SqlUpdate {
sql,
target_table,
returning,
set_literals,
} => database
.run_sql_update_with_literals(sql, src, target_table, returning, set_literals)
.await
.map(CommandOutcome::Update),
// A SQL `DELETE` (advanced mode; ADR-0033 §1/§7). Grammar-
// as-text: the worker runs the validated `sql`, detects FK
// cascade by row-count diffing, and re-persists the target
// plus every cascade-affected child. Reuses the DSL delete
// outcome (affected-row count + per-relationship cascade
// summary).
Command::SqlDelete {
sql,
target_table,
returning,
} => database
.run_sql_delete(sql, src, target_table, returning)
.await
.map(CommandOutcome::Delete),
// `EXPLAIN QUERY PLAN` never executes the wrapped
// statement (ADR-0028 §2), so explaining a destructive
// command is safe. `src` is unused here — explain is a
// diagnostic and is not written to `history.log`.
Command::Explain { query } => database
.explain_query_plan(*query)
.await
.map(CommandOutcome::QueryPlan),
// `replay` is parsed as a DSL command but routed by
// App::dispatch_dsl as `Action::Replay` rather than
// `Action::ExecuteDsl`; it never reaches the worker
// thread. Hitting this arm would mean the dispatch
// routing was bypassed.
Command::Replay { .. } => unreachable!(
"Command::Replay is dispatched as Action::Replay; \
reaching execute_command_typed indicates a routing bug"
),
// App-lifecycle commands are dispatched in App, not by
// the database worker. Hitting this arm would mean the
// dispatch routing was bypassed.
Command::App(_) => unreachable!(
"Command::App is dispatched via App::dispatch_app_command; \
reaching execute_command_typed indicates a routing bug"
),
}
}
/// Spawn the task that forwards terminal input onto the app event
/// channel.
///
/// Key presses become `AppEvent::Key` and resizes become
/// `AppEvent::Resize`; every other crossterm event is dropped. The
/// task ends when the event stream finishes, when the stream yields an
/// error (which is logged), or when the receiving side of `tx` has
/// gone away.
fn spawn_event_reader(tx: mpsc::Sender<AppEvent>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut stream = EventStream::new();
        while let Some(next) = stream.next().await {
            let forwarded = match next {
                Ok(CtEvent::Key(key)) => AppEvent::Key(key),
                Ok(CtEvent::Resize(cols, rows)) => AppEvent::Resize { cols, rows },
                // Ignore other event kinds (paste, focus, mouse) for now.
                Ok(_) => continue,
                Err(e) => {
                    error!(error = %e, "crossterm event stream error");
                    break;
                }
            };
            // A closed channel means nobody is listening any more.
            if tx.send(forwarded).await.is_err() {
                break;
            }
        }
        debug!("event reader exiting");
    })
}
fn setup_terminal() -> Result<Terminal<CrosstermBackend<io::Stdout>>> {
enable_raw_mode().context("enable raw mode")?;
let mut stdout = io::stdout();
// Mouse capture is intentionally NOT enabled: it would prevent the
// host terminal's native text selection (the cost of capturing every
// mouse event), which we don't currently use for anything in-app.
// If we ever want click-to-select panes or scroll wheel handling,
// we'll need a different strategy than blanket capture.
execute!(stdout, EnterAlternateScreen).context("enter alternate screen")?;
let backend = CrosstermBackend::new(stdout);
let terminal = Terminal::new(backend).context("construct terminal")?;
Ok(terminal)
}
/// Restore the terminal to its pre-TUI state: leave raw mode, leave
/// the alternate screen and show the cursor again.
///
/// All three steps are always attempted, even when an earlier one
/// fails, so a single failure does not strand the user on the
/// alternate screen or with a hidden cursor.
///
/// # Errors
///
/// Returns the error (with context) of the first step that failed;
/// failures of later steps are dropped in that case.
fn teardown_terminal(
    terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
) -> Result<()> {
    let raw_mode = disable_raw_mode().context("disable raw mode");
    let screen = execute!(terminal.backend_mut(), LeaveAlternateScreen)
        .context("leave alternate screen");
    let cursor = terminal.show_cursor().context("show cursor");
    // `and` keeps the earliest error, matching what the caller would
    // have seen when the steps short-circuited.
    raw_mode.and(screen).and(cursor)
}
#[cfg(test)]
mod tests {
use super::IndicatorDebounce;
use crate::dsl::walker::Severity;
#[test]
fn starts_hidden_and_disarmed() {
let d = IndicatorDebounce::default();
assert_eq!(d.visible(), None);
assert!(!d.is_armed());
}
#[test]
fn a_keystroke_hides_the_indicator_and_arms_the_debounce() {
let mut d = IndicatorDebounce::default();
d.settle(Some(Severity::Error));
assert_eq!(d.visible(), Some(Severity::Error));
d.note_event(true);
assert_eq!(d.visible(), None, "a keystroke hides the indicator");
assert!(d.is_armed(), "a keystroke arms the debounce");
}
#[test]
fn settling_shows_the_verdict_and_disarms() {
let mut d = IndicatorDebounce::default();
d.note_event(true);
assert!(d.is_armed());
d.settle(Some(Severity::Warning));
assert_eq!(d.visible(), Some(Severity::Warning));
assert!(!d.is_armed(), "a settled indicator owes no recompute");
}
#[test]
fn settling_a_clean_verdict_shows_nothing() {
let mut d = IndicatorDebounce::default();
d.note_event(true);
d.settle(None);
assert_eq!(d.visible(), None);
assert!(!d.is_armed());
}
#[test]
fn a_non_key_event_does_not_disturb_a_shown_indicator() {
let mut d = IndicatorDebounce::default();
d.settle(Some(Severity::Error));
d.note_event(false);
assert_eq!(
d.visible(),
Some(Severity::Error),
"a non-key event leaves the indicator shown",
);
assert!(!d.is_armed(), "a non-key event does not arm the debounce");
}
#[test]
fn a_non_key_event_while_armed_keeps_the_debounce_armed() {
// A background event (a DSL result, a tables refresh)
// arriving mid-typing must not cancel the pending
// recompute.
let mut d = IndicatorDebounce::default();
d.note_event(true);
assert!(d.is_armed());
d.note_event(false);
assert!(d.is_armed(), "the owed recompute survives a non-key event");
assert_eq!(d.visible(), None, "and the indicator stays hidden");
}
#[test]
fn typing_resumes_after_a_settle() {
// The full cycle: type → settle → type again → settle.
let mut d = IndicatorDebounce::default();
d.note_event(true);
d.settle(Some(Severity::Warning));
assert_eq!(d.visible(), Some(Severity::Warning));
d.note_event(true);
assert_eq!(d.visible(), None, "new typing hides the indicator again");
assert!(d.is_armed());
d.settle(None);
assert_eq!(d.visible(), None);
assert!(!d.is_armed());
}
// --- ADR-0038: the `show data` teaching echo's primary-key sourcing ---
/// Drives `build_show_data_echo` against a live in-memory worker
/// (handoff §5 / ADR-0038 §4). Three cases are pinned for a table
/// whose primary key is `id`:
///
/// - limited `show data` in advanced mode echoes `ORDER BY id` ahead
///   of the `LIMIT`;
/// - the same command in simple mode produces no echo;
/// - unlimited `show data` in advanced mode echoes a bare `SELECT`
///   with no `ORDER BY`.
///
/// The pure renderer has its own unit tests in `echo`; this covers the
/// glue between the worker's schema and the rendered SQL.
#[tokio::test]
async fn show_data_echo_orders_by_resolved_primary_key_when_limited() {
    use crate::app::EffectiveMode;
    use crate::db::Database;
    use crate::dsl::Command;
    use crate::dsl::command::ColumnSpec;
    use crate::dsl::types::Type;

    // Fixture: a `Customers` table keyed on its serial `id` column.
    let worker = Database::open(":memory:").expect("open in-memory");
    let customer_columns = vec![
        ColumnSpec::new("id", Type::Serial),
        ColumnSpec::new("name", Type::Text),
    ];
    let customer_pk = vec!["id".to_string()];
    worker
        .create_table("Customers".to_string(), customer_columns, customer_pk, None)
        .await
        .expect("create table");

    // The two command shapes under test differ only in `limit`.
    let with_limit = Command::ShowData {
        name: "Customers".to_string(),
        filter: None,
        limit: Some(5),
    };
    let without_limit = Command::ShowData {
        name: "Customers".to_string(),
        filter: None,
        limit: None,
    };

    // Advanced mode + limit: the echo orders by the primary key.
    let limited_advanced =
        super::build_show_data_echo(&worker, &with_limit, EffectiveMode::AdvancedPersistent).await;
    assert_eq!(
        limited_advanced,
        Some(vec!["SELECT * FROM Customers ORDER BY id LIMIT 5".to_string()]),
    );

    // Simple mode: nothing is echoed for the very same command.
    let limited_simple =
        super::build_show_data_echo(&worker, &with_limit, EffectiveMode::Simple).await;
    assert_eq!(limited_simple, None);

    // Advanced mode, no limit: a plain SELECT without ORDER BY.
    let unlimited_advanced =
        super::build_show_data_echo(&worker, &without_limit, EffectiveMode::AdvancedPersistent)
            .await;
    assert_eq!(
        unlimited_advanced,
        Some(vec!["SELECT * FROM Customers".to_string()]),
    );
}
/// End-to-end cover for the Bucket B resolved-name echoes (ADR-0038
/// §7) against a real worker: `add`/`drop index` (auto-named) and
/// `add`/`drop relationship`. The pure renderers are unit-tested in
/// `echo`; this pins the runtime glue:
///
/// - `collect_echo_lookups` — called before a drop executes, while the
///   name being dropped can still be resolved (an index by its column
///   set; a relationship by its endpoints or by its name).
/// - `build_schema_echo` — for adds it is handed the description the
///   worker call returned; for drops it is handed the lookups that
///   were collected beforehand.
///
/// Statement order matters throughout: every lookup is taken against
/// the schema state left behind by the preceding worker calls.
#[tokio::test]
async fn bucket_b_resolved_name_echoes_against_real_worker() {
    use crate::app::EffectiveMode;
    use crate::db::Database;
    use crate::dsl::Command;
    use crate::dsl::ReferentialAction;
    use crate::dsl::command::{ColumnSpec, IndexSelector, RelationshipSelector};
    use crate::dsl::types::Type;

    // Fixture: a parent `Customers` table and a child `Orders` table.
    let db = Database::open(":memory:").expect("open in-memory");
    db.create_table(
        "Customers".to_string(),
        vec![
            ColumnSpec::new("id", Type::Serial),
            ColumnSpec::new("Email", Type::Text),
        ],
        vec!["id".to_string()],
        None,
    )
    .await
    .expect("create Customers");
    db.create_table(
        "Orders".to_string(),
        vec![
            ColumnSpec::new("id", Type::Serial),
            ColumnSpec::new("CustId", Type::Int),
        ],
        vec!["id".to_string()],
        None,
    )
    .await
    .expect("create Orders");

    // --- add index (auto-named) ----------------------------------
    let desc_after_add_index = db
        .add_index(None, "Customers".to_string(), vec!["Email".to_string()], None)
        .await
        .expect("add index");
    let add_idx_cmd = Command::AddIndex {
        name: None,
        table: "Customers".to_string(),
        columns: vec!["Email".to_string()],
    };
    assert_eq!(
        super::build_schema_echo(
            &add_idx_cmd,
            EffectiveMode::AdvancedPersistent,
            Some(&desc_after_add_index),
            &super::EchoLookups::default(),
        ),
        Some(vec![
            "CREATE INDEX Customers_Email_idx ON Customers (Email)".to_string()
        ]),
        "auto-named index resolved from post-exec description",
    );

    // --- drop index (positional) — pre-exec lookup ---------------
    let drop_idx_cmd = Command::DropIndex {
        selector: IndexSelector::Columns {
            table: "Customers".to_string(),
            columns: vec!["Email".to_string()],
        },
    };
    let drop_idx_lookups =
        super::collect_echo_lookups(&db, &drop_idx_cmd, EffectiveMode::AdvancedPersistent)
            .await;
    assert_eq!(
        drop_idx_lookups.drop_index_name.as_deref(),
        Some("Customers_Email_idx"),
        "drop-index pre-exec lookup finds the index by column set",
    );

    // Simple mode → no lookup. This must run while the index still
    // exists: the advanced-mode lookup just above resolved a name for
    // the same command, so an empty result here can only come from the
    // mode gate. After the drop below there would presumably be no
    // index left to find, and the check would pass in any mode.
    assert!(
        super::collect_echo_lookups(&db, &drop_idx_cmd, EffectiveMode::Simple)
            .await
            .drop_index_name
            .is_none(),
        "simple-mode gate skips the pre-exec describe",
    );

    // Execute the drop, then render the echo from the name captured
    // before execution.
    let desc_after_drop_idx = db
        .drop_index(
            IndexSelector::Columns {
                table: "Customers".to_string(),
                columns: vec!["Email".to_string()],
            },
            None,
        )
        .await
        .expect("drop index");
    assert_eq!(
        super::build_schema_echo(
            &drop_idx_cmd,
            EffectiveMode::AdvancedPersistent,
            Some(&desc_after_drop_idx),
            &drop_idx_lookups,
        ),
        Some(vec!["DROP INDEX Customers_Email_idx".to_string()]),
    );

    // --- add relationship (auto-named) ---------------------------
    let desc_after_add_rel = db
        .add_relationship(
            None,
            "Customers".to_string(),
            "id".to_string(),
            "Orders".to_string(),
            "CustId".to_string(),
            ReferentialAction::Cascade,
            ReferentialAction::NoAction,
            false,
            None,
        )
        .await
        .expect("add relationship");
    let add_rel_cmd = Command::AddRelationship {
        name: None,
        parent_table: "Customers".to_string(),
        parent_column: "id".to_string(),
        child_table: "Orders".to_string(),
        child_column: "CustId".to_string(),
        on_delete: ReferentialAction::Cascade,
        on_update: ReferentialAction::NoAction,
        create_fk: false,
    };
    assert_eq!(
        super::build_schema_echo(
            &add_rel_cmd,
            EffectiveMode::AdvancedPersistent,
            Some(&desc_after_add_rel),
            &super::EchoLookups::default(),
        ),
        Some(vec![
            "ALTER TABLE Orders ADD CONSTRAINT Customers_id_to_Orders_CustId FOREIGN KEY (CustId) REFERENCES Customers (id) ON DELETE CASCADE".to_string()
        ]),
        "auto-named relationship resolved from parent's inbound_relationships",
    );

    // --- drop relationship by endpoints — pre-exec lookup --------
    let drop_rel_endpoints = Command::DropRelationship {
        selector: RelationshipSelector::Endpoints {
            parent_table: "Customers".to_string(),
            parent_column: "id".to_string(),
            child_table: "Orders".to_string(),
            child_column: "CustId".to_string(),
        },
    };
    let endpoints_lookups = super::collect_echo_lookups(
        &db,
        &drop_rel_endpoints,
        EffectiveMode::AdvancedPersistent,
    )
    .await;
    assert_eq!(
        endpoints_lookups.drop_relationship,
        Some(("Customers_id_to_Orders_CustId".to_string(), "Orders".to_string())),
        "endpoints selector resolves name via child describe",
    );

    // --- drop relationship by name — child-table scan ------------
    let drop_rel_named = Command::DropRelationship {
        selector: RelationshipSelector::Named {
            name: "Customers_id_to_Orders_CustId".to_string(),
        },
    };
    let named_lookups =
        super::collect_echo_lookups(&db, &drop_rel_named, EffectiveMode::AdvancedPersistent)
            .await;
    assert_eq!(
        named_lookups.drop_relationship,
        Some(("Customers_id_to_Orders_CustId".to_string(), "Orders".to_string())),
        "named selector scans user tables to find the child",
    );

    // Either selector → same echo. The relationship itself is never
    // dropped here; only the lookups and the rendered echo are checked.
    for (cmd, lookups) in [
        (&drop_rel_endpoints, &endpoints_lookups),
        (&drop_rel_named, &named_lookups),
    ] {
        assert_eq!(
            super::build_schema_echo(
                cmd,
                EffectiveMode::AdvancedPersistent,
                None, // description not needed for drops
                lookups,
            ),
            Some(vec![
                "ALTER TABLE Orders DROP CONSTRAINT Customers_id_to_Orders_CustId".to_string()
            ]),
        );
    }
}
/// Drives the Bucket B multi-statement echoes (ADR-0038 §7 / §6
/// category 2) against a live in-memory worker, one fresh database per
/// scenario:
///
/// 1. `drop column --cascade` — the echo is built from the result of
///    the worker's `drop_column` call and lists the dropped index
///    before the column drop; simple mode yields no echo.
/// 2. `add relationship --create-fk` where the child column is
///    missing — the pre-execution lookup records the new column's
///    type (`int` for a `serial` parent key) and the echo is two lines.
/// 3. `add relationship --create-fk` where the child column already
///    exists — the lookup records that no column is created and the
///    echo is the single FK line.
#[tokio::test]
async fn bucket_b_multi_statement_echoes_against_real_worker() {
    use crate::app::EffectiveMode;
    use crate::db::Database;
    use crate::dsl::Command;
    use crate::dsl::ReferentialAction;
    use crate::dsl::command::ColumnSpec;
    use crate::dsl::types::Type;

    // The FK statement both `--create-fk` scenarios must end with.
    let fk_constraint_sql = "ALTER TABLE Orders ADD CONSTRAINT Customers_id_to_Orders_CustId FOREIGN KEY (CustId) REFERENCES Customers (id) ON DELETE CASCADE";

    // === Scenario 1: drop column --cascade =========================
    let cascade_db = Database::open(":memory:").expect("open in-memory");
    cascade_db
        .create_table(
            "Customers".to_string(),
            vec![
                ColumnSpec::new("id", Type::Serial),
                ColumnSpec::new("Email", Type::Text),
            ],
            vec!["id".to_string()],
            None,
        )
        .await
        .expect("create Customers");
    // An index on the doomed column gives the cascade something to drop.
    cascade_db
        .add_index(None, "Customers".to_string(), vec!["Email".to_string()], None)
        .await
        .expect("index Email");
    let cascade_cmd = Command::DropColumn {
        table: "Customers".to_string(),
        column: "Email".to_string(),
        cascade: true,
    };
    let cascade_outcome = cascade_db
        .drop_column("Customers".to_string(), "Email".to_string(), true, None)
        .await
        .expect("drop column --cascade");

    let cascade_echo_advanced = super::build_drop_column_cascade_echo(
        &cascade_cmd,
        EffectiveMode::AdvancedPersistent,
        &cascade_outcome,
    );
    assert_eq!(
        cascade_echo_advanced,
        Some(vec![
            "DROP INDEX Customers_Email_idx".to_string(),
            "ALTER TABLE Customers DROP COLUMN Email".to_string(),
        ]),
    );

    // Simple mode stays silent for the same command and result.
    let cascade_echo_simple = super::build_drop_column_cascade_echo(
        &cascade_cmd,
        EffectiveMode::Simple,
        &cascade_outcome,
    );
    assert!(cascade_echo_simple.is_none());

    // The command shared by both `--create-fk` scenarios.
    let create_fk_cmd = Command::AddRelationship {
        name: None,
        parent_table: "Customers".to_string(),
        parent_column: "id".to_string(),
        child_table: "Orders".to_string(),
        child_column: "CustId".to_string(),
        on_delete: ReferentialAction::Cascade,
        on_update: ReferentialAction::NoAction,
        create_fk: true,
    };

    // === Scenario 2: --create-fk, child column newly created ======
    let fresh_db = Database::open(":memory:").expect("open in-memory");
    fresh_db
        .create_table(
            "Customers".to_string(),
            vec![ColumnSpec::new("id", Type::Serial)],
            vec!["id".to_string()],
            None,
        )
        .await
        .expect("create Customers");
    // `Orders` deliberately lacks `CustId`; `--create-fk` adds it.
    fresh_db
        .create_table(
            "Orders".to_string(),
            vec![ColumnSpec::new("id", Type::Serial)],
            vec!["id".to_string()],
            None,
        )
        .await
        .expect("create Orders");

    // Lookup taken before execution: the parent key is `serial`, so
    // the child column to be created is typed `int`.
    let fresh_lookups =
        super::collect_echo_lookups(&fresh_db, &create_fk_cmd, EffectiveMode::AdvancedPersistent)
            .await;
    assert_eq!(
        fresh_lookups.add_rel_create_fk_new_column_type,
        Some(Some(Type::Int)),
        "pre-exec captures `serial → int` for the newly-created child column",
    );
    let fresh_parent_desc = fresh_db
        .add_relationship(
            None,
            "Customers".to_string(),
            "id".to_string(),
            "Orders".to_string(),
            "CustId".to_string(),
            ReferentialAction::Cascade,
            ReferentialAction::NoAction,
            true,
            None,
        )
        .await
        .expect("add --create-fk");
    let two_line_echo = super::build_schema_echo(
        &create_fk_cmd,
        EffectiveMode::AdvancedPersistent,
        Some(&fresh_parent_desc),
        &fresh_lookups,
    );
    assert_eq!(
        two_line_echo,
        Some(vec![
            "ALTER TABLE Orders ADD COLUMN CustId int".to_string(),
            fk_constraint_sql.to_string(),
        ]),
        "multi-line echo fires when the child column was newly created",
    );

    // === Scenario 3: --create-fk, child column already present ====
    let existing_db = Database::open(":memory:").expect("open in-memory");
    existing_db
        .create_table(
            "Customers".to_string(),
            vec![ColumnSpec::new("id", Type::Serial)],
            vec!["id".to_string()],
            None,
        )
        .await
        .expect("create Customers");
    existing_db
        .create_table(
            "Orders".to_string(),
            vec![
                ColumnSpec::new("id", Type::Serial),
                ColumnSpec::new("CustId", Type::Int),
            ],
            vec!["id".to_string()],
            None,
        )
        .await
        .expect("create Orders");

    // With the column already in place the lookup reports that
    // nothing new will be created.
    let existing_lookups = super::collect_echo_lookups(
        &existing_db,
        &create_fk_cmd,
        EffectiveMode::AdvancedPersistent,
    )
    .await;
    assert_eq!(
        existing_lookups.add_rel_create_fk_new_column_type,
        Some(None),
        "pre-exec records the child column already existed → single-line echo",
    );
    let existing_parent_desc = existing_db
        .add_relationship(
            None,
            "Customers".to_string(),
            "id".to_string(),
            "Orders".to_string(),
            "CustId".to_string(),
            ReferentialAction::Cascade,
            ReferentialAction::NoAction,
            true,
            None,
        )
        .await
        .expect("add --create-fk (existing column)");
    let one_line_echo = super::build_schema_echo(
        &create_fk_cmd,
        EffectiveMode::AdvancedPersistent,
        Some(&existing_parent_desc),
        &existing_lookups,
    );
    assert_eq!(
        one_line_echo,
        Some(vec![fk_constraint_sql.to_string()]),
        "single-line FK echo when the child column already existed",
    );
}
}