e8fa859ab9
ADR-0052 moved success journaling out of the worker to the dispatch layer, leaving the `source` that handlers threaded purely for the worker's old history.log write dead. Remove it: - drop `_source` from finalize_persistence and do_rebuild_from_text - inline + delete the three read-only *_request wrappers - drop the now-unused `source` param from the ~30 forwarding worker handlers (leaf + composite), compiler-guided - remove the `source` field from the DescribeTable/QueryData/RunSelect requests and their DatabaseHandle methods (call sites updated) The only worker `source` left is the snapshot/undo label (snapshot_then / stage_pre_mutation / begin_batch). Purely mechanical, no behaviour change. 2471 pass / 0 fail / 1 ignored, clippy clean.
461 lines
15 KiB
Rust
461 lines
15 KiB
Rust
//! Tier-3 integration tests for the undo/snapshot ring wired into
//! the db worker (ADR-0006 Amendment 1, §8 step 3).
//!
//! These drive the real `Database` worker: a mutation takes a
//! pre-op snapshot, `undo` restores it through the live connection,
//! `redo` re-applies, a batch collapses to a single undo step, and
//! `--no-undo` (undo disabled) takes no snapshots at all.
|
|
|
|
use std::path::Path;
|
|
|
|
use rdbms_playground::db::Database;
|
|
use rdbms_playground::dsl::{ColumnSpec, Command, RowFilter, Type, Value, parse_command};
|
|
use rdbms_playground::persistence::Persistence;
|
|
use rdbms_playground::project;
|
|
|
|
/// Creates a scratch directory for a single test.
///
/// Panics with "create tempdir" if the directory cannot be created.
fn tempdir() -> tempfile::TempDir {
    let scratch = tempfile::tempdir();
    scratch.expect("create tempdir")
}
|
|
|
|
/// Builds a current-thread Tokio runtime with `enable_all()` applied.
///
/// Panics with "tokio rt" if the runtime cannot be built.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    builder.build().expect("tokio rt")
}
|
|
|
|
/// Open a fresh temp project with undo enabled (or not).
|
|
fn open_project(
|
|
data: &tempfile::TempDir,
|
|
undo_enabled: bool,
|
|
) -> (project::Project, Database, std::path::PathBuf) {
|
|
let project = project::open_or_create(None, Some(data.path())).expect("open project");
|
|
let path = project.path().to_path_buf();
|
|
let persistence = Persistence::new(path.clone());
|
|
let db = Database::open_with_persistence_and_undo(project.db_path(), persistence, undo_enabled)
|
|
.expect("open db");
|
|
(project, db, path)
|
|
}
|
|
|
|
async fn make_customers(db: &Database) {
|
|
db.create_table(
|
|
"Customers".to_string(),
|
|
vec![
|
|
ColumnSpec::new("id".to_string(), Type::Serial),
|
|
ColumnSpec::new("Name".to_string(), Type::Text),
|
|
],
|
|
vec!["id".to_string()],
|
|
Some("create table Customers with pk id(serial)".to_string()),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
async fn insert_named(db: &Database, name: &str) {
|
|
db.insert(
|
|
"Customers".to_string(),
|
|
None,
|
|
vec![Value::Text(name.to_string())],
|
|
Some(format!("insert into Customers ('{name}')")),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
async fn row_count(db: &Database) -> usize {
|
|
db.query_data("Customers".to_string(), None, None)
|
|
.await
|
|
.unwrap()
|
|
.rows
|
|
.len()
|
|
}
|
|
|
|
/// Returns `path` with a `.snapshots` component appended.
fn snapshots_dir(path: &Path) -> std::path::PathBuf {
    let mut dir = path.to_path_buf();
    dir.push(".snapshots");
    dir
}
|
|
|
|
/// A DSL delete records an undo entry carrying its command text, `undo`
/// brings the deleted row back, and a `.snapshots` directory exists afterwards.
#[test]
fn mutation_snapshots_and_undo_restores_through_the_worker() {
    const DELETE_BOB: &str = "delete from Customers where id = 2";

    let scratch = tempdir();
    let (_project, db, root) = open_project(&scratch, true);

    let scenario = async {
        make_customers(&db).await;
        for name in ["Alice", "Bob"] {
            insert_named(&db, name).await;
        }
        assert_eq!(row_count(&db).await, 2);

        // Destructive op: delete Bob (id = 2).
        let only_bob = RowFilter::eq("id", Value::Number("2".to_owned()));
        db.delete("Customers".to_owned(), only_bob, Some(DELETE_BOB.to_owned()))
            .await
            .unwrap();
        assert_eq!(row_count(&db).await, 1);

        // The pending undo entry is labelled with the delete's command text.
        let pending = db.peek_undo().await.unwrap().expect("an undo entry");
        assert_eq!(pending.command, DELETE_BOB);

        // Undoing reports the same command and the row count returns to two.
        let reverted = db.undo().await.unwrap().expect("undo applied");
        assert_eq!(reverted.command, DELETE_BOB);
        assert_eq!(row_count(&db).await, 2, "Bob restored by undo");
    };
    rt().block_on(scenario);

    assert!(snapshots_dir(&root).exists(), "snapshots dir created");
}
|
|
|
|
/// After a delete is undone, `redo` reports the delete's command text and the
/// row is gone again.
#[test]
fn redo_reapplies_the_undone_command() {
    const DELETE_BOB: &str = "delete from Customers where id = 2";

    let scratch = tempdir();
    let (_project, db, _root) = open_project(&scratch, true);

    let scenario = async {
        make_customers(&db).await;
        for name in ["Alice", "Bob"] {
            insert_named(&db, name).await;
        }

        let only_bob = RowFilter::eq("id", Value::Number("2".to_owned()));
        db.delete("Customers".to_owned(), only_bob, Some(DELETE_BOB.to_owned()))
            .await
            .unwrap();
        assert_eq!(row_count(&db).await, 1);

        // Step back: both rows are present again.
        db.undo().await.unwrap();
        assert_eq!(row_count(&db).await, 2);

        // Step forward: the delete is in effect once more.
        let replayed = db.redo().await.unwrap().expect("redo applied");
        assert_eq!(replayed.command, DELETE_BOB);
        assert_eq!(row_count(&db).await, 1, "delete re-applied by redo");
    };
    rt().block_on(scenario);
}
|
|
|
|
/// An undo leaves a redo entry available; committing a further mutation
/// afterwards empties the redo side.
#[test]
fn new_work_after_undo_clears_redo() {
    let scratch = tempdir();
    let (_project, db, _root) = open_project(&scratch, true);

    let scenario = async {
        make_customers(&db).await;
        insert_named(&db, "Alice").await;

        let label = Some("delete from Customers --all-rows".to_owned());
        db.delete("Customers".to_owned(), RowFilter::AllRows, label)
            .await
            .unwrap();
        db.undo().await.unwrap();

        let redo_pending = db.peek_redo().await.unwrap().is_some();
        assert!(redo_pending, "redo available");

        // Fresh work committed after the undo.
        insert_named(&db, "Carol").await;

        let redo_emptied = db.peek_redo().await.unwrap().is_none();
        assert!(redo_emptied, "new work discards the redo stack");
    };
    rt().block_on(scenario);
}
|
|
|
|
/// With undo disabled at open time, mutations leave nothing to undo and no
/// `.snapshots` directory appears under the project path.
#[test]
fn undo_disabled_takes_no_snapshots() {
    let scratch = tempdir();
    let (_project, db, root) = open_project(&scratch, false);

    let scenario = async {
        make_customers(&db).await;
        insert_named(&db, "Alice").await;

        let label = Some("delete from Customers --all-rows".to_owned());
        db.delete("Customers".to_owned(), RowFilter::AllRows, label)
            .await
            .unwrap();

        // Nothing to undo, and nothing pending either.
        let undo_was_noop = db.undo().await.unwrap().is_none();
        assert!(undo_was_noop, "undo is a no-op when disabled");

        let nothing_pending = db.peek_undo().await.unwrap().is_none();
        assert!(nothing_pending);
    };
    rt().block_on(scenario);

    // No snapshot machinery on disk.
    let ring_exists = snapshots_dir(&root).exists();
    assert!(!ring_exists, "no .snapshots dir when undo is disabled");
}
|
|
|
|
/// Three inserts wrapped in `begin_batch` / `end_batch` are undone by a single
/// `undo`, whose entry is labelled with the batch's own command text.
#[test]
fn batch_records_a_single_undo_step() {
    const BATCH_LABEL: &str = "replay history.log";

    let scratch = tempdir();
    let (_project, db, _root) = open_project(&scratch, true);

    let scenario = async {
        make_customers(&db).await; // one undo entry (the create)

        // A batch of three inserts → one boundary snapshot.
        db.begin_batch(Some(BATCH_LABEL.to_owned())).await.unwrap();
        for name in ["Alice", "Bob", "Carol"] {
            insert_named(&db, name).await;
        }
        db.end_batch().await.unwrap();
        assert_eq!(row_count(&db).await, 3);

        // The top undo entry names the batch, not any individual insert.
        let batch_entry = db.peek_undo().await.unwrap().expect("batch undo entry");
        assert_eq!(batch_entry.command, BATCH_LABEL);

        // One undo returns to the pre-batch state (table exists, no rows).
        db.undo().await.unwrap();
        assert_eq!(row_count(&db).await, 0, "whole batch undone in one step");

        // What remains on the undo side is the table creation.
        let create_entry = db.peek_undo().await.unwrap().expect("create entry");
        assert_eq!(
            create_entry.command,
            "create table Customers with pk id(serial)"
        );
    };
    rt().block_on(scenario);
}
|
|
|
|
/// On a freshly opened project, `undo`, `redo`, `peek_undo` and `peek_redo`
/// all succeed and return `None`.
#[test]
fn empty_undo_and_redo_are_no_ops() {
    let scratch = tempdir();
    let (_project, db, _root) = open_project(&scratch, true);

    let scenario = async {
        let undo_empty = db.undo().await.unwrap().is_none();
        assert!(undo_empty);

        let redo_empty = db.redo().await.unwrap().is_none();
        assert!(redo_empty);

        let peek_undo_empty = db.peek_undo().await.unwrap().is_none();
        assert!(peek_undo_empty);

        let peek_redo_empty = db.peek_redo().await.unwrap().is_none();
        assert!(peek_redo_empty);
    };
    rt().block_on(scenario);
}
|
|
|
|
// ---- Step 7: full-stack flow across DSL *and* SQL (R21 / R22) ----
//
// R22: the snapshot hook lives in the worker dispatch, so SQL DML
// (SqlInsert/SqlUpdate/SqlDelete) is snapshotted exactly like DSL.
|
|
|
|
async fn make_t(db: &Database) {
|
|
db.create_table(
|
|
"T".to_string(),
|
|
vec![
|
|
ColumnSpec::new("id".to_string(), Type::Int),
|
|
ColumnSpec::new("n".to_string(), Type::Int),
|
|
],
|
|
vec!["id".to_string()],
|
|
Some("create table T with pk id(int), n(int)".to_string()),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
async fn dsl_insert(db: &Database, id: i64, n: i64) {
|
|
db.insert(
|
|
"T".to_string(),
|
|
Some(vec!["id".to_string(), "n".to_string()]),
|
|
vec![Value::Number(id.to_string()), Value::Number(n.to_string())],
|
|
Some(format!("insert into T (id, n) values ({id}, {n})")),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
async fn sql_insert(db: &Database, input: &str) {
|
|
match parse_command(input).unwrap() {
|
|
Command::SqlInsert {
|
|
sql,
|
|
target_table,
|
|
listed_columns,
|
|
row_source,
|
|
returning,
|
|
..
|
|
} => {
|
|
db.run_sql_insert(
|
|
sql,
|
|
Some(input.to_string()),
|
|
target_table,
|
|
listed_columns,
|
|
row_source,
|
|
returning,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
other => panic!("expected SqlInsert from {input:?}, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
async fn sql_delete(db: &Database, input: &str) {
|
|
match parse_command(input).unwrap() {
|
|
Command::SqlDelete {
|
|
sql,
|
|
target_table,
|
|
returning,
|
|
} => {
|
|
db.run_sql_delete(sql, Some(input.to_string()), target_table, returning)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
other => panic!("expected SqlDelete from {input:?}, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
async fn count_t(db: &Database) -> usize {
|
|
db.query_data("T".to_string(), None, None)
|
|
.await
|
|
.unwrap()
|
|
.rows
|
|
.len()
|
|
}
|
|
|
|
/// Mixes DSL and SQL mutations, then checks that successive `undo` calls step
/// back through them one at a time and that `redo` moves forward again.
#[test]
fn undo_steps_back_across_dsl_and_sql_mutations() {
    let scratch = tempdir();
    let (_project, db, _root) = open_project(&scratch, true);

    let scenario = async {
        make_t(&db).await; // snapshot 1: create
        dsl_insert(&db, 1, 10).await; // snapshot 2: DSL insert
        sql_insert(&db, "insert into T (id, n) values (2, 20)").await; // snapshot 3: SQL insert
        assert_eq!(count_t(&db).await, 2);
        sql_delete(&db, "delete from T where id = 1").await; // snapshot 4: SQL delete
        assert_eq!(count_t(&db).await, 1);

        // Walk back through the SQL boundaries, then the DSL one; each step
        // lists the row count expected after that undo.
        let rewind_steps = [
            (2, "SQL delete undone"),
            (1, "SQL insert undone"),
            (0, "DSL insert undone"),
        ];
        for (expected_rows, what) in rewind_steps {
            db.undo().await.unwrap();
            assert_eq!(count_t(&db).await, expected_rows, "{}", what);
        }

        // Redo re-applies the DSL insert.
        db.redo().await.unwrap();
        assert_eq!(count_t(&db).await, 1, "DSL insert redone");
    };
    rt().block_on(scenario);
}
|
|
|
|
/// After an SQL delete is undone, both the queried row count and the on-disk
/// `data/T.csv` file show the deleted row again.
#[test]
fn undo_restores_db_and_csv_consistently() {
    let scratch = tempdir();
    let (_project, db, root) = open_project(&scratch, true);
    // The CSV this test reads back before and after the undo.
    let csv_path = root.join("data").join("T.csv");

    let scenario = async {
        let columns = vec![
            ColumnSpec::new(String::from("id"), Type::Int),
            ColumnSpec::new(String::from("name"), Type::Text),
        ];
        db.create_table(
            String::from("T"),
            columns,
            vec![String::from("id")],
            Some(String::from("create table T")),
        )
        .await
        .unwrap();

        let alice_columns = vec![String::from("id"), String::from("name")];
        let alice_values = vec![
            Value::Number(String::from("1")),
            Value::Text(String::from("Alice")),
        ];
        db.insert(
            String::from("T"),
            Some(alice_columns),
            alice_values,
            Some(String::from("insert Alice")),
        )
        .await
        .unwrap();

        sql_insert(&db, "insert into T (id, name) values (2, 'Bob')").await;
        sql_delete(&db, "delete from T where id = 2").await;

        // After the delete the file mentions Alice but no longer Bob.
        let csv = std::fs::read_to_string(&csv_path).unwrap();
        let has_alice = csv.contains("Alice");
        let has_bob = csv.contains("Bob");
        assert!(has_alice && !has_bob, "post-delete csv: {csv}");

        db.undo().await.unwrap();

        // Both the database read model and the on-disk CSV are
        // restored — the (db, csv) pair stays consistent.
        let after_undo = db.query_data(String::from("T"), None, None).await.unwrap();
        assert_eq!(after_undo.rows.len(), 2);

        let csv2 = std::fs::read_to_string(&csv_path).unwrap();
        assert!(csv2.contains("Bob"), "csv restored on disk: {csv2}");
    };
    rt().block_on(scenario);
}
|
|
|
|
/// Regression for a /runda finding. Under the non-fatal snapshot-failure
/// policy, a mutation that committed although its snapshot could not be
/// staged used to leave the redo stack stale, so a later `redo` would silently
/// discard the new work. Any committed user work must clear redo, even when no
/// snapshot was recorded.
#[test]
fn redo_is_cleared_when_new_work_commits_without_a_snapshot() {
    let scratch = tempdir();
    let (_project, db, root) = open_project(&scratch, true);

    let populate_redo = async {
        make_t(&db).await;
        dsl_insert(&db, 1, 10).await;
        sql_delete(&db, "delete from T where id = 1").await; // → 0 rows
        db.undo().await.unwrap(); // redo now holds the delete; → 1 row

        let redo_pending = db.peek_redo().await.unwrap().is_some();
        assert!(redo_pending, "redo populated");
    };
    rt().block_on(populate_redo);

    // Make the next staging attempt fail while the rest of the ring stays
    // writable: a plain file sitting where the `.staging` dir is expected
    // makes `stage` error, but `clear_redo` (index + payload deletes in the
    // ring root) still succeeds.
    let blocker = snapshots_dir(&root).join(".staging");
    std::fs::write(&blocker, b"block").unwrap();

    let commit_without_snapshot = async {
        dsl_insert(&db, 2, 20).await; // commits; snapshot staging fails
        assert_eq!(count_t(&db).await, 2, "new work applied");

        let redo_emptied = db.peek_redo().await.unwrap().is_none();
        assert!(
            redo_emptied,
            "stale redo must be cleared when new work commits without a snapshot"
        );

        // Redo is now a no-op — it cannot resurrect the discarded state.
        let redo_was_noop = db.redo().await.unwrap().is_none();
        assert!(redo_was_noop);
        assert_eq!(count_t(&db).await, 2, "new work preserved");
    };
    rt().block_on(commit_without_snapshot);
}
|
|
|
|
/// The undo ring is still there after the database is dropped and reopened on
/// the same project: the top entry is the last delete, and undoing it brings
/// the row back.
#[test]
fn undo_ring_persists_across_reopen() {
    const DELETE_ROW: &str = "delete from T where id = 1";

    let scratch = tempdir();
    let (project, db, root) = open_project(&scratch, true);
    let db_path = project.db_path();

    let first_session = async {
        make_t(&db).await;
        dsl_insert(&db, 1, 10).await;
        sql_delete(&db, DELETE_ROW).await;
        assert_eq!(count_t(&db).await, 0);
    };
    rt().block_on(first_session);

    // Close the worker, then reopen the *same* project (lock still
    // held by `project`). The persisted ring must survive.
    drop(db);
    let store = Persistence::new(root);
    let reopened = Database::open_with_persistence_and_undo(&db_path, store, true);
    let db2 = reopened.expect("reopen db");

    let second_session = async {
        let pending = db2
            .peek_undo()
            .await
            .unwrap()
            .expect("ring persisted across reopen");
        assert_eq!(pending.command, DELETE_ROW);

        db2.undo().await.unwrap();
        assert_eq!(count_t(&db2).await, 1, "row restored after reopen + undo");
    };
    rt().block_on(second_session);
}
|