diff --git a/Cargo.lock b/Cargo.lock index 5529fe2..52634be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,6 +263,27 @@ dependencies = [ "phf", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "darling" version = "0.23.0" @@ -1327,8 +1348,10 @@ name = "rdbms-playground" version = "0.1.0" dependencies = [ "anyhow", + "base64", "chumsky", "crossterm", + "csv", "directories", "futures-util", "gethostname", diff --git a/Cargo.toml b/Cargo.toml index dd748f7..402e63c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,10 @@ publish = false [dependencies] anyhow = "1.0.102" +base64 = "0.22.1" chumsky = "0.13.0" crossterm = { version = "0.29.0", features = ["event-stream"] } +csv = "1.4.0" directories = "6.0.0" futures-util = "0.3.32" gethostname = "1.1.0" diff --git a/src/action.rs b/src/action.rs index 675fbca..a7ddede 100644 --- a/src/action.rs +++ b/src/action.rs @@ -13,8 +13,15 @@ use crate::dsl::Command; pub enum Action { /// Stop the event loop and exit cleanly. Quit, - /// Hand a parsed DSL command to the database worker. The - /// runtime executes it and feeds the result back as - /// `AppEvent::DslSucceeded` or `AppEvent::DslFailed`. - ExecuteDsl(Command), + /// Hand a parsed DSL command to the database worker. + /// + /// `command` is the parsed AST that the worker executes; + /// `source` is the original user-typed text, retained for + /// `history.log` per ADR-0015 §5. The runtime feeds the + /// result back as `AppEvent::DslSucceeded` / + /// `AppEvent::DslFailed`. + ExecuteDsl { + command: Command, + source: String, + }, } diff --git a/src/app.rs b/src/app.rs index 06c66da..ad9debd 100644 --- a/src/app.rs +++ b/src/app.rs @@ -106,6 +106,11 @@ pub struct App { /// during very-early startup before the runtime has opened a /// project; otherwise always populated. pub project_name: Option, + /// Set when a fatal persistence failure has occurred + /// (ADR-0015 §8). The runtime reads this after the event + /// loop exits and prints it to stderr post-teardown so the + /// banner remains above the shell prompt. + pub fatal_message: Option, } const PAGE_SCROLL_LINES: usize = 5; @@ -136,6 +141,7 @@ impl App { last_output_visible: 0, last_output_total_wrapped: 0, project_name: None, + fatal_message: None, } } @@ -208,6 +214,20 @@ impl App { self.tables = tables; Vec::new() } + AppEvent::PersistenceFatal { + operation, + path, + message, + } => { + let banner = format!( + "FATAL: failed to {operation} `{}` — {message}. \ + Quitting; investigate and restart.", + path.display(), + ); + self.note_error(banner.clone()); + self.fatal_message = Some(banner); + vec![Action::Quit] + } } } @@ -469,7 +489,10 @@ impl App { kind: OutputKind::Echo, mode_at_submission: submission_mode, }); - vec![Action::ExecuteDsl(cmd)] + vec![Action::ExecuteDsl { + command: cmd, + source: input.to_string(), + }] } Err(ParseError::Empty) => Vec::new(), Err(err) => { @@ -820,16 +843,20 @@ mod tests { let mut app = App::new(); type_str(&mut app, "create table Customers with pk"); let actions = submit(&mut app); + assert_eq!(actions.len(), 1); + let Action::ExecuteDsl { command, .. } = &actions[0] else { + panic!("expected ExecuteDsl, got {:?}", actions[0]); + }; assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::CreateTable { + command, + &Command::CreateTable { name: "Customers".to_string(), columns: vec![crate::dsl::ColumnSpec { name: "id".to_string(), ty: Type::Serial, }], primary_key: vec!["id".to_string()], - })] + }, ); // The input is echoed back as a "running:" notice so the // user sees something happened while the DB worker runs. @@ -1071,11 +1098,15 @@ mod tests { let mut app = App::new(); type_str(&mut app, "drop table T"); let actions = submit(&mut app); + assert_eq!(actions.len(), 1); + let Action::ExecuteDsl { command, .. } = &actions[0] else { + panic!("expected ExecuteDsl, got {:?}", actions[0]); + }; assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::DropTable { - name: "T".to_string() - })] + command, + &Command::DropTable { + name: "T".to_string(), + }, ); } @@ -1425,13 +1456,17 @@ mod tests { let mut app = App::new(); type_str(&mut app, "add column to table T: Name (text)"); let actions = submit(&mut app); + assert_eq!(actions.len(), 1); + let Action::ExecuteDsl { command, .. } = &actions[0] else { + panic!("expected ExecuteDsl, got {:?}", actions[0]); + }; assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::AddColumn { + command, + &Command::AddColumn { table: "T".to_string(), column: "Name".to_string(), ty: Type::Text, - })] + }, ); } } diff --git a/src/db.rs b/src/db.rs index ab1d6aa..28013e2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -37,6 +37,10 @@ use crate::dsl::ColumnSpec; use crate::dsl::shortid; use crate::dsl::types::Type; use crate::dsl::value::{Bound, Value, ValueError}; +use crate::persistence::{ + CellValue, ColumnSchema, Persistence, PersistenceError, RelationshipSchema, SchemaSnapshot, + TableSchema, TableSnapshot, +}; /// Inbox capacity. The worker is fast enough that this rarely /// matters; `64` is a generous head-room for bursts. @@ -107,6 +111,12 @@ pub enum DbError { Unsupported(String), #[error("invalid value: {0}")] InvalidValue(String), + #[error("could not {operation} `{path}`: {message}")] + PersistenceFatal { + operation: &'static str, + path: std::path::PathBuf, + message: String, + }, #[error("database worker is no longer available")] WorkerGone, #[error("io error: {0}")] @@ -189,6 +199,22 @@ impl DbError { let kind = classify_sqlite_error(&err, &message); Self::Sqlite { message, kind } } + + fn from_persistence(err: PersistenceError) -> Self { + Self::PersistenceFatal { + operation: err.operation(), + path: err.path().to_path_buf(), + message: err.to_string(), + } + } + + /// Whether this error means the application cannot + /// continue and must quit (per ADR-0015 §8). The runtime + /// surfaces these as fatal banners. + #[must_use] + pub const fn is_fatal(&self) -> bool { + matches!(self, Self::PersistenceFatal { .. }) + } } fn classify_sqlite_error(err: &rusqlite::Error, message: &str) -> SqliteErrorKind { @@ -218,16 +244,19 @@ enum Request { name: String, columns: Vec, primary_key: Vec, + source: Option, reply: oneshot::Sender>, }, DropTable { name: String, + source: Option, reply: oneshot::Sender>, }, AddColumn { table: String, column: String, ty: Type, + source: Option, reply: oneshot::Sender>, }, ListTables { @@ -235,6 +264,7 @@ enum Request { }, DescribeTable { name: String, + source: Option, reply: oneshot::Sender>, }, AddRelationship { @@ -246,40 +276,70 @@ enum Request { on_delete: ReferentialAction, on_update: ReferentialAction, create_fk: bool, + source: Option, reply: oneshot::Sender>, }, DropRelationship { selector: RelationshipSelector, + source: Option, reply: oneshot::Sender, DbError>>, }, Insert { table: String, columns: Option>, values: Vec, + source: Option, reply: oneshot::Sender>, }, Update { table: String, assignments: Vec<(String, Value)>, filter: RowFilter, + source: Option, reply: oneshot::Sender>, }, Delete { table: String, filter: RowFilter, + source: Option, reply: oneshot::Sender>, }, QueryData { table: String, + source: Option, reply: oneshot::Sender>, }, } impl Database { - /// Open a database. The path may be a filesystem location - /// or `":memory:"` for an ephemeral in-memory database. The - /// connection is moved onto a dedicated worker thread. + /// Open a database without per-command persistence. + /// + /// The path may be a filesystem location or `":memory:"` + /// for an ephemeral in-memory database. The connection is + /// moved onto a dedicated worker thread. With no + /// persistence handle, the YAML/CSV/`history.log` writes + /// are skipped — useful for unit tests that exercise the + /// SQLite layer in isolation. pub fn open>(path: P) -> Result { + Self::open_inner(path, None) + } + + /// Open a database with per-command persistence wired in + /// (ADR-0015 §6). Every successful user-issued mutation + /// writes through to `project.yaml`, the affected + /// `data/.csv` files, and `history.log` *before* + /// the SQLite tx commits. + pub fn open_with_persistence>( + path: P, + persistence: Persistence, + ) -> Result { + Self::open_inner(path, Some(persistence)) + } + + fn open_inner>( + path: P, + persistence: Option, + ) -> Result { let path_display = path.as_ref().to_string_lossy().into_owned(); let conn = match path.as_ref().to_str() { Some(":memory:") => Connection::open_in_memory(), @@ -292,7 +352,7 @@ impl Database { let (tx, rx) = mpsc::channel::(REQUEST_CHANNEL_CAPACITY); thread::Builder::new() .name("rdbms-db-worker".to_string()) - .spawn(move || worker_loop(conn, rx)) + .spawn(move || worker_loop(conn, persistence, rx)) .map_err(|e| DbError::Io(e.to_string()))?; Ok(Self { inbox: tx }) @@ -303,21 +363,23 @@ impl Database { name: String, columns: Vec, primary_key: Vec, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::CreateTable { name, columns, primary_key, + source, reply, }) .await?; recv.await.map_err(|_| DbError::WorkerGone)? } - pub async fn drop_table(&self, name: String) -> Result<(), DbError> { + pub async fn drop_table(&self, name: String, source: Option) -> Result<(), DbError> { let (reply, recv) = oneshot::channel(); - self.send(Request::DropTable { name, reply }).await?; + self.send(Request::DropTable { name, source, reply }).await?; recv.await.map_err(|_| DbError::WorkerGone)? } @@ -326,12 +388,14 @@ impl Database { table: String, column: String, ty: Type, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::AddColumn { table, column, ty, + source, reply, }) .await?; @@ -344,9 +408,18 @@ impl Database { recv.await.map_err(|_| DbError::WorkerGone)? } - pub async fn describe_table(&self, name: String) -> Result { + pub async fn describe_table( + &self, + name: String, + source: Option, + ) -> Result { let (reply, recv) = oneshot::channel(); - self.send(Request::DescribeTable { name, reply }).await?; + self.send(Request::DescribeTable { + name, + source, + reply, + }) + .await?; recv.await.map_err(|_| DbError::WorkerGone)? } @@ -361,6 +434,7 @@ impl Database { on_delete: ReferentialAction, on_update: ReferentialAction, create_fk: bool, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::AddRelationship { @@ -372,6 +446,7 @@ impl Database { on_delete, on_update, create_fk, + source, reply, }) .await?; @@ -381,10 +456,15 @@ impl Database { pub async fn drop_relationship( &self, selector: RelationshipSelector, + source: Option, ) -> Result, DbError> { let (reply, recv) = oneshot::channel(); - self.send(Request::DropRelationship { selector, reply }) - .await?; + self.send(Request::DropRelationship { + selector, + source, + reply, + }) + .await?; recv.await.map_err(|_| DbError::WorkerGone)? } @@ -393,12 +473,14 @@ impl Database { table: String, columns: Option>, values: Vec, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::Insert { table, columns, values, + source, reply, }) .await?; @@ -410,12 +492,14 @@ impl Database { table: String, assignments: Vec<(String, Value)>, filter: RowFilter, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::Update { table, assignments, filter, + source, reply, }) .await?; @@ -426,20 +510,31 @@ impl Database { &self, table: String, filter: RowFilter, + source: Option, ) -> Result { let (reply, recv) = oneshot::channel(); self.send(Request::Delete { table, filter, + source, reply, }) .await?; recv.await.map_err(|_| DbError::WorkerGone)? } - pub async fn query_data(&self, table: String) -> Result { + pub async fn query_data( + &self, + table: String, + source: Option, + ) -> Result { let (reply, recv) = oneshot::channel(); - self.send(Request::QueryData { table, reply }).await?; + self.send(Request::QueryData { + table, + source, + reply, + }) + .await?; recv.await.map_err(|_| DbError::WorkerGone)? } @@ -457,6 +552,11 @@ impl Database { /// makes round-trip rendering work today. const META_TABLE: &str = "__rdbms_playground_columns"; const REL_TABLE: &str = "__rdbms_playground_relationships"; +/// Single-row key/value table that carries project metadata +/// the YAML serializer needs to round-trip — currently +/// `created_at`. Created on first connect and only ever +/// written by us; the user never touches it directly. +const META_PROJECT_TABLE: &str = "__rdbms_playground_meta"; fn configure_connection(conn: &Connection) -> Result<(), rusqlite::Error> { conn.execute_batch(&format!( @@ -476,45 +576,118 @@ fn configure_connection(conn: &Connection) -> Result<(), rusqlite::Error> { on_delete TEXT NOT NULL,\n\ on_update TEXT NOT NULL,\n\ PRIMARY KEY (child_table, child_column)\n\ + ) STRICT;\n\ + CREATE TABLE IF NOT EXISTS {META_PROJECT_TABLE} (\n\ + key TEXT NOT NULL PRIMARY KEY,\n\ + value TEXT NOT NULL\n\ ) STRICT;" ))?; + // Seed `created_at` once. INSERT OR IGNORE keeps the value + // stable across re-opens of the same database. + conn.execute( + &format!( + "INSERT OR IGNORE INTO {META_PROJECT_TABLE} (key, value) VALUES ('created_at', ?1)" + ), + [iso8601_now()], + )?; Ok(()) } -fn worker_loop(conn: Connection, mut rx: mpsc::Receiver) { +/// Current UTC time as ISO-8601 with second precision. +/// Duplicates the helper in `persistence::history` rather +/// than depending on it, since this code path runs at +/// connection setup before the rest of the worker is up. +fn iso8601_now() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + let day_secs = secs.rem_euclid(86_400); + let h = day_secs / 3600; + let m = (day_secs % 3600) / 60; + let s = day_secs % 60; + let days = secs.div_euclid(86_400); + let z = days + 719_468; + let era = if z >= 0 { z } else { z - 146_096 } / 146_097; + let doe = (z - era * 146_097) as u64; + let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; + let y = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d = doy - (153 * mp + 2) / 5 + 1; + let m_cal = if mp < 10 { mp + 3 } else { mp - 9 }; + let y_cal = if m_cal <= 2 { y + 1 } else { y }; + format!("{y_cal:04}-{m_cal:02}-{d:02}T{h:02}:{m:02}:{s:02}Z") +} + +fn worker_loop( + conn: Connection, + persistence: Option, + mut rx: mpsc::Receiver, +) { debug!("db worker started"); while let Some(req) = rx.blocking_recv() { - handle_request(&conn, req); + handle_request(&conn, persistence.as_ref(), req); } debug!("db worker exiting"); } -fn handle_request(conn: &Connection, req: Request) { +fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Request) { match req { Request::CreateTable { name, columns, primary_key, + source, reply, } => { - let _ = reply.send(do_create_table(conn, &name, &columns, &primary_key)); + let _ = reply.send(do_create_table( + conn, + persistence, + source.as_deref(), + &name, + &columns, + &primary_key, + )); } - Request::DropTable { name, reply } => { - let _ = reply.send(do_drop_table(conn, &name)); + Request::DropTable { + name, + source, + reply, + } => { + let _ = reply.send(do_drop_table(conn, persistence, source.as_deref(), &name)); } Request::AddColumn { table, column, ty, + source, reply, } => { - let _ = reply.send(do_add_column(conn, &table, &column, ty)); + let _ = reply.send(do_add_column( + conn, + persistence, + source.as_deref(), + &table, + &column, + ty, + )); } Request::ListTables { reply } => { let _ = reply.send(do_list_tables(conn)); } - Request::DescribeTable { name, reply } => { - let _ = reply.send(do_describe_table(conn, &name)); + Request::DescribeTable { + name, + source, + reply, + } => { + let _ = reply.send(do_describe_table_request( + conn, + persistence, + source.as_deref(), + &name, + )); } Request::AddRelationship { name, @@ -525,10 +698,13 @@ fn handle_request(conn: &Connection, req: Request) { on_delete, on_update, create_fk, + source, reply, } => { let _ = reply.send(do_add_relationship( conn, + persistence, + source.as_deref(), name.as_deref(), &parent_table, &parent_column, @@ -539,38 +715,285 @@ fn handle_request(conn: &Connection, req: Request) { create_fk, )); } - Request::DropRelationship { selector, reply } => { - let _ = reply.send(do_drop_relationship(conn, &selector)); + Request::DropRelationship { + selector, + source, + reply, + } => { + let _ = reply.send(do_drop_relationship( + conn, + persistence, + source.as_deref(), + &selector, + )); } Request::Insert { table, columns, values, + source, reply, } => { - let _ = reply.send(do_insert(conn, &table, columns.as_deref(), &values)); + let _ = reply.send(do_insert( + conn, + persistence, + source.as_deref(), + &table, + columns.as_deref(), + &values, + )); } Request::Update { table, assignments, filter, + source, reply, } => { - let _ = reply.send(do_update(conn, &table, &assignments, &filter)); + let _ = reply.send(do_update( + conn, + persistence, + source.as_deref(), + &table, + &assignments, + &filter, + )); } Request::Delete { table, filter, + source, reply, } => { - let _ = reply.send(do_delete(conn, &table, &filter)); + let _ = reply.send(do_delete( + conn, + persistence, + source.as_deref(), + &table, + &filter, + )); } - Request::QueryData { table, reply } => { - let _ = reply.send(do_query_data(conn, &table)); + Request::QueryData { + table, + source, + reply, + } => { + let _ = reply.send(do_query_data_request( + conn, + persistence, + source.as_deref(), + &table, + )); } } } +/// Set of changes a mutation made, used by the post-mutation +/// persistence phase to know which text targets need refreshing +/// (ADR-0015 §6). +#[derive(Debug, Default)] +struct Changes { + /// Schema (tables / columns / relationships) was modified — + /// `project.yaml` needs to be rewritten. + schema_dirty: bool, + /// Tables whose row data changed and whose CSVs need to be + /// re-emitted from the post-mutation state. + rewritten_tables: Vec, + /// Tables that were dropped — their CSVs should be removed. + deleted_tables: Vec, +} + +/// Drive the post-mutation persistence phase: write the YAML +/// schema, rewrite affected CSVs, append `history.log`. Called +/// before `tx.commit()` so a failure here causes the SQLite +/// transaction to roll back automatically (the `Drop` impl on +/// `Transaction` rolls back on drop). +/// +/// Read-only requests (no schema change, no row writes, no +/// drops) still use this to append `history.log` if `source` +/// is set; they pass an empty `Changes`. +fn finalize_persistence( + conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, + changes: &Changes, +) -> Result<(), DbError> { + let Some(p) = persistence else { + return Ok(()); + }; + if changes.schema_dirty { + let schema = read_schema_snapshot(conn)?; + p.write_schema(&schema).map_err(DbError::from_persistence)?; + } + for table in &changes.rewritten_tables { + if let Some(snapshot) = read_table_snapshot(conn, table)? { + p.write_table_data(&snapshot) + .map_err(DbError::from_persistence)?; + } + } + for table in &changes.deleted_tables { + p.delete_table_data(table) + .map_err(DbError::from_persistence)?; + } + if let Some(text) = source { + p.append_history(text) + .map_err(DbError::from_persistence)?; + } + Ok(()) +} + +/// Read the full user-facing schema (tables, columns, +/// relationships, project metadata) from the database. Reads +/// committed *or* in-tx state because SQLite presents the +/// same connection's writes back through the same connection. +fn read_schema_snapshot(conn: &Connection) -> Result { + let table_names = do_list_tables(conn)?; + let mut tables: Vec = Vec::with_capacity(table_names.len()); + for name in &table_names { + let read = read_schema(conn, name)?; + let columns: Vec = read + .columns + .iter() + .map(|c| ColumnSchema { + name: c.name.clone(), + // user_type is always populated for tables we + // created; the fallback is defensive. + user_type: c.user_type.unwrap_or(Type::Text), + }) + .collect(); + tables.push(TableSchema { + name: name.clone(), + primary_key: read.primary_key.clone(), + columns, + }); + } + + let relationships = read_all_relationships(conn)?; + let created_at = read_project_created_at(conn)?; + Ok(SchemaSnapshot { + created_at, + tables, + relationships, + }) +} + +fn read_all_relationships(conn: &Connection) -> Result, DbError> { + let mut stmt = conn + .prepare(&format!( + "SELECT name, parent_table, parent_column, child_table, child_column, \ + on_delete, on_update \ + FROM {REL_TABLE} \ + ORDER BY rowid" + )) + .map_err(DbError::from_rusqlite)?; + let rows = stmt + .query_map([], |row| { + Ok(RelationshipSchema { + name: row.get(0)?, + parent_table: row.get(1)?, + parent_column: row.get(2)?, + child_table: row.get(3)?, + child_column: row.get(4)?, + on_delete: parse_action_from_sqlite(row.get::<_, String>(5)?.as_str()), + on_update: parse_action_from_sqlite(row.get::<_, String>(6)?.as_str()), + }) + }) + .map_err(DbError::from_rusqlite)?; + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(DbError::from_rusqlite)?); + } + Ok(out) +} + +fn read_project_created_at(conn: &Connection) -> Result { + let value: Option = conn + .query_row( + &format!("SELECT value FROM {META_PROJECT_TABLE} WHERE key = 'created_at'"), + [], + |row| row.get(0), + ) + .or_else(|e| match e { + rusqlite::Error::QueryReturnedNoRows => Ok(None), + other => Err(other), + }) + .map_err(DbError::from_rusqlite)?; + Ok(value.unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string())) +} + +/// Read a single table's full row data, returning `None` if +/// the table no longer exists (e.g. a recent `drop_table`). +fn read_table_snapshot( + conn: &Connection, + table: &str, +) -> Result, DbError> { + if !user_table_exists(conn, table)? { + return Ok(None); + } + let read = read_schema(conn, table)?; + let columns: Vec = read + .columns + .iter() + .map(|c| ColumnSchema { + name: c.name.clone(), + user_type: c.user_type.unwrap_or(Type::Text), + }) + .collect(); + let column_idents: Vec = read + .columns + .iter() + .map(|c| quote_ident(&c.name)) + .collect(); + let sql = format!( + "SELECT {} FROM {} ORDER BY rowid", + column_idents.join(", "), + quote_ident(table), + ); + let mut stmt = conn.prepare(&sql).map_err(DbError::from_rusqlite)?; + let column_count = read.columns.len(); + let mut rows: Vec> = Vec::new(); + let mut iter = stmt.query([]).map_err(DbError::from_rusqlite)?; + while let Some(row) = iter.next().map_err(DbError::from_rusqlite)? { + let mut record: Vec = Vec::with_capacity(column_count); + for i in 0..column_count { + record.push(row_value_to_cell(row, i)?); + } + rows.push(record); + } + Ok(Some(TableSnapshot { + name: table.to_string(), + columns, + rows, + })) +} + +fn user_table_exists(conn: &Connection, table: &str) -> Result { + let count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM sqlite_schema \ + WHERE type = 'table' AND name = ?1 \ + AND substr(name, 1, 8) != '__rdbms_'", + [table], + |row| row.get(0), + ) + .map_err(DbError::from_rusqlite)?; + Ok(count > 0) +} + +fn row_value_to_cell(row: &rusqlite::Row<'_>, idx: usize) -> Result { + use rusqlite::types::ValueRef; + let v = row.get_ref(idx).map_err(DbError::from_rusqlite)?; + Ok(match v { + ValueRef::Null => CellValue::Null, + ValueRef::Integer(n) => CellValue::Integer(n), + ValueRef::Real(f) => CellValue::Real(f), + ValueRef::Text(bytes) => { + CellValue::Text(String::from_utf8_lossy(bytes).into_owned()) + } + ValueRef::Blob(bytes) => CellValue::Blob(bytes.to_vec()), + }) +} + /// Quote an identifier for safe inclusion in DDL. Doubles any /// embedded double-quotes per SQL convention. fn quote_ident(name: &str) -> String { @@ -589,6 +1012,8 @@ fn quote_ident(name: &str) -> String { fn do_create_table( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, name: &str, columns: &[ColumnSpec], primary_key: &[String], @@ -660,11 +1085,23 @@ fn do_create_table( .map_err(DbError::from_rusqlite)?; } } + let description = do_describe_table(conn, name)?; + let changes = Changes { + schema_dirty: true, + rewritten_tables: vec![name.to_string()], + ..Changes::default() + }; + finalize_persistence(conn, persistence, source, &changes)?; tx.commit().map_err(DbError::from_rusqlite)?; - do_describe_table(conn, name) + Ok(description) } -fn do_drop_table(conn: &Connection, name: &str) -> Result<(), DbError> { +fn do_drop_table( + conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, + name: &str, +) -> Result<(), DbError> { // Refuse the drop while any *other* table still has a // relationship pointing at this one — dropping the parent // would leave dangling FK constraints in the children. The @@ -697,12 +1134,20 @@ fn do_drop_table(conn: &Connection, name: &str) -> Result<(), DbError> { [name], ) .map_err(DbError::from_rusqlite)?; + let changes = Changes { + schema_dirty: true, + rewritten_tables: Vec::new(), + deleted_tables: vec![name.to_string()], + }; + finalize_persistence(conn, persistence, source, &changes)?; tx.commit().map_err(DbError::from_rusqlite)?; Ok(()) } fn do_add_column( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, table: &str, column: &str, ty: Type, @@ -738,8 +1183,15 @@ fn do_add_column( [table, column, ty.keyword()], ) .map_err(DbError::from_rusqlite)?; + let description = do_describe_table(conn, table)?; + let changes = Changes { + schema_dirty: true, + rewritten_tables: vec![table.to_string()], + ..Changes::default() + }; + finalize_persistence(conn, persistence, source, &changes)?; tx.commit().map_err(DbError::from_rusqlite)?; - do_describe_table(conn, table) + Ok(description) } fn do_list_tables(conn: &Connection) -> Result, DbError> { @@ -1036,6 +1488,8 @@ where #[allow(clippy::too_many_arguments)] fn do_add_relationship( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, name: Option<&str>, parent_table: &str, parent_column: &str, @@ -1178,6 +1632,18 @@ fn do_add_relationship( ], ) .map_err(DbError::from_rusqlite)?; + // Persistence runs inside the same tx so a write + // failure rolls back both the schema and the metadata + // (commit-db-last per ADR-0015 §6). + let changes = Changes { + schema_dirty: true, + // The child table was rebuilt — its CSV needs to + // be re-emitted from the new state too, in case + // the FK column was newly created. + rewritten_tables: vec![child_table.to_string()], + ..Changes::default() + }; + finalize_persistence(tx, persistence, source, &changes)?; Ok(()) })?; @@ -1190,6 +1656,8 @@ fn do_add_relationship( fn do_drop_relationship( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, selector: &RelationshipSelector, ) -> Result, DbError> { // Resolve to a single relationship row. @@ -1233,12 +1701,19 @@ fn do_drop_relationship( let mut new_schema = old_schema.clone(); new_schema.foreign_keys.retain(|fk| fk.child_column != child_column); + let child_table_for_persist = child_table.clone(); rebuild_table(conn, &child_table, &old_schema, &new_schema, |tx| { tx.execute( &format!("DELETE FROM {REL_TABLE} WHERE name = ?1;"), [rel_name.as_str()], ) .map_err(DbError::from_rusqlite)?; + let changes = Changes { + schema_dirty: true, + rewritten_tables: vec![child_table_for_persist.clone()], + ..Changes::default() + }; + finalize_persistence(tx, persistence, source, &changes)?; Ok(()) })?; @@ -1247,6 +1722,23 @@ fn do_drop_relationship( Ok(Some(do_describe_table(conn, &parent_table)?)) } +/// Read-only wrapper around `do_describe_table` that runs an +/// auxiliary `history.log` append for user-issued +/// `show table` commands. +fn do_describe_table_request( + conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, + name: &str, +) -> Result { + let description = do_describe_table(conn, name)?; + if let (Some(p), Some(text)) = (persistence, source) { + p.append_history(text) + .map_err(DbError::from_persistence)?; + } + Ok(description) +} + fn do_describe_table(conn: &Connection, name: &str) -> Result { // `pragma_table_info` is a table-valued function in modern // SQLite; using it as a SELECT lets us bind the table name @@ -1481,6 +1973,8 @@ fn count_rows(conn: &Connection, table: &str) -> Result { fn do_insert( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, table: &str, user_columns: Option<&[String]>, user_values: &[Value], @@ -1548,9 +2042,19 @@ fn do_insert( debug!(sql = %sql, "insert"); let params: Vec = bindings.iter().map(|(_, b)| bound_to_sqlite_value(b)).collect(); + let tx = conn + .unchecked_transaction() + .map_err(DbError::from_rusqlite)?; let rows_affected = execute_with_fk_enrichment(conn, table, &sql, ¶ms)?; let new_rowid = conn.last_insert_rowid(); let data = query_rows_by_rowid(conn, table, &[new_rowid])?; + let changes = Changes { + schema_dirty: false, + rewritten_tables: vec![table.to_string()], + ..Changes::default() + }; + finalize_persistence(conn, persistence, source, &changes)?; + tx.commit().map_err(DbError::from_rusqlite)?; Ok(InsertResult { rows_affected, data, @@ -1559,6 +2063,8 @@ fn do_insert( fn do_update( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, table: &str, assignments: &[(String, Value)], filter: &RowFilter, @@ -1626,8 +2132,18 @@ fn do_update( sets = set_clauses.join(", "), ); debug!(sql = %sql, "update"); + let tx = conn + .unchecked_transaction() + .map_err(DbError::from_rusqlite)?; let rows_affected = execute_with_fk_enrichment(conn, table, &sql, ¶ms)?; let data = query_rows_by_rowid(conn, table, &rowids)?; + let changes = Changes { + schema_dirty: false, + rewritten_tables: vec![table.to_string()], + ..Changes::default() + }; + finalize_persistence(conn, persistence, source, &changes)?; + tx.commit().map_err(DbError::from_rusqlite)?; Ok(UpdateResult { rows_affected, data, @@ -1653,6 +2169,8 @@ fn select_all_rowids(conn: &Connection, table: &str) -> Result, DbError fn do_delete( conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, table: &str, filter: &RowFilter, ) -> Result { @@ -1687,11 +2205,16 @@ fn do_delete( ident = quote_ident(table), ); debug!(sql = %sql, "delete"); + let tx = conn + .unchecked_transaction() + .map_err(DbError::from_rusqlite)?; let rows_affected = execute_with_fk_enrichment(conn, table, &sql, ¶ms)?; // Compare child-table counts after the delete; non-zero - // diffs are cascade effects. + // diffs are cascade effects. We also collect cascaded + // tables so the persistence phase rewrites their CSVs too. let mut cascade: Vec = Vec::new(); + let mut rewritten_tables: Vec = vec![table.to_string()]; for (rel, (_child_table, before_count)) in inbound.iter().zip(before_counts.iter()) { let after_count = count_rows(conn, &rel.other_table)?; let diff = before_count - after_count; @@ -1702,15 +2225,40 @@ fn do_delete( rows_changed: diff, action: rel.on_delete, }); + rewritten_tables.push(rel.other_table.clone()); } } + let changes = Changes { + schema_dirty: false, + rewritten_tables, + ..Changes::default() + }; + finalize_persistence(conn, persistence, source, &changes)?; + tx.commit().map_err(DbError::from_rusqlite)?; + Ok(DeleteResult { rows_affected, cascade, }) } +/// Read-only wrapper that adds the `history.log` append for +/// `show data` user commands. +fn do_query_data_request( + conn: &Connection, + persistence: Option<&Persistence>, + source: Option<&str>, + table: &str, +) -> Result { + let data = do_query_data(conn, table)?; + if let (Some(p), Some(text)) = (persistence, source) { + p.append_history(text) + .map_err(DbError::from_persistence)?; + } + Ok(data) +} + fn do_query_data(conn: &Connection, table: &str) -> Result { let schema = read_schema(conn, table)?; let column_names: Vec = schema.columns.iter().map(|c| c.name.clone()).collect(); @@ -1867,7 +2415,7 @@ mod tests { name.to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .expect("create table") } @@ -1906,7 +2454,7 @@ mod tests { "Customers".to_string(), vec![col("email", Type::Text)], vec!["email".to_string()], - ) + None) .await .unwrap(); assert_eq!(desc.columns.len(), 1); @@ -1924,7 +2472,7 @@ mod tests { "OrderLines".to_string(), vec![col("order_id", Type::Int), col("product_id", Type::Int)], vec!["order_id".to_string(), "product_id".to_string()], - ) + None) .await .unwrap(); assert_eq!(desc.columns.len(), 2); @@ -1941,7 +2489,7 @@ mod tests { "T".to_string(), vec![col("flag", Type::Bool)], vec!["flag".to_string()], - ) + None) .await .unwrap(); assert!(desc.columns[0].primary_key); @@ -1951,7 +2499,7 @@ mod tests { async fn create_table_rejects_zero_columns() { let db = db(); let err = db - .create_table("T".to_string(), Vec::new(), Vec::new()) + .create_table("T".to_string(), Vec::new(), Vec::new(), None) .await .unwrap_err(); assert!(matches!(err, DbError::Unsupported(_)), "got {err:?}"); @@ -1961,7 +2509,7 @@ mod tests { async fn drop_table_removes_it_from_list() { let db = db(); make_id_table(&db, "T").await; - db.drop_table("T".to_string()).await.unwrap(); + db.drop_table("T".to_string(), None).await.unwrap(); let tables = db.list_tables().await.unwrap(); assert!(tables.is_empty()); } @@ -1971,7 +2519,7 @@ mod tests { let db = db(); make_id_table(&db, "Customers").await; let desc = db - .add_column("Customers".to_string(), "Name".to_string(), Type::Text) + .add_column("Customers".to_string(), "Name".to_string(), Type::Text, None) .await .unwrap(); let names: Vec<_> = desc.columns.iter().map(|c| c.name.as_str()).collect(); @@ -1989,11 +2537,11 @@ mod tests { // datetime, decimal — all backed by TEXT). make_id_table(&db, "T").await; for ty in [Type::Date, Type::DateTime, Type::Decimal, Type::ShortId] { - db.add_column("T".to_string(), format!("c_{ty}"), ty) + db.add_column("T".to_string(), format!("c_{ty}"), ty, None) .await .unwrap(); } - let desc = db.describe_table("T".to_string()).await.unwrap(); + let desc = db.describe_table("T".to_string(), None).await.unwrap(); let id_col = desc.columns.iter().find(|c| c.name == "id").unwrap(); assert_eq!(id_col.user_type, Some(Type::Serial)); for ty in [Type::Date, Type::DateTime, Type::Decimal, Type::ShortId] { @@ -2021,14 +2569,14 @@ mod tests { "T".to_string(), vec![col("when", Type::Date)], vec!["when".to_string()], - ) + None) .await .unwrap(); - let before = db.describe_table("T".to_string()).await.unwrap(); + let before = db.describe_table("T".to_string(), None).await.unwrap(); assert_eq!(before.columns[0].user_type, Some(Type::Date)); // Drop it. - db.drop_table("T".to_string()).await.unwrap(); + db.drop_table("T".to_string(), None).await.unwrap(); // Recreate with a different type for the same-named column; // the metadata for the new table must reflect the new type @@ -2038,10 +2586,10 @@ mod tests { "T".to_string(), vec![col("when", Type::DateTime)], vec!["when".to_string()], - ) + None) .await .unwrap(); - let after = db.describe_table("T".to_string()).await.unwrap(); + let after = db.describe_table("T".to_string(), None).await.unwrap(); assert_eq!(after.columns[0].user_type, Some(Type::DateTime)); } @@ -2051,11 +2599,11 @@ mod tests { make_id_table(&db, "T").await; for ty in [Type::Text, Type::Int, Type::Real, Type::Bool, Type::ShortId] { let col_name = format!("c_{ty}"); - db.add_column("T".to_string(), col_name.clone(), ty) + db.add_column("T".to_string(), col_name.clone(), ty, None) .await .unwrap_or_else(|e| panic!("type {ty} failed: {e}")); } - let desc = db.describe_table("T".to_string()).await.unwrap(); + let desc = db.describe_table("T".to_string(), None).await.unwrap(); // 5 user columns + the id PK column. assert_eq!(desc.columns.len(), 6); } @@ -2065,7 +2613,7 @@ mod tests { let db = db(); make_id_table(&db, "T").await; let err = db - .add_column("T".to_string(), "id2".to_string(), Type::Serial) + .add_column("T".to_string(), "id2".to_string(), Type::Serial, None) .await .unwrap_err(); assert!(matches!(err, DbError::Unsupported(_)), "got {err:?}"); @@ -2080,7 +2628,7 @@ mod tests { "T".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap_err(); match err { @@ -2092,7 +2640,7 @@ mod tests { #[tokio::test] async fn drop_nonexistent_table_returns_no_such_table() { let db = db(); - let err = db.drop_table("Ghost".to_string()).await.unwrap_err(); + let err = db.drop_table("Ghost".to_string(), None).await.unwrap_err(); match err { DbError::Sqlite { kind, .. } => assert_eq!(kind, SqliteErrorKind::NoSuchTable), other => panic!("unexpected error: {other:?}"), @@ -2103,7 +2651,7 @@ mod tests { async fn add_column_to_missing_table_returns_no_such_table() { let db = db(); let err = db - .add_column("Ghost".to_string(), "x".to_string(), Type::Text) + .add_column("Ghost".to_string(), "x".to_string(), Type::Text, None) .await .unwrap_err(); match err { @@ -2115,7 +2663,7 @@ mod tests { #[tokio::test] async fn describe_missing_table_returns_no_such_table() { let db = db(); - let err = db.describe_table("Ghost".to_string()).await.unwrap_err(); + let err = db.describe_table("Ghost".to_string(), None).await.unwrap_err(); match err { DbError::Sqlite { kind, .. } => assert_eq!(kind, SqliteErrorKind::NoSuchTable), other => panic!("unexpected error: {other:?}"), @@ -2129,17 +2677,17 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial), col("Name", Type::Text)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.create_table( "Orders".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); - db.add_column("Orders".to_string(), "CustId".to_string(), Type::Int) + db.add_column("Orders".to_string(), "CustId".to_string(), Type::Int, None) .await .unwrap(); } @@ -2157,10 +2705,10 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); - let orders = db.describe_table("Orders".to_string()).await.unwrap(); + let orders = db.describe_table("Orders".to_string(), None).await.unwrap(); assert_eq!(orders.outbound_relationships.len(), 1); let rel = &orders.outbound_relationships[0]; assert_eq!(rel.local_column, "CustId"); @@ -2182,10 +2730,10 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); - let customers = db.describe_table("Customers".to_string()).await.unwrap(); + let customers = db.describe_table("Customers".to_string(), None).await.unwrap(); assert_eq!(customers.inbound_relationships.len(), 1); let rel = &customers.inbound_relationships[0]; assert_eq!(rel.local_column, "id"); @@ -2206,10 +2754,10 @@ mod tests { ReferentialAction::Cascade, ReferentialAction::SetNull, false, - ) + None) .await .unwrap(); - let orders = db.describe_table("Orders".to_string()).await.unwrap(); + let orders = db.describe_table("Orders".to_string(), None).await.unwrap(); let rel = &orders.outbound_relationships[0]; assert_eq!(rel.name, "cust_orders"); assert_eq!(rel.on_delete, ReferentialAction::Cascade); @@ -2224,14 +2772,14 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.create_table( "Orders".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); @@ -2244,10 +2792,11 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, true, // --create-fk + None, ) .await .unwrap(); - let orders = db.describe_table("Orders".to_string()).await.unwrap(); + let orders = db.describe_table("Orders".to_string(), None).await.unwrap(); // The auto-created FK column has user_type Int (Serial's // fk_target_type), not Serial. let cust = orders @@ -2266,14 +2815,14 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.create_table( "Orders".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); let err = db @@ -2286,7 +2835,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap_err(); assert!(matches!(err, DbError::Unsupported(_)), "got {err:?}"); @@ -2299,18 +2848,18 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.create_table( "Orders".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); // Wrong type — text instead of int. - db.add_column("Orders".to_string(), "CustId".to_string(), Type::Text) + db.add_column("Orders".to_string(), "CustId".to_string(), Type::Text, None) .await .unwrap(); @@ -2324,7 +2873,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap_err(); match err { @@ -2344,17 +2893,17 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial), col("Name", Type::Text)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.create_table( "Orders".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); - db.add_column("Orders".to_string(), "CustName".to_string(), Type::Text) + db.add_column("Orders".to_string(), "CustName".to_string(), Type::Text, None) .await .unwrap(); let err = db @@ -2367,7 +2916,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap_err(); match err { @@ -2389,16 +2938,16 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); db.drop_relationship(RelationshipSelector::Named { name: "cust_orders".to_string(), - }) + }, None) .await .unwrap(); - let orders = db.describe_table("Orders".to_string()).await.unwrap(); - let customers = db.describe_table("Customers".to_string()).await.unwrap(); + let orders = db.describe_table("Orders".to_string(), None).await.unwrap(); + let customers = db.describe_table("Customers".to_string(), None).await.unwrap(); assert!(orders.outbound_relationships.is_empty()); assert!(customers.inbound_relationships.is_empty()); } @@ -2416,7 +2965,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); db.drop_relationship(RelationshipSelector::Endpoints { @@ -2424,10 +2973,10 @@ mod tests { parent_column: "id".to_string(), child_table: "Orders".to_string(), child_column: "CustId".to_string(), - }) + }, None) .await .unwrap(); - let orders = db.describe_table("Orders".to_string()).await.unwrap(); + let orders = db.describe_table("Orders".to_string(), None).await.unwrap(); assert!(orders.outbound_relationships.is_empty()); } @@ -2444,10 +2993,10 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); - let err = db.drop_table("Customers".to_string()).await.unwrap_err(); + let err = db.drop_table("Customers".to_string(), None).await.unwrap_err(); match err { DbError::Unsupported(msg) => { assert!(msg.contains("referenced by"), "{msg}"); @@ -2469,13 +3018,13 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); // Dropping the child is allowed (no inbound relationships // on Orders) and cleans the metadata. - db.drop_table("Orders".to_string()).await.unwrap(); - let customers = db.describe_table("Customers".to_string()).await.unwrap(); + db.drop_table("Orders".to_string(), None).await.unwrap(); + let customers = db.describe_table("Customers".to_string(), None).await.unwrap(); assert!(customers.inbound_relationships.is_empty()); } @@ -2483,7 +3032,7 @@ mod tests { async fn add_relationship_with_duplicate_name_errors() { let db = db(); customers_orders_setup(&db).await; - db.add_column("Orders".to_string(), "OtherCust".to_string(), Type::Int) + db.add_column("Orders".to_string(), "OtherCust".to_string(), Type::Int, None) .await .unwrap(); db.add_relationship( @@ -2495,7 +3044,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); let err = db @@ -2508,7 +3057,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap_err(); match err { @@ -2535,13 +3084,13 @@ mod tests { ReferentialAction::Cascade, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); // After the rebuild, the original columns are still // present with the right user types, and any extra // metadata (Name on Customers) survives. - let customers = db.describe_table("Customers".to_string()).await.unwrap(); + let customers = db.describe_table("Customers".to_string(), None).await.unwrap(); let names: Vec<&str> = customers.columns.iter().map(|c| c.name.as_str()).collect(); assert_eq!(names, vec!["id", "Name"]); let name_col = customers.columns.iter().find(|c| c.name == "Name").unwrap(); @@ -2555,7 +3104,7 @@ mod tests { "Customers".to_string(), vec![col("id", Type::Serial), col("Name", Type::Text)], vec!["id".to_string()], - ) + None) .await .unwrap(); } @@ -2569,14 +3118,14 @@ mod tests { "Customers".to_string(), None, vec![Value::Text("Alice".to_string())], - ) + None) .await .unwrap(); assert_eq!(result.rows_affected, 1); // The InsertResult itself carries the just-inserted row. assert_eq!(result.data.rows.len(), 1); assert_eq!(result.data.rows[0][1], Some("Alice".to_string())); - let data = db.query_data("Customers".to_string()).await.unwrap(); + let data = db.query_data("Customers".to_string(), None).await.unwrap(); assert_eq!(data.columns, vec!["id".to_string(), "Name".to_string()]); assert_eq!(data.rows.len(), 1); assert_eq!(data.rows[0][1], Some("Alice".to_string())); @@ -2591,17 +3140,17 @@ mod tests { "Tags".to_string(), vec![col("id", Type::ShortId), col("Label", Type::Text)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.insert( "Tags".to_string(), None, vec![Value::Text("database".to_string())], - ) + None) .await .unwrap(); - let data = db.query_data("Tags".to_string()).await.unwrap(); + let data = db.query_data("Tags".to_string(), None).await.unwrap(); let id = data.rows[0][0].as_ref().expect("auto-generated id"); assert!( id.len() >= 10 && id.len() <= 12, @@ -2617,10 +3166,10 @@ mod tests { "Customers".to_string(), Some(vec!["id".to_string(), "Name".to_string()]), vec![Value::Number("99".to_string()), Value::Text("Bob".to_string())], - ) + None) .await .unwrap(); - let data = db.query_data("Customers".to_string()).await.unwrap(); + let data = db.query_data("Customers".to_string(), None).await.unwrap(); assert_eq!(data.rows[0][0], Some("99".to_string())); assert_eq!(data.rows[0][1], Some("Bob".to_string())); } @@ -2635,7 +3184,7 @@ mod tests { "Customers".to_string(), Some(vec!["Name".to_string()]), vec![Value::Number("42".to_string())], - ) + None) .await .unwrap_err(); assert!(matches!(err, DbError::InvalidValue(_)), "got {err:?}"); @@ -2650,7 +3199,7 @@ mod tests { "Customers".to_string(), None, vec![Value::Text(name.to_string())], - ) + None) .await .unwrap(); } @@ -2662,14 +3211,14 @@ mod tests { column: "id".to_string(), value: Value::Number("1".to_string()), }, - ) + None) .await .unwrap(); assert_eq!(result.rows_affected, 1); // The UpdateResult contains only the updated rows. assert_eq!(result.data.rows.len(), 1); assert_eq!(result.data.rows[0][1], Some("Alicia".to_string())); - let data = db.query_data("Customers".to_string()).await.unwrap(); + let data = db.query_data("Customers".to_string(), None).await.unwrap(); assert_eq!(data.rows[0][1], Some("Alicia".to_string())); assert_eq!(data.rows[1][1], Some("Bob".to_string())); } @@ -2683,7 +3232,7 @@ mod tests { "Customers".to_string(), None, vec![Value::Text(name.to_string())], - ) + None) .await .unwrap(); } @@ -2692,7 +3241,7 @@ mod tests { "Customers".to_string(), vec![("Name".to_string(), Value::Text("X".to_string()))], RowFilter::AllRows, - ) + None) .await .unwrap(); assert_eq!(result.rows_affected, 3); @@ -2708,7 +3257,7 @@ mod tests { "Customers".to_string(), None, vec![Value::Text(name.to_string())], - ) + None) .await .unwrap(); } @@ -2719,12 +3268,12 @@ mod tests { column: "id".to_string(), value: Value::Number("1".to_string()), }, - ) + None) .await .unwrap(); assert_eq!(result.rows_affected, 1); assert!(result.cascade.is_empty(), "no children to cascade to"); - let data = db.query_data("Customers".to_string()).await.unwrap(); + let data = db.query_data("Customers".to_string(), None).await.unwrap(); assert_eq!(data.rows.len(), 1); assert_eq!(data.rows[0][1], Some("Bob".to_string())); } @@ -2738,7 +3287,7 @@ mod tests { "Orders".to_string(), vec![col("id", Type::Serial), col("CustId", Type::Int)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.add_relationship( @@ -2750,7 +3299,7 @@ mod tests { ReferentialAction::NoAction, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); @@ -2760,7 +3309,7 @@ mod tests { "Orders".to_string(), Some(vec!["CustId".to_string()]), vec![Value::Number("999".to_string())], - ) + None) .await .unwrap_err(); match err { @@ -2786,7 +3335,7 @@ mod tests { "Orders".to_string(), vec![col("id", Type::Serial), col("CustId", Type::Int)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.add_relationship( @@ -2798,21 +3347,21 @@ mod tests { ReferentialAction::Cascade, ReferentialAction::NoAction, false, - ) + None) .await .unwrap(); db.insert( "Customers".to_string(), None, vec![Value::Text("Alice".to_string())], - ) + None) .await .unwrap(); db.insert( "Orders".to_string(), Some(vec!["CustId".to_string()]), vec![Value::Number("1".to_string())], - ) + None) .await .unwrap(); // Delete Alice — cascades to Orders. @@ -2822,10 +3371,10 @@ mod tests { column: "id".to_string(), value: Value::Number("1".to_string()), }, - ) + None) .await .unwrap(); - let orders = db.query_data("Orders".to_string()).await.unwrap(); + let orders = db.query_data("Orders".to_string(), None).await.unwrap(); assert!(orders.rows.is_empty(), "child rows should be cascaded"); } @@ -2836,24 +3385,24 @@ mod tests { "Flags".to_string(), vec![col("id", Type::Serial), col("Active", Type::Bool)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.insert( "Flags".to_string(), None, vec![Value::Bool(true)], - ) + None) .await .unwrap(); db.insert( "Flags".to_string(), None, vec![Value::Bool(false)], - ) + None) .await .unwrap(); - let data = db.query_data("Flags".to_string()).await.unwrap(); + let data = db.query_data("Flags".to_string(), None).await.unwrap(); assert_eq!(data.rows[0][1], Some("true".to_string())); assert_eq!(data.rows[1][1], Some("false".to_string())); } @@ -2865,17 +3414,17 @@ mod tests { "T".to_string(), vec![col("id", Type::Serial), col("note", Type::Text)], vec!["id".to_string()], - ) + None) .await .unwrap(); db.insert( "T".to_string(), None, vec![Value::Null], - ) + None) .await .unwrap(); - let data = db.query_data("T".to_string()).await.unwrap(); + let data = db.query_data("T".to_string(), None).await.unwrap(); assert_eq!(data.rows[0][1], None); } @@ -2888,12 +3437,12 @@ mod tests { "Order Lines".to_string(), vec![col("id", Type::Serial)], vec!["id".to_string()], - ) + None) .await .unwrap(); let tables = db.list_tables().await.unwrap(); assert_eq!(tables, vec!["Order Lines".to_string()]); - let desc = db.describe_table("Order Lines".to_string()).await.unwrap(); + let desc = db.describe_table("Order Lines".to_string(), None).await.unwrap(); assert_eq!(desc.name, "Order Lines"); } } diff --git a/src/event.rs b/src/event.rs index ad15fda..75f1daf 100644 --- a/src/event.rs +++ b/src/event.rs @@ -47,4 +47,12 @@ pub enum AppEvent { }, /// Refreshed list of tables in the database. TablesRefreshed(Vec), + /// A persistence failure occurred (ADR-0015 §8). The + /// application surfaces a fatal banner and exits cleanly so + /// the message remains above the shell prompt. + PersistenceFatal { + operation: String, + path: std::path::PathBuf, + message: String, + }, } diff --git a/src/lib.rs b/src/lib.rs index 186e364..c196d93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod dsl; pub mod event; pub mod logging; pub mod mode; +pub mod persistence; pub mod project; pub mod runtime; pub mod theme; diff --git a/src/persistence/csv_io.rs b/src/persistence/csv_io.rs new file mode 100644 index 0000000..c336cca --- /dev/null +++ b/src/persistence/csv_io.rs @@ -0,0 +1,303 @@ +//! Per-type CSV writer (ADR-0015 §4). +//! +//! Encoding rules per type are exactly as specified in the +//! ADR; the cell-level encoder lives in `encode_cell`. The +//! `csv` crate handles RFC 4180 quoting around our encoded +//! strings. +//! +//! NULL representation: an empty unquoted field. The `csv` +//! crate's writer emits a non-quoted empty field for an empty +//! string by default; we map `CellValue::Null` to that, and +//! `CellValue::Text(String::new())` to a *quoted* empty +//! field (`""`) by emitting a sentinel that round-trips. +//! +//! For the writer, the trick is: `WriterBuilder::quote_style(QuoteStyle::Necessary)` +//! is the default and quotes only when needed (separator, +//! quote, newline). We handle the empty-string-vs-null +//! distinction manually by always quoting non-null empty +//! Text and never quoting Null. + +use std::io::Write as _; + +use base64::Engine as _; + +use crate::dsl::types::Type; + +use super::{CellValue, TableSnapshot}; + +/// Serialize a `TableSnapshot` to a CSV body. Returns the raw +/// bytes (UTF-8) ready to be written to disk. +pub(super) fn serialize_table(table: &TableSnapshot) -> Result, String> { + // We bypass the `csv` crate for cell-level emission so the + // NULL-vs-empty distinction stays under our control. The + // header and per-line framing are still simple enough to + // emit directly. + let mut out: Vec = Vec::new(); + write_record( + &mut out, + table.columns.iter().map(|c| Cell::Plain(c.name.clone())), + )?; + for row in &table.rows { + if row.len() != table.columns.len() { + return Err(format!( + "row width {} does not match column count {} for table `{}`", + row.len(), + table.columns.len(), + table.name, + )); + } + let mut cells: Vec = Vec::with_capacity(row.len()); + for (col, value) in table.columns.iter().zip(row.iter()) { + cells.push(encode_cell(col.user_type, value)?); + } + write_record(&mut out, cells.into_iter())?; + } + Ok(out) +} + +/// One cell to write. `Plain` is unquoted; `Quoted` is +/// always RFC 4180 double-quoted (used for the empty-string +/// vs NULL distinction). +enum Cell { + Plain(String), + Quoted(String), +} + +/// Emit a record (header or row) to `out`. Adds the trailing +/// `\n` (RFC 4180 says CRLF, but `\n` is universally accepted +/// and matches what every CSV reader on every platform +/// handles cleanly; line endings are deliberately uniform +/// across our generated artefacts). +fn write_record>(out: &mut Vec, cells: I) -> Result<(), String> { + let mut first = true; + for cell in cells { + if !first { + out.push(b','); + } + first = false; + match cell { + Cell::Plain(s) => { + if needs_quoting(&s) { + write_quoted(out, &s); + } else { + out.write_all(s.as_bytes()).map_err(|e| e.to_string())?; + } + } + Cell::Quoted(s) => write_quoted(out, &s), + } + } + out.push(b'\n'); + Ok(()) +} + +fn write_quoted(out: &mut Vec, s: &str) { + out.push(b'"'); + for &b in s.as_bytes() { + if b == b'"' { + out.extend_from_slice(b"\"\""); + } else { + out.push(b); + } + } + out.push(b'"'); +} + +fn needs_quoting(s: &str) -> bool { + s.bytes().any(|b| matches!(b, b',' | b'"' | b'\n' | b'\r')) +} + +/// Encode a single cell per type (ADR-0015 §4 table). Returns +/// the cell wrapped in `Plain` or `Quoted` as appropriate for +/// the NULL/empty distinction. +fn encode_cell(ty: Type, value: &CellValue) -> Result { + if matches!(value, CellValue::Null) { + return Ok(Cell::Plain(String::new())); + } + match ty { + Type::Text => match value { + CellValue::Text(s) if s.is_empty() => Ok(Cell::Quoted(String::new())), + CellValue::Text(s) => Ok(Cell::Plain(s.clone())), + other => Err(format!("expected text, got {other:?}")), + }, + Type::Int => match value { + CellValue::Integer(n) => Ok(Cell::Plain(n.to_string())), + other => Err(format!("expected int, got {other:?}")), + }, + Type::Real => match value { + CellValue::Real(f) => Ok(Cell::Plain(format_real(*f))), + other => Err(format!("expected real, got {other:?}")), + }, + Type::Decimal => match value { + // Decimals are stored as TEXT to preserve precision. + CellValue::Text(s) if s.is_empty() => Ok(Cell::Quoted(String::new())), + CellValue::Text(s) => Ok(Cell::Plain(s.clone())), + other => Err(format!("expected decimal (text), got {other:?}")), + }, + Type::Bool => match value { + CellValue::Integer(0) => Ok(Cell::Plain("false".to_string())), + CellValue::Integer(1) => Ok(Cell::Plain("true".to_string())), + other => Err(format!("expected bool (0 or 1), got {other:?}")), + }, + Type::Date | Type::DateTime => match value { + CellValue::Text(s) if s.is_empty() => Ok(Cell::Quoted(String::new())), + CellValue::Text(s) => Ok(Cell::Plain(s.clone())), + other => Err(format!("expected date/datetime (text), got {other:?}")), + }, + Type::Blob => match value { + CellValue::Blob(bytes) => Ok(Cell::Plain(base64::engine::general_purpose::STANDARD.encode(bytes))), + other => Err(format!("expected blob, got {other:?}")), + }, + Type::Serial => match value { + CellValue::Integer(n) => Ok(Cell::Plain(n.to_string())), + other => Err(format!("expected serial (int), got {other:?}")), + }, + Type::ShortId => match value { + CellValue::Text(s) if s.is_empty() => Ok(Cell::Quoted(String::new())), + CellValue::Text(s) => Ok(Cell::Plain(s.clone())), + other => Err(format!("expected shortid (text), got {other:?}")), + }, + } +} + +fn format_real(f: f64) -> String { + if f.is_nan() { + "nan".to_string() + } else if f.is_infinite() { + if f > 0.0 { "inf".to_string() } else { "-inf".to_string() } + } else { + // Default `{}` formatting on f64 emits a shortest + // round-tripping decimal — exactly what the ADR asks + // for. + format!("{f}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::persistence::ColumnSchema; + + fn col(name: &str, ty: Type) -> ColumnSchema { + ColumnSchema { name: name.to_string(), user_type: ty } + } + + #[test] + fn empty_table_emits_header_only() { + let body = serialize_table(&TableSnapshot { + name: "Customers".to_string(), + columns: vec![col("id", Type::Serial), col("Name", Type::Text)], + rows: vec![], + }) + .unwrap(); + assert_eq!(String::from_utf8(body).unwrap(), "id,Name\n"); + } + + #[test] + fn null_is_empty_unquoted_field() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("Name", Type::Text)], + rows: vec![vec![CellValue::Null]], + }) + .unwrap(); + assert_eq!(String::from_utf8(body).unwrap(), "Name\n\n"); + } + + #[test] + fn empty_string_is_double_quoted() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("Name", Type::Text)], + rows: vec![vec![CellValue::Text(String::new())]], + }) + .unwrap(); + assert_eq!(String::from_utf8(body).unwrap(), "Name\n\"\"\n"); + } + + #[test] + fn text_with_comma_or_quote_is_rfc4180_quoted() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("Name", Type::Text)], + rows: vec![ + vec![CellValue::Text("hello, world".to_string())], + vec![CellValue::Text("she said \"hi\"".to_string())], + ], + }) + .unwrap(); + let s = String::from_utf8(body).unwrap(); + assert!(s.contains("\"hello, world\"")); + assert!(s.contains("\"she said \"\"hi\"\"\"")); + } + + #[test] + fn ints_and_reals_round_trip_simply() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("n", Type::Int), col("r", Type::Real)], + rows: vec![ + vec![CellValue::Integer(42), CellValue::Real(std::f64::consts::PI)], + vec![CellValue::Integer(-7), CellValue::Real(0.0)], + ], + }) + .unwrap(); + let s = String::from_utf8(body).unwrap(); + let lines: Vec<&str> = s.trim_end().lines().collect(); + assert_eq!(lines[0], "n,r"); + assert!(lines[1].starts_with("42,")); + assert_eq!(lines[2], "-7,0"); + } + + #[test] + fn bools_use_words_not_digits() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("b", Type::Bool)], + rows: vec![ + vec![CellValue::Integer(1)], + vec![CellValue::Integer(0)], + ], + }) + .unwrap(); + let s = String::from_utf8(body).unwrap(); + assert_eq!(s, "b\ntrue\nfalse\n"); + } + + #[test] + fn blobs_use_base64() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("blob", Type::Blob)], + rows: vec![vec![CellValue::Blob(b"hello".to_vec())]], + }) + .unwrap(); + let s = String::from_utf8(body).unwrap(); + assert!(s.contains("aGVsbG8=")); + } + + #[test] + fn dates_and_datetimes_pass_through() { + let body = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("d", Type::Date), col("ts", Type::DateTime)], + rows: vec![vec![ + CellValue::Text("2026-05-07".to_string()), + CellValue::Text("2026-05-07T14:30:12Z".to_string()), + ]], + }) + .unwrap(); + let s = String::from_utf8(body).unwrap(); + assert!(s.contains("2026-05-07,2026-05-07T14:30:12Z")); + } + + #[test] + fn row_width_mismatch_errors() { + let err = serialize_table(&TableSnapshot { + name: "T".to_string(), + columns: vec![col("a", Type::Int), col("b", Type::Int)], + rows: vec![vec![CellValue::Integer(1)]], + }) + .unwrap_err(); + assert!(err.contains("row width")); + } +} diff --git a/src/persistence/history.rs b/src/persistence/history.rs new file mode 100644 index 0000000..d5bfc6a --- /dev/null +++ b/src/persistence/history.rs @@ -0,0 +1,160 @@ +//! Append-only `history.log` writer (ADR-0015 §5). +//! +//! Format: one record per line, three pipe-separated fields: +//! +//! ```text +//! 2026-05-07T14:30:12Z|ok|create table Customers with pk id:serial +//! ``` +//! +//! Status is always `ok` in v1; failed commands are not +//! recorded. The status field is kept in the line shape so +//! future use cases can carry additional values without a +//! format break. +//! +//! Newlines inside the command (which do not yet appear, but +//! will when multi-line input I1 lands) are escaped to a +//! literal `\n`. + +use std::fs::OpenOptions; +use std::io::Write as _; +use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; + +use super::PersistenceError; + +/// Format a single log record. Pure; no I/O. +pub(super) fn format_record(command_text: &str, timestamp_iso: String) -> String { + let escaped = escape_command(command_text); + format!("{timestamp_iso}|ok|{escaped}\n") +} + +/// Append `line` (which already ends in `\n`) to the file at +/// `path`. Creates the file if it doesn't exist. fsyncs after +/// the write so a power-cut doesn't lose the latest entry. +pub(super) fn append(path: &Path, line: &str) -> Result<(), PersistenceError> { + let mut f = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|source| PersistenceError::Io { + operation: "open", + path: path.to_path_buf(), + source, + })?; + f.write_all(line.as_bytes()) + .map_err(|source| PersistenceError::Io { + operation: "write", + path: path.to_path_buf(), + source, + })?; + f.sync_all().map_err(|source| PersistenceError::Io { + operation: "fsync", + path: path.to_path_buf(), + source, + })?; + Ok(()) +} + +fn escape_command(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for c in s.chars() { + match c { + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\\' => out.push_str("\\\\"), + _ => out.push(c), + } + } + out +} + +/// Current UTC time as ISO-8601 with second precision and a +/// `Z` suffix. +pub(super) fn utc_iso8601_now() -> String { + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + iso8601_from_unix_secs(secs) +} + +fn iso8601_from_unix_secs(secs: i64) -> String { + let day_secs = secs.rem_euclid(86_400); + let h = day_secs / 3600; + let m = (day_secs % 3600) / 60; + let s = day_secs % 60; + let (y, mo, d) = ymd_from_unix_secs(secs); + format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z") +} + +const fn ymd_from_unix_secs(secs: i64) -> (u32, u32, u32) { + let days = secs.div_euclid(86_400); + let z = days + 719_468; + let era = if z >= 0 { z } else { z - 146_096 } / 146_097; + let doe = (z - era * 146_097) as u64; + let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; + let y = yoe as i64 + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let d = doy - (153 * mp + 2) / 5 + 1; + let m = if mp < 10 { mp + 3 } else { mp - 9 }; + let y = if m <= 2 { y + 1 } else { y }; + (y as u32, m as u32, d as u32) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + #[test] + fn record_format() { + let line = format_record( + "create table Customers with pk id:serial", + "2026-05-07T14:30:12Z".to_string(), + ); + assert_eq!( + line, + "2026-05-07T14:30:12Z|ok|create table Customers with pk id:serial\n", + ); + } + + #[test] + fn newlines_in_command_are_escaped() { + let line = format_record("foo\nbar", "T".to_string()); + assert_eq!(line, "T|ok|foo\\nbar\n"); + } + + #[test] + fn backslash_is_escaped() { + let line = format_record("a\\b", "T".to_string()); + assert_eq!(line, "T|ok|a\\\\b\n"); + } + + #[test] + fn iso8601_format_is_well_formed() { + let s = utc_iso8601_now(); + // YYYY-MM-DDTHH:MM:SSZ has length 20. + assert_eq!(s.len(), 20); + assert!(s.ends_with('Z')); + assert_eq!(&s[4..5], "-"); + assert_eq!(&s[10..11], "T"); + } + + #[test] + fn iso8601_known_seconds() { + assert_eq!(iso8601_from_unix_secs(0), "1970-01-01T00:00:00Z"); + assert_eq!(iso8601_from_unix_secs(1_778_112_000), "2026-05-07T00:00:00Z"); + } + + #[test] + fn append_creates_and_grows_file() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("history.log"); + + append(&path, "first|ok|a\n").unwrap(); + append(&path, "second|ok|b\n").unwrap(); + let body = fs::read_to_string(&path).unwrap(); + assert_eq!(body, "first|ok|a\nsecond|ok|b\n"); + } +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs new file mode 100644 index 0000000..b6823da --- /dev/null +++ b/src/persistence/mod.rs @@ -0,0 +1,323 @@ +//! Per-command persistence to `project.yaml`, `data/*.csv`, +//! and `history.log` (ADR-0015 §3–§6). +//! +//! Iteration 2 wiring: every successful user command, after +//! its SQLite mutations are staged but before the transaction +//! commits, asks `Persistence` to write the affected text +//! targets atomically (write-temp + fsync + rename). The +//! commit-db-last ordering (ADR-0015 §6) is enforced in +//! `db.rs`; this module owns the file-format details and the +//! atomic-write primitive. +//! +//! Failure semantics: any write or rename failure produces a +//! `PersistenceError`. The caller (the db worker) is +//! responsible for translating that into a fatal error and +//! letting the SQLite tx roll back. + +use std::fs; +use std::io::Write as _; +use std::path::{Path, PathBuf}; + +use crate::dsl::action::ReferentialAction; +use crate::dsl::types::Type; +use crate::project::{DATA_DIR, HISTORY_LOG, PROJECT_YAML}; + +mod csv_io; +mod history; +mod yaml; + +/// Owns persistence to a single project on disk. Cheap to +/// move; the db worker holds one instance for its lifetime. +#[derive(Debug, Clone)] +pub struct Persistence { + project_path: PathBuf, +} + +#[derive(Debug, thiserror::Error)] +pub enum PersistenceError { + #[error("could not {operation} `{path}`: {source}")] + Io { + operation: &'static str, + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("could not encode {kind} for `{path}`: {message}")] + Encode { + kind: &'static str, + path: PathBuf, + message: String, + }, +} + +impl PersistenceError { + /// Path the failure was associated with. + #[must_use] + pub fn path(&self) -> &Path { + match self { + Self::Io { path, .. } | Self::Encode { path, .. } => path, + } + } + + /// Short label for the kind of operation that failed, + /// suitable for the fatal banner. + #[must_use] + pub const fn operation(&self) -> &'static str { + match self { + Self::Io { operation, .. } => operation, + Self::Encode { .. } => "encode", + } + } +} + +/// Snapshot of the full schema as it is to be written to +/// `project.yaml`. +/// +/// Read from the database after the in-flight mutation has +/// staged its changes (within the same SQLite tx) so the YAML +/// reflects the post-mutation state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SchemaSnapshot { + pub created_at: String, + pub tables: Vec, + pub relationships: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableSchema { + pub name: String, + pub primary_key: Vec, + pub columns: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ColumnSchema { + pub name: String, + pub user_type: Type, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RelationshipSchema { + pub name: String, + pub parent_table: String, + pub parent_column: String, + pub child_table: String, + pub child_column: String, + pub on_delete: ReferentialAction, + pub on_update: ReferentialAction, +} + +/// Snapshot of one table's full row data, for writing +/// `data/
.csv`. The column order matches the table's +/// declaration order; the row tuples are aligned to it. +#[derive(Debug, Clone, PartialEq)] +pub struct TableSnapshot { + pub name: String, + pub columns: Vec, + pub rows: Vec>, +} + +/// A scalar cell value, in the small ADT understood by the +/// CSV encoder. +/// +/// `Null` and `Text("")` are distinct. `Eq` is intentionally +/// NOT derived because `Real(f64)` does not satisfy it (NaN); +/// use `PartialEq` for comparison. +#[derive(Debug, Clone, PartialEq)] +pub enum CellValue { + Null, + Integer(i64), + Real(f64), + Text(String), + Blob(Vec), +} + +impl Persistence { + #[must_use] + pub const fn new(project_path: PathBuf) -> Self { + Self { project_path } + } + + /// Project root directory. Used in tests and diagnostics. + #[must_use] + pub fn project_path(&self) -> &Path { + &self.project_path + } + + /// Write `project.yaml` from a full schema snapshot. + /// Atomic: writes to `project.yaml.tmp`, fsyncs, then + /// renames over the destination. + pub fn write_schema(&self, schema: &SchemaSnapshot) -> Result<(), PersistenceError> { + let body = yaml::serialize_schema(schema); + atomic_write(&self.project_path.join(PROJECT_YAML), body.as_bytes()) + } + + /// Write `data/
.csv` from a table snapshot. Atomic + /// per file. Creates the `data/` directory if missing + /// (tolerant of fresh projects). + pub fn write_table_data(&self, table: &TableSnapshot) -> Result<(), PersistenceError> { + let data_dir = self.project_path.join(DATA_DIR); + fs::create_dir_all(&data_dir).map_err(|source| PersistenceError::Io { + operation: "create", + path: data_dir.clone(), + source, + })?; + let body = + csv_io::serialize_table(table).map_err(|message| PersistenceError::Encode { + kind: "CSV", + path: data_dir.join(format!("{}.csv", table.name)), + message, + })?; + atomic_write(&data_dir.join(format!("{}.csv", table.name)), &body) + } + + /// Remove `data/
.csv` if present. Used when a + /// table is dropped so stale CSVs don't linger. + pub fn delete_table_data(&self, table_name: &str) -> Result<(), PersistenceError> { + let path = self + .project_path + .join(DATA_DIR) + .join(format!("{table_name}.csv")); + match fs::remove_file(&path) { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(source) => Err(PersistenceError::Io { + operation: "delete", + path, + source, + }), + } + } + + /// Append one record to `history.log`. + pub fn append_history(&self, command_text: &str) -> Result<(), PersistenceError> { + let path = self.project_path.join(HISTORY_LOG); + let line = history::format_record(command_text, history::utc_iso8601_now()); + history::append(&path, &line) + } +} + +/// Write `body` to `path` atomically via temp file + fsync + +/// rename. The temp file is named `.tmp` in the same +/// directory so the rename stays on the same filesystem. +fn atomic_write(path: &Path, body: &[u8]) -> Result<(), PersistenceError> { + let tmp_path = path.with_extension(extension_with_tmp(path)); + { + let mut tmp = fs::File::create(&tmp_path).map_err(|source| PersistenceError::Io { + operation: "create", + path: tmp_path.clone(), + source, + })?; + tmp.write_all(body).map_err(|source| PersistenceError::Io { + operation: "write", + path: tmp_path.clone(), + source, + })?; + tmp.sync_all().map_err(|source| PersistenceError::Io { + operation: "fsync", + path: tmp_path.clone(), + source, + })?; + } + fs::rename(&tmp_path, path).map_err(|source| PersistenceError::Io { + operation: "rename", + path: path.to_path_buf(), + source, + })?; + Ok(()) +} + +/// Build the `.tmp` extension for a path. +/// +/// If the path already has an extension (`project.yaml`), the +/// tmp variant is `project.yaml.tmp`. If the path has no +/// extension, the extension becomes plain `tmp`. +fn extension_with_tmp(path: &Path) -> String { + path.extension().map_or_else( + || "tmp".to_string(), + |ext| format!("{}.tmp", ext.to_string_lossy()), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn tempdir() -> tempfile::TempDir { + tempfile::tempdir().expect("create tempdir") + } + + #[test] + fn extension_with_tmp_appends_to_existing_extension() { + assert_eq!(extension_with_tmp(Path::new("a/b/project.yaml")), "yaml.tmp"); + assert_eq!(extension_with_tmp(Path::new("a/b/Customers.csv")), "csv.tmp"); + assert_eq!(extension_with_tmp(Path::new("a/b/lockfile")), "tmp"); + } + + #[test] + fn atomic_write_roundtrips() { + let dir = tempdir(); + let target = dir.path().join("file.txt"); + atomic_write(&target, b"hello\n").unwrap(); + assert_eq!(fs::read_to_string(&target).unwrap(), "hello\n"); + + // Calling again replaces atomically — no .tmp left behind. + atomic_write(&target, b"world\n").unwrap(); + assert_eq!(fs::read_to_string(&target).unwrap(), "world\n"); + assert!(!target.with_extension("txt.tmp").exists()); + } + + #[test] + fn write_schema_writes_yaml() { + let dir = tempdir(); + let p = Persistence::new(dir.path().to_path_buf()); + let schema = SchemaSnapshot { + created_at: "2026-05-07T14:30:12Z".to_string(), + tables: vec![], + relationships: vec![], + }; + p.write_schema(&schema).unwrap(); + let body = fs::read_to_string(dir.path().join(PROJECT_YAML)).unwrap(); + assert!(body.contains("version: 1")); + assert!(body.contains("created_at:")); + } + + #[test] + fn write_and_delete_table_data() { + let dir = tempdir(); + let p = Persistence::new(dir.path().to_path_buf()); + let table = TableSnapshot { + name: "Customers".to_string(), + columns: vec![ColumnSchema { + name: "Name".to_string(), + user_type: Type::Text, + }], + rows: vec![vec![CellValue::Text("Alice".to_string())]], + }; + p.write_table_data(&table).unwrap(); + let csv_path = dir.path().join(DATA_DIR).join("Customers.csv"); + assert!(csv_path.exists()); + let body = fs::read_to_string(&csv_path).unwrap(); + assert!(body.contains("Name")); + assert!(body.contains("Alice")); + + p.delete_table_data("Customers").unwrap(); + assert!(!csv_path.exists()); + + // Idempotent on a missing file. + p.delete_table_data("Customers").unwrap(); + } + + #[test] + fn append_history_creates_and_appends() { + let dir = tempdir(); + let p = Persistence::new(dir.path().to_path_buf()); + p.append_history("create table Foo with pk id:serial").unwrap(); + p.append_history("insert into Foo (1)").unwrap(); + let body = fs::read_to_string(dir.path().join(HISTORY_LOG)).unwrap(); + let lines: Vec<&str> = body.trim_end().lines().collect(); + assert_eq!(lines.len(), 2); + assert!(lines[0].ends_with("|ok|create table Foo with pk id:serial")); + assert!(lines[1].ends_with("|ok|insert into Foo (1)")); + } +} diff --git a/src/persistence/yaml.rs b/src/persistence/yaml.rs new file mode 100644 index 0000000..12b62c5 --- /dev/null +++ b/src/persistence/yaml.rs @@ -0,0 +1,254 @@ +//! Hand-rolled `project.yaml` writer (ADR-0015 §3). +//! +//! The schema YAML uses a small, fixed set of structures — +//! tables, columns, relationships — and the values it carries +//! are all known-safe (identifiers from the DSL, types from +//! the fixed `Type` enum, action names from `ReferentialAction`). +//! Hand-rolling the writer avoids pulling a YAML serializer +//! dep just for this file. The reader (Iteration 3) will use +//! a real YAML parser. + +use std::fmt::Write as _; + +use crate::dsl::action::ReferentialAction; + +use super::{ColumnSchema, RelationshipSchema, SchemaSnapshot, TableSchema}; + +/// Serialize a `SchemaSnapshot` to a `project.yaml` body. +#[must_use] +pub(super) fn serialize_schema(schema: &SchemaSnapshot) -> String { + let mut out = String::new(); + let _ = writeln!(out, "version: 1"); + let _ = writeln!(out, "project:"); + let _ = writeln!(out, " created_at: {}", quote_if_needed(&schema.created_at)); + + if schema.tables.is_empty() { + let _ = writeln!(out, "tables: []"); + } else { + let _ = writeln!(out, "tables:"); + for table in &schema.tables { + write_table(&mut out, table); + } + } + + if schema.relationships.is_empty() { + let _ = writeln!(out, "relationships: []"); + } else { + let _ = writeln!(out, "relationships:"); + for rel in &schema.relationships { + write_relationship(&mut out, rel); + } + } + + out +} + +fn write_table(out: &mut String, table: &TableSchema) { + let _ = writeln!(out, " - name: {}", quote_if_needed(&table.name)); + write!(out, " primary_key: [").unwrap(); + for (i, key) in table.primary_key.iter().enumerate() { + if i > 0 { + out.push_str(", "); + } + out.push_str("e_if_needed(key)); + } + let _ = writeln!(out, "]"); + let _ = writeln!(out, " columns:"); + for col in &table.columns { + write_column(out, col); + } +} + +fn write_column(out: &mut String, col: &ColumnSchema) { + let _ = writeln!( + out, + " - {{ name: {}, type: {} }}", + quote_if_needed(&col.name), + col.user_type.keyword(), + ); +} + +fn write_relationship(out: &mut String, rel: &RelationshipSchema) { + let _ = writeln!(out, " - name: {}", quote_if_needed(&rel.name)); + let _ = writeln!( + out, + " parent: {{ table: {}, column: {} }}", + quote_if_needed(&rel.parent_table), + quote_if_needed(&rel.parent_column), + ); + let _ = writeln!( + out, + " child: {{ table: {}, column: {} }}", + quote_if_needed(&rel.child_table), + quote_if_needed(&rel.child_column), + ); + let _ = writeln!(out, " on_delete: {}", action_keyword(rel.on_delete)); + let _ = writeln!(out, " on_update: {}", action_keyword(rel.on_update)); +} + +const fn action_keyword(action: ReferentialAction) -> &'static str { + match action { + ReferentialAction::NoAction => "no_action", + ReferentialAction::Restrict => "restrict", + ReferentialAction::SetNull => "set_null", + ReferentialAction::Cascade => "cascade", + } +} + +/// Quote a string for safe inclusion as a YAML scalar. +/// +/// We're conservative: anything not made of safe characters +/// (alphanumerics, `_`, `-`, `:` for ISO timestamps, `.`) +/// gets double-quoted with `"` and `\` escaped. Common +/// identifiers from the DSL (which restricts to alnum + `_`) +/// pass through unquoted, which keeps the YAML pleasantly +/// readable. +fn quote_if_needed(s: &str) -> String { + if needs_quoting(s) { + let mut out = String::with_capacity(s.len() + 2); + out.push('"'); + for c in s.chars() { + match c { + '"' => out.push_str("\\\""), + '\\' => out.push_str("\\\\"), + '\n' => out.push_str("\\n"), + _ => out.push(c), + } + } + out.push('"'); + out + } else { + s.to_string() + } +} + +fn needs_quoting(s: &str) -> bool { + if s.is_empty() { + return true; + } + // YAML reserves several leading characters and the empty + // string. Be defensive on anything outside the safe set. + let first = s.chars().next().unwrap(); + if !is_safe_yaml_char(first) || first == '-' { + return true; + } + // Scalar text that looks like a YAML keyword needs quoting + // even if every character is safe. + if matches!(s, "true" | "false" | "null" | "~" | "yes" | "no" | "on" | "off") { + return true; + } + s.chars().any(|c| !is_safe_yaml_char(c)) +} + +const fn is_safe_yaml_char(c: char) -> bool { + c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.' | ':') +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dsl::types::Type; + + fn snapshot() -> SchemaSnapshot { + SchemaSnapshot { + created_at: "2026-05-07T14:30:12Z".to_string(), + tables: vec![ + TableSchema { + name: "Customers".to_string(), + primary_key: vec!["id".to_string()], + columns: vec![ + ColumnSchema { name: "id".to_string(), user_type: Type::Serial }, + ColumnSchema { name: "Name".to_string(), user_type: Type::Text }, + ], + }, + TableSchema { + name: "Orders".to_string(), + primary_key: vec!["id".to_string()], + columns: vec![ + ColumnSchema { name: "id".to_string(), user_type: Type::Serial }, + ColumnSchema { name: "CustId".to_string(), user_type: Type::Int }, + ], + }, + ], + relationships: vec![RelationshipSchema { + name: "Customers_id_to_Orders_CustId".to_string(), + parent_table: "Customers".to_string(), + parent_column: "id".to_string(), + child_table: "Orders".to_string(), + child_column: "CustId".to_string(), + on_delete: ReferentialAction::Cascade, + on_update: ReferentialAction::NoAction, + }], + } + } + + #[test] + fn writes_expected_yaml_for_full_schema() { + let body = serialize_schema(&snapshot()); + // Spot-check structural lines rather than asserting on + // the whole blob — easier to read in failure output. + assert!(body.contains("version: 1")); + assert!(body.contains("created_at: 2026-05-07T14:30:12Z")); + assert!(body.contains("- name: Customers")); + assert!(body.contains("primary_key: [id]")); + assert!(body.contains("{ name: id, type: serial }")); + assert!(body.contains("{ name: Name, type: text }")); + assert!(body.contains("- name: Customers_id_to_Orders_CustId")); + assert!(body.contains("parent: { table: Customers, column: id }")); + assert!(body.contains("child: { table: Orders, column: CustId }")); + assert!(body.contains("on_delete: cascade")); + assert!(body.contains("on_update: no_action")); + } + + #[test] + fn empty_lists_use_inline_brackets() { + let body = serialize_schema(&SchemaSnapshot { + created_at: "2026-05-07T14:30:12Z".to_string(), + tables: vec![], + relationships: vec![], + }); + assert!(body.contains("tables: []")); + assert!(body.contains("relationships: []")); + } + + #[test] + fn quotes_yaml_keywords_used_as_identifiers() { + let body = serialize_schema(&SchemaSnapshot { + created_at: "2026-05-07T14:30:12Z".to_string(), + tables: vec![TableSchema { + name: "true".to_string(), // reserved keyword + primary_key: vec!["id".to_string()], + columns: vec![ColumnSchema { + name: "yes".to_string(), + user_type: Type::Bool, + }], + }], + relationships: vec![], + }); + assert!(body.contains("- name: \"true\"")); + assert!(body.contains("{ name: \"yes\", type: bool }")); + } + + #[test] + fn quotes_strings_with_unsafe_characters() { + assert_eq!(quote_if_needed("My Project"), "\"My Project\""); + assert_eq!(quote_if_needed("with\"quote"), "\"with\\\"quote\""); + } + + #[test] + fn preserves_compound_primary_key_order() { + let body = serialize_schema(&SchemaSnapshot { + created_at: "2026-05-07T14:30:12Z".to_string(), + tables: vec![TableSchema { + name: "Items".to_string(), + primary_key: vec!["a".to_string(), "b".to_string()], + columns: vec![ + ColumnSchema { name: "a".to_string(), user_type: Type::Int }, + ColumnSchema { name: "b".to_string(), user_type: Type::Int }, + ], + }], + relationships: vec![], + }); + assert!(body.contains("primary_key: [a, b]")); + } +} diff --git a/src/runtime.rs b/src/runtime.rs index a79f85f..fe6195b 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -46,7 +46,9 @@ pub async fn run(args: Args) -> Result<()> { .context("open or create project")?; let db_path = project.db_path(); let display_name = project.display_name().to_string(); - let database = Database::open(db_path.as_path()).context("open database")?; + let persistence = crate::persistence::Persistence::new(project.path().to_path_buf()); + let database = Database::open_with_persistence(db_path.as_path(), persistence) + .context("open database")?; let mut terminal = setup_terminal().context("setup terminal")?; let result = run_loop(&mut terminal, args.theme, database, display_name).await; @@ -57,7 +59,14 @@ pub async fn run(args: Args) -> Result<()> { // `project` (and the lock it holds) is dropped here, releasing // the lock file *after* the terminal has been restored. drop(project); - result + + // ADR-0015 §8: a fatal persistence failure makes its + // banner visible above the shell prompt by writing to + // stderr after the alternate screen has been left. + if let Ok(Some(banner)) = &result { + eprintln!("{banner}"); + } + result.map(|_| ()) } async fn run_loop( @@ -65,7 +74,7 @@ async fn run_loop( theme: Theme, database: Database, project_display_name: String, -) -> Result<()> { +) -> Result> { let (event_tx, mut event_rx) = mpsc::channel::(EVENT_CHANNEL_CAPACITY); let reader_handle = spawn_event_reader(event_tx.clone()); @@ -92,8 +101,8 @@ async fn run_loop( debug!("quit action received"); should_quit = true; } - Action::ExecuteDsl(command) => { - spawn_dsl_dispatch(database.clone(), event_tx.clone(), command); + Action::ExecuteDsl { command, source } => { + spawn_dsl_dispatch(database.clone(), event_tx.clone(), command, source); } } } @@ -108,7 +117,7 @@ async fn run_loop( let _ = tokio::time::timeout(SHUTDOWN_GRACE, reader_handle).await; info!("event loop exited"); - Ok(()) + Ok(app.fatal_message.clone()) } async fn seed_initial_tables(database: &Database, event_tx: &mpsc::Sender) { @@ -128,9 +137,10 @@ fn spawn_dsl_dispatch( database: Database, event_tx: mpsc::Sender, command: Command, + source: String, ) { tokio::spawn(async move { - let outcome = execute_command(&database, command.clone()).await; + let outcome = execute_command_typed(&database, command.clone(), source).await; let event = match outcome { Ok(CommandOutcome::Schema(description)) => AppEvent::DslSucceeded { command: command.clone(), @@ -152,9 +162,18 @@ fn spawn_dsl_dispatch( command: command.clone(), result, }, + Err(DbError::PersistenceFatal { + operation, + path, + message, + }) => AppEvent::PersistenceFatal { + operation: operation.to_string(), + path, + message, + }, Err(error) => AppEvent::DslFailed { command: command.clone(), - error, + error: error.friendly_message(), }, }; if event_tx.send(event).await.is_err() { @@ -181,30 +200,33 @@ enum CommandOutcome { Delete(DeleteResult), } -async fn execute_command( +/// Execute a parsed user command and return either a typed +/// `CommandOutcome` or the raw `DbError`. Keeping the typed +/// error here lets us distinguish persistence-fatal failures +/// from ordinary user errors at dispatch time (ADR-0015 §8). +async fn execute_command_typed( database: &Database, command: Command, -) -> Result { + source: String, +) -> Result { + let src = Some(source); match command { Command::CreateTable { name, columns, primary_key, } => database - .create_table(name, columns, primary_key) + .create_table(name, columns, primary_key, src) .await - .map(|d| CommandOutcome::Schema(Some(d))) - .map_err(friendly), + .map(|d| CommandOutcome::Schema(Some(d))), Command::DropTable { name } => database - .drop_table(name) + .drop_table(name, src) .await - .map(|()| CommandOutcome::Schema(None)) - .map_err(friendly), + .map(|()| CommandOutcome::Schema(None)), Command::AddColumn { table, column, ty } => database - .add_column(table, column, ty) + .add_column(table, column, ty, src) .await - .map(|d| CommandOutcome::Schema(Some(d))) - .map_err(friendly), + .map(|d| CommandOutcome::Schema(Some(d))), Command::AddRelationship { name, parent_table, @@ -224,55 +246,45 @@ async fn execute_command( on_delete, on_update, create_fk, + src, ) .await - .map(|d| CommandOutcome::Schema(Some(d))) - .map_err(friendly), + .map(|d| CommandOutcome::Schema(Some(d))), Command::DropRelationship { selector } => database - .drop_relationship(selector) + .drop_relationship(selector, src) .await - .map(CommandOutcome::Schema) - .map_err(friendly), + .map(CommandOutcome::Schema), Command::ShowTable { name } => database - .describe_table(name) + .describe_table(name, src) .await - .map(|d| CommandOutcome::Schema(Some(d))) - .map_err(friendly), + .map(|d| CommandOutcome::Schema(Some(d))), Command::Insert { table, columns, values, } => database - .insert(table, columns, values) + .insert(table, columns, values, src) .await - .map(CommandOutcome::Insert) - .map_err(friendly), + .map(CommandOutcome::Insert), Command::Update { table, assignments, filter, } => database - .update(table, assignments, filter) + .update(table, assignments, filter, src) .await - .map(CommandOutcome::Update) - .map_err(friendly), + .map(CommandOutcome::Update), Command::Delete { table, filter } => database - .delete(table, filter) + .delete(table, filter, src) .await - .map(CommandOutcome::Delete) - .map_err(friendly), + .map(CommandOutcome::Delete), Command::ShowData { name } => database - .query_data(name) + .query_data(name, src) .await - .map(CommandOutcome::Query) - .map_err(friendly), + .map(CommandOutcome::Query), } } -fn friendly(err: DbError) -> String { - err.friendly_message() -} - fn spawn_event_reader(tx: mpsc::Sender) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut stream = EventStream::new(); diff --git a/tests/iteration2_persistence.rs b/tests/iteration2_persistence.rs new file mode 100644 index 0000000..3d1b7b6 --- /dev/null +++ b/tests/iteration2_persistence.rs @@ -0,0 +1,371 @@ +//! Iteration-2 integration tests: per-command write-through +//! to `project.yaml`, `data/
.csv`, and `history.log` +//! (ADR-0015 §3-§6). +//! +//! These tests exercise the full path from +//! `Database::open_with_persistence` through a successful +//! command into the on-disk text targets. They use +//! `Database::open_with_persistence(...)` so the worker +//! thread runs the persistence callbacks the runtime would. + +use std::fs; +use std::path::Path; + +use rdbms_playground::db::Database; +use rdbms_playground::dsl::{ColumnSpec, ReferentialAction, RowFilter, Type, Value}; +use rdbms_playground::persistence::Persistence; +use rdbms_playground::project::{ + self, DATA_DIR, HISTORY_LOG, PROJECT_YAML, +}; + +fn tempdir() -> tempfile::TempDir { + tempfile::tempdir().expect("create tempdir") +} + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio rt") +} + +/// Open a project under a fresh data root and return the +/// `Database` (with persistence wired) plus the path so the +/// test can inspect on-disk state. The project is held alive +/// implicitly via the leaked `TempDir` returned alongside. +fn open_project( + data: &tempfile::TempDir, +) -> (project::Project, Database, std::path::PathBuf) { + let project = project::open_or_create(None, Some(data.path())).expect("open project"); + let path = project.path().to_path_buf(); + let persistence = Persistence::new(path.clone()); + let db = Database::open_with_persistence(project.db_path(), persistence) + .expect("open db with persistence"); + (project, db, path) +} + +fn read_history(project_path: &Path) -> Vec { + let body = fs::read_to_string(project_path.join(HISTORY_LOG)).unwrap_or_default(); + body.lines().map(str::to_string).collect() +} + +fn read_yaml(project_path: &Path) -> String { + fs::read_to_string(project_path.join(PROJECT_YAML)).expect("project.yaml") +} + +fn read_csv(project_path: &Path, table: &str) -> Option { + fs::read_to_string(project_path.join(DATA_DIR).join(format!("{table}.csv"))).ok() +} + +#[test] +fn create_table_writes_yaml_and_history() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ + ColumnSpec { name: "id".to_string(), ty: Type::Serial }, + ColumnSpec { name: "Name".to_string(), ty: Type::Text }, + ], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + }); + + let yaml = read_yaml(&path); + assert!(yaml.contains("- name: Customers"), "yaml missing table:\n{yaml}"); + assert!(yaml.contains("primary_key: [id]"), "yaml: {yaml}"); + assert!(yaml.contains("type: serial"), "yaml: {yaml}"); + assert!(yaml.contains("type: text"), "yaml: {yaml}"); + + let history = read_history(&path); + assert_eq!(history.len(), 1, "expected one history line; got {history:?}"); + assert!(history[0].ends_with("|ok|create table Customers with pk id:serial")); +} + +#[test] +fn insert_writes_csv_and_history() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ + ColumnSpec { name: "id".to_string(), ty: Type::Serial }, + ColumnSpec { name: "Name".to_string(), ty: Type::Text }, + ], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + db.insert( + "Customers".to_string(), + None, + vec![Value::Text("Alice".to_string())], + Some("insert into Customers ('Alice')".to_string()), + ) + .await + .unwrap(); + }); + + let csv = read_csv(&path, "Customers").expect("Customers.csv missing"); + let lines: Vec<&str> = csv.trim_end().lines().collect(); + assert_eq!(lines[0], "id,Name"); + assert_eq!(lines[1], "1,Alice"); + + let history = read_history(&path); + assert!( + history.iter().any(|l| l.ends_with("|ok|insert into Customers ('Alice')")), + "history missing insert: {history:?}", + ); +} + +#[test] +fn drop_table_removes_its_csv() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + db.insert( + "Customers".to_string(), + Some(vec!["id".to_string()]), + vec![Value::Number("42".to_string())], + Some("insert into Customers (id) values (42)".to_string()), + ) + .await + .unwrap(); + // The CSV exists before drop. + assert!(read_csv(&path, "Customers").is_some()); + + db.drop_table( + "Customers".to_string(), + Some("drop table Customers".to_string()), + ) + .await + .unwrap(); + }); + + assert!(read_csv(&path, "Customers").is_none(), "CSV should be deleted"); + let yaml = read_yaml(&path); + assert!(!yaml.contains("- name: Customers"), "table should be gone from yaml:\n{yaml}"); +} + +#[test] +fn delete_with_cascade_rewrites_both_csvs() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + db.create_table( + "Orders".to_string(), + vec![ + ColumnSpec { name: "id".to_string(), ty: Type::Serial }, + ColumnSpec { name: "CustId".to_string(), ty: Type::Int }, + ], + vec!["id".to_string()], + Some("create table Orders with pk id:serial, CustId:int".to_string()), + ) + .await + .unwrap(); + db.add_relationship( + None, + "Customers".to_string(), + "id".to_string(), + "Orders".to_string(), + "CustId".to_string(), + ReferentialAction::Cascade, + ReferentialAction::NoAction, + false, + Some( + "add 1:n relationship from Customers.id to Orders.CustId on delete cascade" + .to_string(), + ), + ) + .await + .unwrap(); + // Customers has only a serial PK; long-form INSERT with + // an explicit id keeps the test independent of short-form + // semantics for "all-auto-generated" tables. + db.insert( + "Customers".to_string(), + Some(vec!["id".to_string()]), + vec![Value::Number("1".to_string())], + Some("insert into Customers (id) values (1)".to_string()), + ) + .await + .unwrap(); + db.insert( + "Orders".to_string(), + Some(vec!["CustId".to_string()]), + vec![Value::Number("1".to_string())], + Some("insert into Orders (CustId) values (1)".to_string()), + ) + .await + .unwrap(); + + // Cascade delete from Customers should also clean Orders. + let result = db + .delete( + "Customers".to_string(), + RowFilter::Where { + column: "id".to_string(), + value: Value::Number("1".to_string()), + }, + Some("delete from Customers where id=1".to_string()), + ) + .await + .unwrap(); + assert_eq!(result.rows_affected, 1); + }); + + let customers_csv = read_csv(&path, "Customers").expect("Customers.csv"); + let orders_csv = read_csv(&path, "Orders").expect("Orders.csv"); + // Both CSVs should be header-only after cascade. + assert_eq!(customers_csv.lines().count(), 1, "got: {customers_csv}"); + assert_eq!(orders_csv.lines().count(), 1, "got: {orders_csv}"); +} + +#[test] +fn show_table_appends_history_only() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + let yaml_before = read_yaml(&path); + db.describe_table( + "Customers".to_string(), + Some("show table Customers".to_string()), + ) + .await + .unwrap(); + let yaml_after = read_yaml(&path); + // YAML body did not change for a read-only command. + assert_eq!(yaml_before, yaml_after); + }); + + let history = read_history(&path); + assert!( + history.iter().any(|l| l.ends_with("|ok|show table Customers")), + "history missing show entry: {history:?}", + ); +} + +#[test] +fn failed_command_does_not_append_history_or_change_yaml() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .unwrap(); + let yaml_before = read_yaml(&path); + + // Same name again — should fail. + let err = db + .create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + Some("create table Customers with pk id:serial".to_string()), + ) + .await + .expect_err("must fail"); + let _ = err; + + let yaml_after = read_yaml(&path); + assert_eq!(yaml_before, yaml_after, "failed cmd must not change yaml"); + }); + + let history = read_history(&path); + // Only the first (successful) create_table should have logged. + let create_count = history + .iter() + .filter(|l| l.contains("|ok|create table Customers")) + .count(); + assert_eq!(create_count, 1, "expected exactly one logged create; got: {history:?}"); +} + +#[test] +fn project_yaml_carries_relationship_after_add() { + let data = tempdir(); + let (_p, db, path) = open_project(&data); + + rt().block_on(async { + db.create_table( + "Customers".to_string(), + vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial }], + vec!["id".to_string()], + None, + ) + .await + .unwrap(); + db.create_table( + "Orders".to_string(), + vec![ + ColumnSpec { name: "id".to_string(), ty: Type::Serial }, + ColumnSpec { name: "CustId".to_string(), ty: Type::Int }, + ], + vec!["id".to_string()], + None, + ) + .await + .unwrap(); + db.add_relationship( + None, + "Customers".to_string(), + "id".to_string(), + "Orders".to_string(), + "CustId".to_string(), + ReferentialAction::Cascade, + ReferentialAction::NoAction, + false, + Some( + "add 1:n relationship from Customers.id to Orders.CustId on delete cascade" + .to_string(), + ), + ) + .await + .unwrap(); + }); + + let yaml = read_yaml(&path); + assert!(yaml.contains("- name: Customers_id_to_Orders_CustId"), "yaml: {yaml}"); + assert!(yaml.contains("on_delete: cascade"), "yaml: {yaml}"); + assert!(yaml.contains("on_update: no_action"), "yaml: {yaml}"); +} diff --git a/tests/project_lifecycle.rs b/tests/project_lifecycle.rs index 81eb818..f06df99 100644 --- a/tests/project_lifecycle.rs +++ b/tests/project_lifecycle.rs @@ -175,7 +175,7 @@ fn db_persists_across_open_close_cycles() { }, ], vec!["id".to_string()], - ) + None) .await .expect("create_table"); }); diff --git a/tests/walking_skeleton.rs b/tests/walking_skeleton.rs index 0ebc300..2134982 100644 --- a/tests/walking_skeleton.rs +++ b/tests/walking_skeleton.rs @@ -42,6 +42,20 @@ fn submit(app: &mut App) -> Vec { app.update(key(KeyCode::Enter)) } +/// Assert that `actions` is exactly one `Action::ExecuteDsl` +/// whose parsed command equals `expected`. The original source +/// text carried alongside the command is allowed to be +/// anything — tests construct the expected `Command` directly +/// and don't care about the verbatim user input. +#[track_caller] +fn assert_one_execute_dsl(actions: &[Action], expected: &Command) { + assert_eq!(actions.len(), 1, "expected exactly one action; got {actions:?}"); + match &actions[0] { + Action::ExecuteDsl { command, .. } => assert_eq!(command, expected), + other => panic!("expected ExecuteDsl, got {other:?}"), + } +} + fn rendered_text(app: &mut App, theme: &Theme, width: u16, height: u16) -> String { let backend = TestBackend::new(width, height); let mut terminal = Terminal::new(backend).expect("create terminal"); @@ -72,16 +86,16 @@ fn typing_then_submitting_a_dsl_command_emits_execute_action() { ); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::CreateTable { + assert_one_execute_dsl( + &actions, + &Command::CreateTable { name: "Customers".to_string(), columns: vec![ColumnSpec { name: "id".to_string(), ty: Type::Serial, }], primary_key: vec!["id".to_string()], - })] + }, ); assert!(app.input.is_empty(), "input buffer cleared on submit"); let post_render = rendered_text(&mut app, &theme, 80, 24); @@ -262,7 +276,7 @@ fn create_table_flow_updates_tables_list_and_structure_view() { }], primary_key: vec!["id".to_string()], }; - assert_eq!(actions, vec![Action::ExecuteDsl(expected_cmd.clone())]); + assert_one_execute_dsl(&actions, &expected_cmd); // Runtime would now dispatch and feed back DslSucceeded + TablesRefreshed. let desc = fake_table("Customers", &[("id", Type::Serial, true)]); @@ -302,13 +316,13 @@ fn add_column_flow_updates_structure_view() { type_str(&mut app, "add column to table Customers: Name (text)"); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::AddColumn { + assert_one_execute_dsl( + &actions, + &Command::AddColumn { table: "Customers".to_string(), column: "Name".to_string(), ty: Type::Text, - })] + }, ); let updated = fake_table( @@ -336,11 +350,11 @@ fn drop_table_flow_clears_items_list() { type_str(&mut app, "drop table Customers"); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::DropTable { - name: "Customers".to_string() - })] + assert_one_execute_dsl( + &actions, + &Command::DropTable { + name: "Customers".to_string(), + }, ); app.update(AppEvent::DslSucceeded { @@ -366,9 +380,9 @@ fn add_relationship_flow_shows_parent_side_with_inbound_section() { "add 1:n relationship from Customers.Id to Orders.CustId on delete cascade", ); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::AddRelationship { + assert_one_execute_dsl( + &actions, + &Command::AddRelationship { name: None, parent_table: "Customers".to_string(), parent_column: "Id".to_string(), @@ -377,7 +391,7 @@ fn add_relationship_flow_shows_parent_side_with_inbound_section() { on_delete: ReferentialAction::Cascade, on_update: ReferentialAction::NoAction, create_fk: false, - })] + }, ); // The runtime now feeds back the parent (Customers) so the @@ -470,13 +484,13 @@ fn insert_flow_emits_action_and_renders_data() { type_str(&mut app, "insert into Customers values ('Alice')"); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::Insert { + assert_one_execute_dsl( + &actions, + &Command::Insert { table: "Customers".to_string(), columns: None, values: vec![Value::Text("Alice".to_string())], - })] + }, ); // Simulate the runtime feeding back an InsertResult. @@ -516,12 +530,12 @@ fn delete_with_all_rows_emits_correct_action() { let mut app = App::new(); type_str(&mut app, "delete from Customers --all-rows"); let actions = submit(&mut app); - assert_eq!( - actions, - vec![Action::ExecuteDsl(Command::Delete { + assert_one_execute_dsl( + &actions, + &Command::Delete { table: "Customers".to_string(), filter: RowFilter::AllRows, - })] + }, ); }