//! Tier-3 integration tests for the undo/snapshot ring wired into //! the db worker (ADR-0006 Amendment 1, §8 step 3). //! //! These drive the real `Database` worker: a mutation takes a //! pre-op snapshot, `undo` restores it through the live connection, //! `redo` re-applies, a batch collapses to a single undo step, and //! `--no-undo` (undo disabled) takes no snapshots at all. use std::path::Path; use rdbms_playground::db::Database; use rdbms_playground::dsl::{ColumnSpec, Command, RowFilter, Type, Value, parse_command}; use rdbms_playground::persistence::Persistence; use rdbms_playground::project; fn tempdir() -> tempfile::TempDir { tempfile::tempdir().expect("create tempdir") } fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("tokio rt") } /// Open a fresh temp project with undo enabled (or not). fn open_project( data: &tempfile::TempDir, undo_enabled: bool, ) -> (project::Project, Database, std::path::PathBuf) { let project = project::open_or_create(None, Some(data.path())).expect("open project"); let path = project.path().to_path_buf(); let persistence = Persistence::new(path.clone()); let db = Database::open_with_persistence_and_undo(project.db_path(), persistence, undo_enabled) .expect("open db"); (project, db, path) } async fn make_customers(db: &Database) { db.create_table( "Customers".to_string(), vec![ ColumnSpec::new("id".to_string(), Type::Serial), ColumnSpec::new("Name".to_string(), Type::Text), ], vec!["id".to_string()], Some("create table Customers with pk id(serial)".to_string()), ) .await .unwrap(); } async fn insert_named(db: &Database, name: &str) { db.insert( "Customers".to_string(), None, vec![Value::Text(name.to_string())], Some(format!("insert into Customers ('{name}')")), ) .await .unwrap(); } async fn row_count(db: &Database) -> usize { db.query_data("Customers".to_string(), None, None) .await .unwrap() .rows .len() } fn snapshots_dir(path: &Path) -> std::path::PathBuf { path.join(".snapshots") } #[test] fn mutation_snapshots_and_undo_restores_through_the_worker() { let data = tempdir(); let (_p, db, path) = open_project(&data, true); rt().block_on(async { make_customers(&db).await; insert_named(&db, "Alice").await; insert_named(&db, "Bob").await; assert_eq!(row_count(&db).await, 2); // Destructive op: delete Bob (id = 2). db.delete( "Customers".to_string(), RowFilter::eq("id", Value::Number("2".to_string())), Some("delete from Customers where id = 2".to_string()), ) .await .unwrap(); assert_eq!(row_count(&db).await, 1); // The pending undo names the delete. let peek = db.peek_undo().await.unwrap().expect("an undo entry"); assert_eq!(peek.command, "delete from Customers where id = 2"); // Undo restores Bob. let undone = db.undo().await.unwrap().expect("undo applied"); assert_eq!(undone.command, "delete from Customers where id = 2"); assert_eq!(row_count(&db).await, 2, "Bob restored by undo"); }); assert!(snapshots_dir(&path).exists(), "snapshots dir created"); } #[test] fn redo_reapplies_the_undone_command() { let data = tempdir(); let (_p, db, _path) = open_project(&data, true); rt().block_on(async { make_customers(&db).await; insert_named(&db, "Alice").await; insert_named(&db, "Bob").await; db.delete( "Customers".to_string(), RowFilter::eq("id", Value::Number("2".to_string())), Some("delete from Customers where id = 2".to_string()), ) .await .unwrap(); assert_eq!(row_count(&db).await, 1); db.undo().await.unwrap(); assert_eq!(row_count(&db).await, 2); let redone = db.redo().await.unwrap().expect("redo applied"); assert_eq!(redone.command, "delete from Customers where id = 2"); assert_eq!(row_count(&db).await, 1, "delete re-applied by redo"); }); } #[test] fn new_work_after_undo_clears_redo() { let data = tempdir(); let (_p, db, _path) = open_project(&data, true); rt().block_on(async { make_customers(&db).await; insert_named(&db, "Alice").await; db.delete( "Customers".to_string(), RowFilter::AllRows, Some("delete from Customers --all-rows".to_string()), ) .await .unwrap(); db.undo().await.unwrap(); assert!(db.peek_redo().await.unwrap().is_some(), "redo available"); // New destructive work. insert_named(&db, "Carol").await; assert!( db.peek_redo().await.unwrap().is_none(), "new work discards the redo stack" ); }); } #[test] fn undo_disabled_takes_no_snapshots() { let data = tempdir(); let (_p, db, path) = open_project(&data, false); rt().block_on(async { make_customers(&db).await; insert_named(&db, "Alice").await; db.delete( "Customers".to_string(), RowFilter::AllRows, Some("delete from Customers --all-rows".to_string()), ) .await .unwrap(); // Nothing to undo, and no snapshot machinery on disk. assert!(db.undo().await.unwrap().is_none(), "undo is a no-op when disabled"); assert!(db.peek_undo().await.unwrap().is_none()); }); assert!( !snapshots_dir(&path).exists(), "no .snapshots dir when undo is disabled" ); } #[test] fn batch_records_a_single_undo_step() { let data = tempdir(); let (_p, db, _path) = open_project(&data, true); rt().block_on(async { make_customers(&db).await; // one undo entry (the create) // A batch of three inserts → one boundary snapshot. db.begin_batch(Some("replay history.log".to_string())) .await .unwrap(); insert_named(&db, "Alice").await; insert_named(&db, "Bob").await; insert_named(&db, "Carol").await; db.end_batch().await.unwrap(); assert_eq!(row_count(&db).await, 3); // The single batch undo names the batch command. let peek = db.peek_undo().await.unwrap().expect("batch undo entry"); assert_eq!(peek.command, "replay history.log"); // One undo rolls the whole batch back to the pre-batch state // (table exists, no rows). db.undo().await.unwrap(); assert_eq!(row_count(&db).await, 0, "whole batch undone in one step"); // The next undo is the create_table (table gone). let next = db.peek_undo().await.unwrap().expect("create entry"); assert_eq!(next.command, "create table Customers with pk id(serial)"); }); } #[test] fn empty_undo_and_redo_are_no_ops() { let data = tempdir(); let (_p, db, _path) = open_project(&data, true); rt().block_on(async { assert!(db.undo().await.unwrap().is_none()); assert!(db.redo().await.unwrap().is_none()); assert!(db.peek_undo().await.unwrap().is_none()); assert!(db.peek_redo().await.unwrap().is_none()); }); } // ---- Step 7: full-stack flow across DSL *and* SQL (R21 / R22) ---- // // R22: the snapshot hook lives in the worker dispatch, so SQL DML // (SqlInsert/SqlUpdate/SqlDelete) is snapshotted exactly like DSL. async fn make_t(db: &Database) { db.create_table( "T".to_string(), vec![ ColumnSpec::new("id".to_string(), Type::Int), ColumnSpec::new("n".to_string(), Type::Int), ], vec!["id".to_string()], Some("create table T with pk id(int), n(int)".to_string()), ) .await .unwrap(); } async fn dsl_insert(db: &Database, id: i64, n: i64) { db.insert( "T".to_string(), Some(vec!["id".to_string(), "n".to_string()]), vec![Value::Number(id.to_string()), Value::Number(n.to_string())], Some(format!("insert into T (id, n) values ({id}, {n})")), ) .await .unwrap(); } async fn sql_insert(db: &Database, input: &str) { match parse_command(input).unwrap() { Command::SqlInsert { sql, target_table, listed_columns, row_source, returning, .. } => { db.run_sql_insert( sql, Some(input.to_string()), target_table, listed_columns, row_source, returning, ) .await .unwrap(); } other => panic!("expected SqlInsert from {input:?}, got {other:?}"), } } async fn sql_delete(db: &Database, input: &str) { match parse_command(input).unwrap() { Command::SqlDelete { sql, target_table, returning, } => { db.run_sql_delete(sql, Some(input.to_string()), target_table, returning) .await .unwrap(); } other => panic!("expected SqlDelete from {input:?}, got {other:?}"), } } async fn count_t(db: &Database) -> usize { db.query_data("T".to_string(), None, None) .await .unwrap() .rows .len() } #[test] fn undo_steps_back_across_dsl_and_sql_mutations() { let data = tempdir(); let (_p, db, _path) = open_project(&data, true); rt().block_on(async { make_t(&db).await; // snapshot 1: create dsl_insert(&db, 1, 10).await; // snapshot 2: DSL insert sql_insert(&db, "insert into T (id, n) values (2, 20)").await; // snapshot 3: SQL insert assert_eq!(count_t(&db).await, 2); sql_delete(&db, "delete from T where id = 1").await; // snapshot 4: SQL delete assert_eq!(count_t(&db).await, 1); // Walk back through SQL then DSL boundaries. db.undo().await.unwrap(); assert_eq!(count_t(&db).await, 2, "SQL delete undone"); db.undo().await.unwrap(); assert_eq!(count_t(&db).await, 1, "SQL insert undone"); db.undo().await.unwrap(); assert_eq!(count_t(&db).await, 0, "DSL insert undone"); // Redo re-applies the DSL insert. db.redo().await.unwrap(); assert_eq!(count_t(&db).await, 1, "DSL insert redone"); }); } #[test] fn undo_restores_db_and_csv_consistently() { let data = tempdir(); let (_p, db, path) = open_project(&data, true); rt().block_on(async { db.create_table( "T".to_string(), vec![ ColumnSpec::new("id".to_string(), Type::Int), ColumnSpec::new("name".to_string(), Type::Text), ], vec!["id".to_string()], Some("create table T".to_string()), ) .await .unwrap(); db.insert( "T".to_string(), Some(vec!["id".to_string(), "name".to_string()]), vec![Value::Number("1".to_string()), Value::Text("Alice".to_string())], Some("insert Alice".to_string()), ) .await .unwrap(); sql_insert(&db, "insert into T (id, name) values (2, 'Bob')").await; sql_delete(&db, "delete from T where id = 2").await; let csv = std::fs::read_to_string(path.join("data").join("T.csv")).unwrap(); assert!( csv.contains("Alice") && !csv.contains("Bob"), "post-delete csv: {csv}" ); db.undo().await.unwrap(); // Both the database read model and the on-disk CSV are // restored — the (db, csv) pair stays consistent. assert_eq!( db.query_data("T".to_string(), None, None) .await .unwrap() .rows .len(), 2 ); let csv2 = std::fs::read_to_string(path.join("data").join("T.csv")).unwrap(); assert!(csv2.contains("Bob"), "csv restored on disk: {csv2}"); }); } #[test] fn redo_is_cleared_when_new_work_commits_without_a_snapshot() { // Regression for a /runda finding: with the non-fatal // snapshot-failure policy, a committed mutation whose snapshot // can't be staged left the redo stack stale — a later `redo` // would silently discard the new work. Any committed user work // must clear redo, even when no snapshot was recorded. let data = tempdir(); let (_p, db, path) = open_project(&data, true); rt().block_on(async { make_t(&db).await; dsl_insert(&db, 1, 10).await; sql_delete(&db, "delete from T where id = 1").await; // → 0 rows db.undo().await.unwrap(); // redo now holds the delete; → 1 row assert!(db.peek_redo().await.unwrap().is_some(), "redo populated"); }); // Force the next staging to fail while the rest of the ring stays // writable: a plain file where the `.staging` dir is expected makes // `stage` error, but `clear_redo` (index + payload deletes in the // ring root) still succeeds. let staging = path.join(".snapshots").join(".staging"); std::fs::write(&staging, b"block").unwrap(); rt().block_on(async { dsl_insert(&db, 2, 20).await; // commits; snapshot staging fails assert_eq!(count_t(&db).await, 2, "new work applied"); assert!( db.peek_redo().await.unwrap().is_none(), "stale redo must be cleared when new work commits without a snapshot" ); // Redo is now a no-op — it cannot resurrect the discarded state. assert!(db.redo().await.unwrap().is_none()); assert_eq!(count_t(&db).await, 2, "new work preserved"); }); } #[test] fn undo_ring_persists_across_reopen() { let data = tempdir(); let (project, db, path) = open_project(&data, true); let db_path = project.db_path(); rt().block_on(async { make_t(&db).await; dsl_insert(&db, 1, 10).await; sql_delete(&db, "delete from T where id = 1").await; assert_eq!(count_t(&db).await, 0); }); // Close the worker, then reopen the *same* project (lock still // held by `project`). The persisted ring must survive. drop(db); let persistence = Persistence::new(path); let db2 = Database::open_with_persistence_and_undo(&db_path, persistence, true) .expect("reopen db"); rt().block_on(async { let peek = db2 .peek_undo() .await .unwrap() .expect("ring persisted across reopen"); assert_eq!(peek.command, "delete from T where id = 1"); db2.undo().await.unwrap(); assert_eq!(count_t(&db2).await, 1, "row restored after reopen + undo"); }); }