feat: ADR-0006 §8 steps 1-2 — --no-undo flag + snapshot ring module

Step 1 (Cargo + CLI):
- add the `backup` feature to rusqlite (online backup API)
- `--no-undo` flag (test-first) + help-banner entry

Step 2 (snapshot store, src/undo.rs):
- SnapshotStore: a persisted undo ring + redo stack under
  <project>/.snapshots/ (index.yaml + per-snapshot payload dirs)
- hybrid whole-project snapshot: db via backup API + project.yaml /
  data/*.csv copied as files; restore is text-first, db-last
  (ADR-0015 §6 commit-db-last)
- stage/finalize/discard, undo/redo (each snapshots current to keep
  the inverse possible), N=50 eviction, redo-cleared-on-new-work,
  orphan/staging cleanup, monotonic ids
- 12 Tier-1 tests; adds a crate-visible persistence::utc_iso8601_now

No worker wiring yet (step 3). 1674 passed / 0 failed / 1 ignored; clippy clean.
This commit is contained in:
claude@clouddev1
2026-05-24 20:17:03 +00:00
parent 6cf5705022
commit 64eee3ed6d
7 changed files with 824 additions and 7 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ futures-util = "0.3.32"
gethostname = "1.1.0" gethostname = "1.1.0"
rand = "0.10.1" rand = "0.10.1"
ratatui = "0.30.0" ratatui = "0.30.0"
rusqlite = { version = "0.39.0", features = ["bundled", "column_metadata"] } rusqlite = { version = "0.39.0", features = ["backup", "bundled", "column_metadata"] }
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
serde_yml = "0.0.12" serde_yml = "0.0.12"
sysinfo = { version = "0.39.0", default-features = false, features = ["system"] } sysinfo = { version = "0.39.0", default-features = false, features = ["system"] }
@@ -207,7 +207,7 @@ amendment records:
data/<table>.csv data/<table>.csv
playground.db playground.db
.snapshots/ .snapshots/
index.json # ordered ring + redo stack: ids, timestamps, index.yaml # ordered ring + redo stack: ids, timestamps,
# command text, status; the source of truth # command text, status; the source of truth
# for ordering/eviction # for ordering/eviction
0001/ 0001/
@@ -221,7 +221,7 @@ amendment records:
- **`.snapshots/` is git-ignored, export-excluded, and on the - **`.snapshots/` is git-ignored, export-excluded, and on the
temp-cleanup allowlist** (R13). temp-cleanup allowlist** (R13).
- The **ring** (undo) and the **redo stack** are both recorded in - The **ring** (undo) and the **redo stack** are both recorded in
`index.json`; snapshot payload dirs are shared storage referenced by `index.yaml`; snapshot payload dirs are shared storage referenced by
id. Eviction beyond 50 deletes the oldest payload dir. id. Eviction beyond 50 deletes the oldest payload dir.
- Ids are monotonic; never reused, to avoid stale references. - Ids are monotonic; never reused, to avoid stale references.
@@ -253,7 +253,7 @@ A snapshot brackets the existing 4-step persistence sequence:
3. atomic-rename text into place (step 3) 3. atomic-rename text into place (step 3)
4. commit db (step 4) 4. commit db (step 4)
5. FINALIZE snapshot (NEW) — atomic-rename .staging/ → .snapshots/<id>/, 5. FINALIZE snapshot (NEW) — atomic-rename .staging/ → .snapshots/<id>/,
append to index.json ring, evict oldest if append to index.yaml ring, evict oldest if
>50, clear redo stack. >50, clear redo stack.
On any failure in 14 → txn rolls back; DISCARD .staging/. On any failure in 14 → txn rolls back; DISCARD .staging/.
``` ```
@@ -296,7 +296,7 @@ Mirrors the existing two-phase `rebuild` modal flow, with the
1. `undo` parses to `Command::App(AppCommand::Undo)` 1. `undo` parses to `Command::App(AppCommand::Undo)`
`dispatch_app_command` returns `Action::PrepareUndo`. `dispatch_app_command` returns `Action::PrepareUndo`.
2. Runtime handles `PrepareUndo`: reads `.snapshots/index.json` top 2. Runtime handles `PrepareUndo`: reads `.snapshots/index.yaml` top
entry (command text + timestamp) — a cheap file read, like entry (command text + timestamp) — a cheap file read, like
`summarize_project` does for rebuild — and posts `summarize_project` does for rebuild — and posts
`AppEvent::UndoPrepared { command, when }` (or `AppEvent::UndoPrepared { command, when }` (or
@@ -356,7 +356,7 @@ primitive collapses a batch to a single ring entry:
| `src/app.rs` | `Modal::UndoConfirm` (+ redo) struct; event arms; `handle_undo_confirm_key`; dispatch arms | | `src/app.rs` | `Modal::UndoConfirm` (+ redo) struct; event arms; `handle_undo_confirm_key`; dispatch arms |
| `src/ui.rs` | `render_undo_confirm` (mirror `render_rebuild_confirm`) | | `src/ui.rs` | `render_undo_confirm` (mirror `render_rebuild_confirm`) |
| `src/db.rs` | `is_mutating`; `stage/finalize/discard_snapshot`; `Request::Undo`/`Redo`/`BeginBatch`/`EndBatch`; dispatcher wrap; `undo_enabled` + `in_batch` worker fields | | `src/db.rs` | `is_mutating`; `stage/finalize/discard_snapshot`; `Request::Undo`/`Redo`/`BeginBatch`/`EndBatch`; dispatcher wrap; `undo_enabled` + `in_batch` worker fields |
| `src/persistence/` (or new `src/snapshots/`) | ring + redo store, `index.json`, eviction, restore | | `src/undo.rs` (new) | snapshot ring + redo store, `index.yaml`, stage/finalize/discard/undo/redo/cleanup, eviction, restore (named `undo``src/snapshots/` is the insta dir) |
| `src/runtime.rs` | `is_app_lifecycle_entry_word += undo,redo`; `PrepareUndo/Undo` (+redo) handling; thread `no_undo` to worker; bracket `run_replay` with Begin/EndBatch | | `src/runtime.rs` | `is_app_lifecycle_entry_word += undo,redo`; `PrepareUndo/Undo` (+redo) handling; thread `no_undo` to worker; bracket `run_replay` with Begin/EndBatch |
| `src/project/mod.rs` | `.snapshots/` in `.gitignore` template | | `src/project/mod.rs` | `.snapshots/` in `.gitignore` template |
| `src/archive.rs` | exclude `.snapshots/` from export | | `src/archive.rs` | exclude `.snapshots/` from export |
@@ -423,7 +423,7 @@ explicit future items (hardlink optimisation) for user awareness.
1. **Cargo + CLI:** add `backup` feature; `--no-undo` flag + parse 1. **Cargo + CLI:** add `backup` feature; `--no-undo` flag + parse
tests. (R14, R9-partial) tests. (R14, R9-partial)
2. **Snapshot store module:** `index.json` model, ring + redo, stage/ 2. **Snapshot store module:** `index.yaml` model, ring + redo, stage/
finalize/discard/restore, eviction — Tier-1 tests first against a finalize/discard/restore, eviction — Tier-1 tests first against a
temp dir + in-memory/temp db. (R2, R3, R4, R6, R7) temp dir + in-memory/temp db. (R2, R3, R4, R6, R7)
3. **Worker integration:** `is_mutating` (exhaustive), dispatcher wrap, 3. **Worker integration:** `is_mutating` (exhaustive), dispatcher wrap,
+33
View File
@@ -29,6 +29,12 @@ pub struct Args {
/// `--help` / `-h`: print usage to stdout and exit. The /// `--help` / `-h`: print usage to stdout and exit. The
/// runtime checks this flag before doing any other work. /// runtime checks this flag before doing any other work.
pub help: bool, pub help: bool,
/// `--no-undo`: disable the auto-snapshot / undo machinery for
/// this run (ADR-0006 Amendment 1). When set, no snapshots are
/// taken — zero per-command overhead — and `undo` / `redo`
/// report that undo is turned off. The escape hatch for small
/// hardware where per-command snapshotting is too heavy.
pub no_undo: bool,
} }
/// Usage banner printed by `--help`. /// Usage banner printed by `--help`.
@@ -109,6 +115,7 @@ impl Args {
let mut project_path: Option<PathBuf> = None; let mut project_path: Option<PathBuf> = None;
let mut resume = false; let mut resume = false;
let mut help = false; let mut help = false;
let mut no_undo = false;
let mut iter = iter.into_iter().map(Into::into); let mut iter = iter.into_iter().map(Into::into);
while let Some(arg) = iter.next() { while let Some(arg) = iter.next() {
match arg.as_str() { match arg.as_str() {
@@ -118,6 +125,9 @@ impl Args {
"--resume" => { "--resume" => {
resume = true; resume = true;
} }
"--no-undo" => {
no_undo = true;
}
"--theme" => { "--theme" => {
let value = iter.next().ok_or(ArgsError::MissingValue("theme"))?; let value = iter.next().ok_or(ArgsError::MissingValue("theme"))?;
theme = match value.as_str() { theme = match value.as_str() {
@@ -164,6 +174,7 @@ impl Args {
project_path, project_path,
resume, resume,
help, help,
no_undo,
}) })
} }
} }
@@ -300,6 +311,28 @@ mod tests {
assert!(matches!(err, ArgsError::ResumeWithPath), "got: {err:?}"); assert!(matches!(err, ArgsError::ResumeWithPath), "got: {err:?}");
} }
#[test]
fn no_undo_flag_parses() {
let args = Args::parse(["--no-undo"]).unwrap();
assert!(args.no_undo);
}
#[test]
fn no_undo_defaults_off() {
let args = Args::parse(std::iter::empty::<&str>()).unwrap();
assert!(!args.no_undo, "undo is enabled unless --no-undo is given");
}
#[test]
fn no_undo_coexists_with_positional_path() {
let args = Args::parse(["--no-undo", "/home/me/MyProject"]).unwrap();
assert!(args.no_undo);
assert_eq!(
args.project_path.as_deref(),
Some(std::path::Path::new("/home/me/MyProject"))
);
}
#[test] #[test]
fn unknown_double_dash_flag_errors_even_with_positional() { fn unknown_double_dash_flag_errors_even_with_positional() {
// Make sure the path-vs-flag distinction is robust: // Make sure the path-vs-flag distinction is robust:
+4
View File
@@ -191,6 +191,10 @@ help:
Errors out if no previous project is Errors out if no previous project is
recorded. Mutually exclusive with recorded. Mutually exclusive with
<project-path>. <project-path>.
--no-undo Disable the undo machinery for this run:
no snapshot is taken before each change
(no per-command overhead), and undo/redo
report that undo is turned off.
App-level commands (typed inside the app, available in both modes): App-level commands (typed inside the app, available in both modes):
quit Exit cleanly. quit Exit cleanly.
+1
View File
@@ -24,3 +24,4 @@ pub mod runtime;
pub mod theme; pub mod theme;
pub mod type_change; pub mod type_change;
pub mod ui; pub mod ui;
pub mod undo;
+8
View File
@@ -32,6 +32,14 @@ mod yaml;
pub(crate) use csv_io::{decode_cell, parse_csv}; pub(crate) use csv_io::{decode_cell, parse_csv};
pub(crate) use yaml::parse_schema; pub(crate) use yaml::parse_schema;
/// Current UTC time as an ISO-8601 `Z` string — the same shape
/// `history.log` records (ADR-0015 §5). Exposed crate-wide so the
/// undo snapshot ring (ADR-0006) timestamps entries identically.
#[must_use]
pub(crate) fn utc_iso8601_now() -> String {
history::utc_iso8601_now()
}
/// Owns persistence to a single project on disk. Cheap to /// Owns persistence to a single project on disk. Cheap to
/// move; the db worker holds one instance for its lifetime. /// move; the db worker holds one instance for its lifetime.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
+771
View File
@@ -0,0 +1,771 @@
//! Undo/redo snapshot ring (ADR-0006 Amendment 1).
//!
//! A *snapshot* is a whole-project copy taken before a mutation:
//! the database (via SQLite's online backup API) plus the
//! authoritative text (`project.yaml` + `data/*.csv`, copied as
//! files). Undo restores all three directly, re-establishing a
//! consistent `(db, yaml, csv)` triple (ADR-0015). Storing both the
//! database and its text projection means undo is a direct restore —
//! no rebuild step.
//!
//! On-disk layout under `<project>/.snapshots/`:
//!
//! ```text
//! index.yaml — the ordered undo ring + redo stack (source of truth)
//! <id>/ — one payload per snapshot: playground.db, project.yaml,
//! data/*.csv
//! .staging/ — a snapshot being assembled (only one at a time; the
//! db worker is single-threaded)
//! ```
//!
//! **Payload semantics.** Each undo entry's payload is the state that
//! existed *before* its command ran; each redo entry's payload is the
//! state *after* its command ran (the state undo moved away from).
//! Undo pops the newest undo entry, snapshots the current state onto
//! the redo stack, then restores the popped payload. Redo is the
//! mirror. New work (a fresh `finalize`) clears the redo stack
//! (ADR-0006 Amendment 1: redo discarded on new work).
//!
//! **Worker-only.** Every method that touches the database takes the
//! live `&Connection` / `&mut Connection` owned by the db worker, so
//! all live-database access stays on the worker thread (the `db.rs`
//! invariant). `backup` / `restore` go through that connection and
//! snapshot *files*; this module never opens the live database file
//! directly.
use std::fs;
use std::io::Write as _;
use std::path::{Path, PathBuf};
use rusqlite::{Connection, MAIN_DB};
use serde::{Deserialize, Serialize};
use crate::persistence::utc_iso8601_now;
use crate::project::{DATA_DIR, PLAYGROUND_DB, PROJECT_YAML};
/// Default undo ring depth (ADR-0006 Amendment 1: N = 50, raised
/// from the original ADR's N = 10 because single-step undo counts
/// commands). A single tunable constant.
pub const DEFAULT_RING_CAPACITY: usize = 50;
/// Directory under the project that holds the ring.
const SNAPSHOTS_DIR: &str = ".snapshots";
/// In-flight snapshot directory (one at a time).
const STAGING_DIR: &str = ".staging";
/// The ring index file.
const INDEX_FILE: &str = "index.yaml";
#[derive(Debug)]
pub enum SnapshotError {
Io {
op: &'static str,
path: PathBuf,
source: std::io::Error,
},
Db(rusqlite::Error),
Index {
message: String,
},
}
impl std::fmt::Display for SnapshotError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io { op, path, source } => {
write!(f, "snapshot {op} failed for {}: {source}", path.display())
}
Self::Db(e) => write!(f, "snapshot database operation failed: {e}"),
Self::Index { message } => write!(f, "snapshot index error: {message}"),
}
}
}
impl std::error::Error for SnapshotError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io { source, .. } => Some(source),
Self::Db(e) => Some(e),
Self::Index { .. } => None,
}
}
}
impl From<rusqlite::Error> for SnapshotError {
fn from(e: rusqlite::Error) -> Self {
Self::Db(e)
}
}
type Result<T> = std::result::Result<T, SnapshotError>;
/// A staged-but-not-committed snapshot. Held by the worker between
/// `stage` and `finalize`/`discard`; for a batch command the worker
/// holds one of these across the whole batch.
#[derive(Debug, Clone)]
pub struct Staged {
command: String,
timestamp: String,
}
/// What `peek_*` / `undo` / `redo` report back to the caller — enough
/// to build the confirmation prompt ("Undo `<command>` (run <when>)?").
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SnapshotMeta {
pub id: u64,
pub timestamp: String,
pub command: String,
}
#[derive(Clone, Serialize, Deserialize)]
struct Entry {
id: u64,
timestamp: String,
command: String,
}
impl Entry {
fn meta(&self) -> SnapshotMeta {
SnapshotMeta {
id: self.id,
timestamp: self.timestamp.clone(),
command: self.command.clone(),
}
}
}
#[derive(Default, Serialize, Deserialize)]
struct Index {
/// Monotonic id allocator; never reuses an id.
next_id: u64,
/// Undo ring; the *last* element is the most recent (top).
undo: Vec<Entry>,
/// Redo stack; the *last* element is the most recently undone.
redo: Vec<Entry>,
}
/// The snapshot ring for one project. Cheap to construct; the worker
/// holds one for the project's lifetime (or `None` when `--no-undo`).
#[derive(Debug, Clone)]
pub struct SnapshotStore {
project_dir: PathBuf,
root: PathBuf,
cap: usize,
}
impl SnapshotStore {
#[must_use]
pub fn new(project_dir: &Path, cap: usize) -> Self {
Self {
project_dir: project_dir.to_path_buf(),
root: project_dir.join(SNAPSHOTS_DIR),
cap,
}
}
// ---- public API ----------------------------------------------------
/// Stage a snapshot of the *current* committed state (database +
/// text) into `.staging/`. Call before a mutation's transaction
/// opens; `finalize` commits it into the ring after the db commit,
/// or `discard` drops it if the mutation rolled back.
pub fn stage(&self, live: &Connection, command: &str) -> Result<Staged> {
let staging = self.staging_dir();
remove_dir_all_if_exists(&staging)?;
tracing::debug!(command, dir = %staging.display(), "staging undo snapshot");
self.snapshot_into(live, &staging)?;
Ok(Staged {
command: command.to_string(),
timestamp: utc_iso8601_now(),
})
}
/// Commit a staged snapshot into the undo ring: rename `.staging/`
/// to a fresh `<id>/`, push the entry, **clear the redo stack**
/// (new work) and evict the oldest beyond the cap. Returns the new
/// undo entry's metadata.
pub fn finalize(&self, staged: Staged) -> Result<SnapshotMeta> {
let mut index = self.load_index()?;
let id = index.next_id;
index.next_id += 1;
let dest = self.payload_dir(id);
rename(&self.staging_dir(), &dest)?;
let entry = Entry {
id,
timestamp: staged.timestamp,
command: staged.command,
};
let meta = entry.meta();
index.undo.push(entry);
// New work invalidates redo (ADR-0006 Amendment 1).
for r in std::mem::take(&mut index.redo) {
remove_dir_all_if_exists(&self.payload_dir(r.id))?;
}
// Evict oldest beyond the cap.
while index.undo.len() > self.cap {
let old = index.undo.remove(0);
tracing::debug!(id = old.id, "evicting oldest undo snapshot (ring full)");
remove_dir_all_if_exists(&self.payload_dir(old.id))?;
}
self.save_index(&index)?;
tracing::info!(id = meta.id, command = %meta.command, "undo snapshot recorded");
Ok(meta)
}
/// Drop a staged snapshot (the mutation rolled back, so there is
/// nothing to undo).
pub fn discard(&self, _staged: Staged) -> Result<()> {
tracing::debug!("discarding staged undo snapshot (op did not commit)");
remove_dir_all_if_exists(&self.staging_dir())
}
/// Metadata of the snapshot `undo` would restore, without
/// restoring it. `None` when the ring is empty.
pub fn peek_undo(&self) -> Result<Option<SnapshotMeta>> {
Ok(self.load_index()?.undo.last().map(Entry::meta))
}
/// Metadata of the snapshot `redo` would restore. `None` when the
/// redo stack is empty.
pub fn peek_redo(&self) -> Result<Option<SnapshotMeta>> {
Ok(self.load_index()?.redo.last().map(Entry::meta))
}
/// Restore the most recent snapshot: snapshot the current state
/// onto the redo stack (so redo is possible), then restore the
/// popped payload (text first, database last — ADR-0015 §6
/// commit-db-last). Returns the metadata of the command that was
/// undone, or `None` if there is nothing to undo.
pub fn undo(&self, live: &mut Connection) -> Result<Option<SnapshotMeta>> {
let mut index = self.load_index()?;
let Some(entry) = index.undo.pop() else {
return Ok(None);
};
// Snapshot current ("after entry.command") onto redo.
let redo_id = index.next_id;
index.next_id += 1;
self.snapshot_into(live, &self.payload_dir(redo_id))?;
index.redo.push(Entry {
id: redo_id,
timestamp: utc_iso8601_now(),
command: entry.command.clone(),
});
// Restore the popped payload, then persist the index, then
// delete the consumed payload (last so a failure there leaves
// the index consistent).
self.restore_from(live, &self.payload_dir(entry.id))?;
self.save_index(&index)?;
remove_dir_all_if_exists(&self.payload_dir(entry.id))?;
tracing::info!(command = %entry.command, "undo applied");
Ok(Some(entry.meta()))
}
/// Restore the most recently undone snapshot: snapshot the current
/// state onto the undo ring, then restore the redo payload.
/// Returns the metadata of the command that was re-applied, or
/// `None` if there is nothing to redo. Does **not** clear redo or
/// evict — only new work does that.
pub fn redo(&self, live: &mut Connection) -> Result<Option<SnapshotMeta>> {
let mut index = self.load_index()?;
let Some(entry) = index.redo.pop() else {
return Ok(None);
};
// Snapshot current ("before entry.command") back onto undo.
let undo_id = index.next_id;
index.next_id += 1;
self.snapshot_into(live, &self.payload_dir(undo_id))?;
index.undo.push(Entry {
id: undo_id,
timestamp: utc_iso8601_now(),
command: entry.command.clone(),
});
self.restore_from(live, &self.payload_dir(entry.id))?;
self.save_index(&index)?;
remove_dir_all_if_exists(&self.payload_dir(entry.id))?;
tracing::info!(command = %entry.command, "redo applied");
Ok(Some(entry.meta()))
}
/// Remove crash leftovers on project open: the `.staging/` dir and
/// any payload dir not referenced by the index.
pub fn cleanup(&self) -> Result<()> {
remove_dir_all_if_exists(&self.staging_dir())?;
if !self.root.exists() {
return Ok(());
}
let index = self.load_index()?;
let referenced: std::collections::HashSet<u64> = index
.undo
.iter()
.chain(index.redo.iter())
.map(|e| e.id)
.collect();
for entry in read_dir(&self.root)? {
let entry = entry.map_err(|e| io_err("read_dir entry", &self.root, e))?;
let name = entry.file_name();
let Some(name) = name.to_str() else { continue };
if let Ok(id) = name.parse::<u64>()
&& !referenced.contains(&id)
{
tracing::debug!(id, "removing orphan snapshot payload");
remove_dir_all_if_exists(&self.payload_dir(id))?;
}
}
Ok(())
}
// ---- internals -----------------------------------------------------
fn staging_dir(&self) -> PathBuf {
self.root.join(STAGING_DIR)
}
fn payload_dir(&self, id: u64) -> PathBuf {
self.root.join(id.to_string())
}
fn index_path(&self) -> PathBuf {
self.root.join(INDEX_FILE)
}
/// Capture the current state (database + text) into `dir`.
fn snapshot_into(&self, live: &Connection, dir: &Path) -> Result<()> {
create_dir_all(dir)?;
live.backup(MAIN_DB, dir.join(PLAYGROUND_DB), None)?;
copy_if_exists(
&self.project_dir.join(PROJECT_YAML),
&dir.join(PROJECT_YAML),
)?;
self.copy_data_dir(&self.project_dir.join(DATA_DIR), &dir.join(DATA_DIR))?;
Ok(())
}
/// Restore the project state from a payload `dir`: text first, then
/// the database (commit-db-last, ADR-0015 §6).
fn restore_from(&self, live: &mut Connection, dir: &Path) -> Result<()> {
// Text: project.yaml (atomic replace) + a wholesale data/ swap.
let yaml_src = dir.join(PROJECT_YAML);
if yaml_src.exists() {
atomic_replace(&yaml_src, &self.project_dir.join(PROJECT_YAML))?;
}
self.replace_data_dir(&dir.join(DATA_DIR), &self.project_dir.join(DATA_DIR))?;
// Database last.
live.restore(MAIN_DB, dir.join(PLAYGROUND_DB), NO_PROGRESS)?;
Ok(())
}
/// Copy a flat `data/` directory (`*.csv`, no subdirs per ADR-0015)
/// into `dst`. Tolerates a missing source (empty project).
fn copy_data_dir(&self, src: &Path, dst: &Path) -> Result<()> {
if !src.exists() {
return Ok(());
}
create_dir_all(dst)?;
for entry in read_dir(src)? {
let entry = entry.map_err(|e| io_err("read_dir entry", src, e))?;
let path = entry.path();
if path.is_file() {
let name = entry.file_name();
copy_if_exists(&path, &dst.join(name))?;
}
}
Ok(())
}
/// Make the project's `data/` match the snapshot's exactly: clear
/// the live data dir, then copy the snapshot's files in.
fn replace_data_dir(&self, src: &Path, dst: &Path) -> Result<()> {
remove_dir_all_if_exists(dst)?;
self.copy_data_dir(src, dst)
}
fn load_index(&self) -> Result<Index> {
let path = self.index_path();
let mut index = if path.exists() {
let body = fs::read_to_string(&path).map_err(|e| io_err("read index", &path, e))?;
serde_yml::from_str(&body).map_err(|e| SnapshotError::Index {
message: e.to_string(),
})?
} else {
Index::default()
};
// Reconcile next_id against any payload dirs present, so a crash
// that left an orphan can never cause an id to be reused.
if self.root.exists() {
let mut max_seen = index.next_id;
for entry in read_dir(&self.root)? {
let entry = entry.map_err(|e| io_err("read_dir entry", &self.root, e))?;
if let Some(name) = entry.file_name().to_str()
&& let Ok(id) = name.parse::<u64>()
{
max_seen = max_seen.max(id + 1);
}
}
index.next_id = max_seen;
}
Ok(index)
}
fn save_index(&self, index: &Index) -> Result<()> {
create_dir_all(&self.root)?;
let body = serde_yml::to_string(index).map_err(|e| SnapshotError::Index {
message: e.to_string(),
})?;
let path = self.index_path();
let tmp = path.with_extension("yaml.tmp");
write_fsync(&tmp, body.as_bytes())?;
rename(&tmp, &path)?;
Ok(())
}
}
/// Restore's progress callback type, fixed to `None`.
const NO_PROGRESS: Option<fn(rusqlite::backup::Progress)> = None;
// ---- small fs helpers (own error context) -----------------------------
fn io_err(op: &'static str, path: &Path, source: std::io::Error) -> SnapshotError {
SnapshotError::Io {
op,
path: path.to_path_buf(),
source,
}
}
fn create_dir_all(dir: &Path) -> Result<()> {
fs::create_dir_all(dir).map_err(|e| io_err("create_dir_all", dir, e))
}
fn read_dir(dir: &Path) -> Result<fs::ReadDir> {
fs::read_dir(dir).map_err(|e| io_err("read_dir", dir, e))
}
fn rename(from: &Path, to: &Path) -> Result<()> {
fs::rename(from, to).map_err(|e| io_err("rename", to, e))
}
fn remove_dir_all_if_exists(dir: &Path) -> Result<()> {
if dir.exists() {
fs::remove_dir_all(dir).map_err(|e| io_err("remove_dir_all", dir, e))?;
}
Ok(())
}
fn copy_if_exists(src: &Path, dst: &Path) -> Result<()> {
if src.exists() {
if let Some(parent) = dst.parent() {
create_dir_all(parent)?;
}
fs::copy(src, dst).map_err(|e| io_err("copy", src, e))?;
}
Ok(())
}
/// Replace `dst` with `src` atomically via copy-to-temp + rename.
fn atomic_replace(src: &Path, dst: &Path) -> Result<()> {
let body = fs::read(src).map_err(|e| io_err("read", src, e))?;
let tmp = dst.with_extension("restore.tmp");
write_fsync(&tmp, &body)?;
rename(&tmp, dst)
}
fn write_fsync(path: &Path, bytes: &[u8]) -> Result<()> {
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut f = fs::File::create(path).map_err(|e| io_err("create", path, e))?;
f.write_all(bytes).map_err(|e| io_err("write", path, e))?;
f.sync_all().map_err(|e| io_err("fsync", path, e))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
use tempfile::TempDir;
/// A test project: a temp dir with a file-backed db connection and
/// helpers to read/write the text targets the store snapshots.
struct Fixture {
_dir: TempDir,
path: PathBuf,
conn: Connection,
}
impl Fixture {
fn new() -> Self {
let dir = TempDir::new().unwrap();
let path = dir.path().to_path_buf();
let conn = Connection::open(path.join(PLAYGROUND_DB)).unwrap();
conn.execute_batch("CREATE TABLE t (n INTEGER);").unwrap();
Self {
_dir: dir,
path,
conn,
}
}
fn store(&self) -> SnapshotStore {
SnapshotStore::new(&self.path, DEFAULT_RING_CAPACITY)
}
fn store_cap(&self, cap: usize) -> SnapshotStore {
SnapshotStore::new(&self.path, cap)
}
fn insert(&self, n: i64) {
self.conn
.execute("INSERT INTO t (n) VALUES (?1)", [n])
.unwrap();
}
fn rows(&self) -> Vec<i64> {
let mut stmt = self.conn.prepare("SELECT n FROM t ORDER BY n").unwrap();
stmt.query_map([], |r| r.get::<_, i64>(0))
.unwrap()
.map(std::result::Result::unwrap)
.collect()
}
fn write_yaml(&self, body: &str) {
fs::write(self.path.join(PROJECT_YAML), body).unwrap();
}
fn read_yaml(&self) -> String {
fs::read_to_string(self.path.join(PROJECT_YAML)).unwrap()
}
fn write_csv(&self, table: &str, body: &str) {
let data = self.path.join(DATA_DIR);
fs::create_dir_all(&data).unwrap();
fs::write(data.join(format!("{table}.csv")), body).unwrap();
}
fn csv_exists(&self, table: &str) -> bool {
self.path
.join(DATA_DIR)
.join(format!("{table}.csv"))
.exists()
}
}
fn stage_finalize(store: &SnapshotStore, conn: &Connection, command: &str) {
let staged = store.stage(conn, command).unwrap();
store.finalize(staged).unwrap();
}
#[test]
fn stage_then_finalize_records_undo_entry() {
let fx = Fixture::new();
let store = fx.store();
assert!(store.peek_undo().unwrap().is_none());
stage_finalize(&store, &fx.conn, "delete from t where n=1");
let meta = store.peek_undo().unwrap().expect("an undo entry");
assert_eq!(meta.command, "delete from t where n=1");
assert!(!meta.timestamp.is_empty());
}
#[test]
fn discard_leaves_no_undo_entry() {
let fx = Fixture::new();
let store = fx.store();
let staged = store.stage(&fx.conn, "drop table t").unwrap();
store.discard(staged).unwrap();
assert!(store.peek_undo().unwrap().is_none());
assert!(!store.staging_dir().exists());
}
#[test]
fn undo_restores_database_state() {
let fx = Fixture::new();
let store = fx.store();
fx.insert(1);
// Snapshot the pre-mutation state, then mutate.
let staged = store.stage(&fx.conn, "insert into t values (2)").unwrap();
store.finalize(staged).unwrap();
fx.insert(2);
assert_eq!(fx.rows(), vec![1, 2]);
let mut conn = fx.conn;
let undone = store.undo(&mut conn).unwrap().expect("something to undo");
assert_eq!(undone.command, "insert into t values (2)");
// Re-read through a fresh connection to be sure it is the file
// that changed, not just connection cache.
let check = Connection::open(fx.path.join(PLAYGROUND_DB)).unwrap();
let n: Vec<i64> = check
.prepare("SELECT n FROM t ORDER BY n")
.unwrap()
.query_map([], |r| r.get(0))
.unwrap()
.map(std::result::Result::unwrap)
.collect();
assert_eq!(n, vec![1], "row 2 should be undone");
}
#[test]
fn redo_reapplies_after_undo() {
let fx = Fixture::new();
let store = fx.store();
fx.insert(1);
stage_finalize(&store, &fx.conn, "insert into t values (2)");
fx.insert(2);
let mut conn = fx.conn;
store.undo(&mut conn).unwrap();
// restore the connection's view by reopening through the fixture
// (the in-place restore mutated the same file/connection).
let after_undo: Vec<i64> = conn
.prepare("SELECT n FROM t ORDER BY n")
.unwrap()
.query_map([], |r| r.get(0))
.unwrap()
.map(std::result::Result::unwrap)
.collect();
assert_eq!(after_undo, vec![1]);
let redone = store.redo(&mut conn).unwrap().expect("something to redo");
assert_eq!(redone.command, "insert into t values (2)");
let after_redo: Vec<i64> = conn
.prepare("SELECT n FROM t ORDER BY n")
.unwrap()
.query_map([], |r| r.get(0))
.unwrap()
.map(std::result::Result::unwrap)
.collect();
assert_eq!(after_redo, vec![1, 2], "redo should re-apply the insert");
}
#[test]
fn undo_restores_text_files() {
let mut fx = Fixture::new();
let store = fx.store();
fx.write_yaml("version: 1\nA\n");
fx.write_csv("t", "n\n1\n");
stage_finalize(&store, &fx.conn, "update t set n=9 --all-rows");
fx.write_yaml("version: 1\nB\n");
fx.write_csv("t", "n\n9\n");
store.undo(&mut fx.conn).unwrap();
assert_eq!(fx.read_yaml(), "version: 1\nA\n");
assert_eq!(
fs::read_to_string(fx.path.join(DATA_DIR).join("t.csv")).unwrap(),
"n\n1\n"
);
}
#[test]
fn undo_removes_csv_for_table_created_after_snapshot() {
let mut fx = Fixture::new();
let store = fx.store();
fx.write_csv("t", "n\n1\n");
// Snapshot, then a later command "creates" another table's CSV.
stage_finalize(&store, &fx.conn, "create table u with pk id(serial)");
fx.write_csv("u", "id\n1\n");
assert!(fx.csv_exists("u"));
store.undo(&mut fx.conn).unwrap();
assert!(fx.csv_exists("t"), "t existed at the snapshot");
assert!(
!fx.csv_exists("u"),
"u was created after the snapshot and must be removed on undo"
);
}
#[test]
fn new_work_clears_redo() {
let fx = Fixture::new();
let store = fx.store();
stage_finalize(&store, &fx.conn, "delete from t --all-rows");
let mut conn = fx.conn;
store.undo(&mut conn).unwrap();
assert!(store.peek_redo().unwrap().is_some(), "redo available");
// New work: stage + finalize again.
let staged = store.stage(&conn, "drop table t").unwrap();
store.finalize(staged).unwrap();
assert!(
store.peek_redo().unwrap().is_none(),
"new work discards the redo stack"
);
}
#[test]
fn eviction_keeps_only_cap_entries() {
let fx = Fixture::new();
let store = fx.store_cap(2);
stage_finalize(&store, &fx.conn, "cmd 1");
stage_finalize(&store, &fx.conn, "cmd 2");
stage_finalize(&store, &fx.conn, "cmd 3");
// Top is cmd 3; only 2 payloads survive.
let meta = store.peek_undo().unwrap().unwrap();
assert_eq!(meta.command, "cmd 3");
let payload_dirs = fs::read_dir(&store.root)
.unwrap()
.filter_map(std::result::Result::ok)
.filter(|e| e.file_name().to_str().is_some_and(|n| n.parse::<u64>().is_ok()))
.count();
assert_eq!(payload_dirs, 2, "ring capped at 2 payloads");
}
#[test]
fn empty_stacks_return_none() {
let fx = Fixture::new();
let store = fx.store();
let mut conn = fx.conn;
assert!(store.undo(&mut conn).unwrap().is_none());
assert!(store.redo(&mut conn).unwrap().is_none());
assert!(store.peek_undo().unwrap().is_none());
assert!(store.peek_redo().unwrap().is_none());
}
#[test]
fn peek_does_not_consume() {
let fx = Fixture::new();
let store = fx.store();
stage_finalize(&store, &fx.conn, "delete from t --all-rows");
let a = store.peek_undo().unwrap();
let b = store.peek_undo().unwrap();
assert_eq!(a, b);
assert!(a.is_some());
}
#[test]
fn cleanup_removes_orphan_payloads_and_staging() {
let fx = Fixture::new();
let store = fx.store();
stage_finalize(&store, &fx.conn, "keep me");
// Fabricate an orphan payload dir and a leftover staging dir.
fs::create_dir_all(store.payload_dir(999)).unwrap();
fs::create_dir_all(store.staging_dir()).unwrap();
store.cleanup().unwrap();
assert!(!store.payload_dir(999).exists(), "orphan removed");
assert!(!store.staging_dir().exists(), "staging removed");
assert!(
store.peek_undo().unwrap().is_some(),
"referenced snapshot retained"
);
}
#[test]
fn next_id_is_not_reused_after_orphan() {
let fx = Fixture::new();
let store = fx.store();
// An orphan dir with a high id must push next_id past it.
fs::create_dir_all(store.payload_dir(41)).unwrap();
stage_finalize(&store, &fx.conn, "cmd");
let meta = store.peek_undo().unwrap().unwrap();
assert!(meta.id >= 42, "id allocated above the orphan, got {}", meta.id);
}
}