feat: ADR-0006 §8 step 3 — wire the snapshot ring into the db worker

- snapshot_then() brackets all 19 mutating dispatch arms: stage a
  pre-op snapshot, finalise on success / discard on rollback; gated
  on a user command source (internal ops like open-time rebuild are
  not snapshotted) and on undo being enabled
- BatchState + BeginBatch/EndBatch requests: a batch takes one
  boundary snapshot, suppresses per-command snapshots, and finalises
  iff a mutation committed (one undo step per replay/batch)
- Undo/Redo/PeekUndo/PeekRedo requests handled in worker_loop with
  &mut conn for the restore; cleanup() sweeps crash leftovers on open
- Database::{undo,redo,peek_undo,peek_redo,begin_batch,end_batch} +
  open_with_persistence_and_undo(); snapshot failures are non-fatal
  (logged), restore failures surface
- 6 Tier-3 integration tests through the real worker

1680 passed / 0 failed / 1 ignored; clippy clean.
This commit is contained in:
claude@clouddev1
2026-05-24 20:31:05 +00:00
parent 64eee3ed6d
commit a97069c02e
2 changed files with 566 additions and 25 deletions
+330 -25
View File
@@ -28,7 +28,7 @@ use std::thread;
use rusqlite::Connection;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info};
use tracing::{debug, info, warn};
use crate::dsl::action::ReferentialAction;
use crate::dsl::command::{
@@ -46,6 +46,7 @@ use crate::persistence::{
SchemaSnapshot, TableSchema, TableSnapshot, decode_cell, parse_csv, parse_schema,
};
use crate::project::{DATA_DIR, PROJECT_YAML};
use crate::undo::{DEFAULT_RING_CAPACITY, SnapshotError, SnapshotMeta, SnapshotStore, Staged};
/// Inbox capacity. The worker is fast enough that this rarely
/// matters; `64` is a generous head-room for bursts.
@@ -681,6 +682,40 @@ enum Request {
source: crate::dsl::grammar::IdentSource,
reply: oneshot::Sender<Result<Vec<String>, DbError>>,
},
/// Restore the most recent undo snapshot (ADR-0006 Amendment 1).
/// Replies with the metadata of the command that was undone, or
/// `None` if there is nothing to undo (or undo is disabled).
Undo {
reply: oneshot::Sender<Result<Option<SnapshotMeta>, DbError>>,
},
/// Re-apply the most recently undone snapshot. `None` if there is
/// nothing to redo.
Redo {
reply: oneshot::Sender<Result<Option<SnapshotMeta>, DbError>>,
},
/// Read — without restoring — the snapshot `undo` would restore.
/// Used to build the confirmation prompt. `None` if the ring is
/// empty or undo is disabled.
PeekUndo {
reply: oneshot::Sender<Result<Option<SnapshotMeta>, DbError>>,
},
/// Read — without restoring — the snapshot `redo` would restore.
PeekRedo {
reply: oneshot::Sender<Result<Option<SnapshotMeta>, DbError>>,
},
/// Open a batch (ADR-0006 Amendment 1): take one boundary
/// snapshot for the whole batch and suppress per-command
/// snapshots until `EndBatch`. Used by `replay` so a multi-command
/// replay is a single undo step. `source` is the batch command.
BeginBatch {
source: Option<String>,
reply: oneshot::Sender<()>,
},
/// Close a batch: finalise the boundary snapshot into the ring if
/// any mutation committed during the batch, else discard it.
EndBatch {
reply: oneshot::Sender<()>,
},
}
impl Database {
@@ -693,7 +728,7 @@ impl Database {
/// are skipped — useful for unit tests that exercise the
/// SQLite layer in isolation.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DbError> {
Self::open_inner(path, None)
Self::open_inner(path, None, false)
}
/// Open a database with per-command persistence wired in
@@ -705,12 +740,24 @@ impl Database {
path: P,
persistence: Persistence,
) -> Result<Self, DbError> {
Self::open_inner(path, Some(persistence))
Self::open_inner(path, Some(persistence), false)
}
/// Open with per-command persistence *and* the undo/snapshot ring
/// (ADR-0006 Amendment 1). `undo_enabled` is `false` under the
/// `--no-undo` CLI flag, in which case no snapshots are taken.
pub fn open_with_persistence_and_undo<P: AsRef<Path>>(
path: P,
persistence: Persistence,
undo_enabled: bool,
) -> Result<Self, DbError> {
Self::open_inner(path, Some(persistence), undo_enabled)
}
fn open_inner<P: AsRef<Path>>(
path: P,
persistence: Option<Persistence>,
undo_enabled: bool,
) -> Result<Self, DbError> {
let path_display = path.as_ref().to_string_lossy().into_owned();
let conn = match path.as_ref().to_str() {
@@ -721,10 +768,27 @@ impl Database {
info!(path = %path_display, "opened database");
configure_connection(&conn).map_err(DbError::from_rusqlite)?;
// The undo ring needs the project directory; it is only
// available when persistence is wired and undo is enabled.
let snapshots = if undo_enabled {
persistence
.as_ref()
.map(|p| SnapshotStore::new(p.project_path(), DEFAULT_RING_CAPACITY))
} else {
None
};
if let Some(store) = &snapshots {
// Sweep crash leftovers (`.staging/`, orphan payloads).
if let Err(e) = store.cleanup() {
warn!(error = %e, "undo snapshot cleanup on open failed");
}
info!("undo snapshots enabled");
}
let (tx, rx) = mpsc::channel::<Request>(REQUEST_CHANNEL_CAPACITY);
thread::Builder::new()
.name("rdbms-db-worker".to_string())
.spawn(move || worker_loop(conn, persistence, rx))
.spawn(move || worker_loop(conn, persistence, snapshots, rx))
.map_err(|e| DbError::Io(e.to_string()))?;
Ok(Self { inbox: tx })
@@ -1238,6 +1302,57 @@ impl Database {
recv.await.map_err(|_| DbError::WorkerGone)?
}
/// Restore the most recent undo snapshot (ADR-0006 Amendment 1).
/// `Ok(Some(meta))` reports the command that was undone;
/// `Ok(None)` means nothing to undo (or undo is disabled).
pub async fn undo(&self) -> Result<Option<SnapshotMeta>, DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::Undo { reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)?
}
/// Re-apply the most recently undone snapshot. `Ok(None)` means
/// nothing to redo.
pub async fn redo(&self) -> Result<Option<SnapshotMeta>, DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::Redo { reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)?
}
/// Metadata of the snapshot `undo` would restore, without
/// restoring it — for the confirmation prompt.
pub async fn peek_undo(&self) -> Result<Option<SnapshotMeta>, DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::PeekUndo { reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)?
}
/// Metadata of the snapshot `redo` would restore.
pub async fn peek_redo(&self) -> Result<Option<SnapshotMeta>, DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::PeekRedo { reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)?
}
/// Open a batch so a multi-command operation (`replay`, future
/// batch commands) records a single undo step (ADR-0006
/// Amendment 1). Pair with [`Database::end_batch`]. `source` is
/// the batch command text recorded on the boundary snapshot.
pub async fn begin_batch(&self, source: Option<String>) -> Result<(), DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::BeginBatch { source, reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)
}
/// Close a batch opened with [`Database::begin_batch`]. The
/// boundary snapshot is kept iff a mutation committed during the
/// batch.
pub async fn end_batch(&self) -> Result<(), DbError> {
let (reply, recv) = oneshot::channel();
self.send(Request::EndBatch { reply }).await?;
recv.await.map_err(|_| DbError::WorkerGone)
}
async fn send(&self, req: Request) -> Result<(), DbError> {
self.inbox.send(req).await.map_err(|_| DbError::WorkerGone)
}
@@ -1325,16 +1440,192 @@ fn iso8601_now() -> String {
fn worker_loop(
conn: Connection,
persistence: Option<Persistence>,
snapshots: Option<SnapshotStore>,
mut rx: mpsc::Receiver<Request>,
) {
debug!("db worker started");
// `conn` must be mutable: restoring a snapshot (undo/redo) writes
// into the live connection via the backup API (`&mut`).
let mut conn = conn;
let snap = snapshots.as_ref();
let mut batch = BatchState::default();
while let Some(req) = rx.blocking_recv() {
handle_request(&conn, persistence.as_ref(), req);
// Undo/redo/peek/batch are handled here: undo/redo need
// `&mut conn` for the restore, and batch state lives across
// requests. Everything else goes to `handle_request`, which
// brackets mutations with a pre-op snapshot.
match req {
Request::Undo { reply } => {
let _ = reply.send(do_undo(snap, &mut conn));
}
Request::Redo { reply } => {
let _ = reply.send(do_redo(snap, &mut conn));
}
Request::PeekUndo { reply } => {
let _ = reply.send(peek_undo_op(snap));
}
Request::PeekRedo { reply } => {
let _ = reply.send(peek_redo_op(snap));
}
Request::BeginBatch { source, reply } => {
begin_batch(snap, &conn, &mut batch, source.as_deref());
let _ = reply.send(());
}
Request::EndBatch { reply } => {
end_batch(snap, &mut batch);
let _ = reply.send(());
}
other => handle_request(&conn, persistence.as_ref(), snap, &mut batch, other),
}
}
debug!("db worker exiting");
}
fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Request) {
/// Worker-side undo bracketing state for the request stream.
/// `active` is set between `BeginBatch`/`EndBatch` so per-command
/// snapshots are suppressed in favour of one boundary snapshot for
/// the whole batch (ADR-0006 Amendment 1).
#[derive(Default)]
struct BatchState {
active: bool,
dirty: bool,
staged: Option<Staged>,
}
fn snapshot_to_db_error(e: SnapshotError) -> DbError {
DbError::Io(e.to_string())
}
/// Stage a pre-mutation snapshot, never failing the user's command if
/// the snapshot itself can't be taken (the real persistence is the
/// durable state — the snapshot is a best-effort safety net). Returns
/// `None` when undo is off, the command has no user `source` (an
/// internal op, e.g. open-time rebuild — not undoable), or staging
/// failed.
fn stage_pre_mutation(
snap: Option<&SnapshotStore>,
conn: &Connection,
source: Option<&str>,
) -> Option<Staged> {
let store = snap?;
let src = source?;
match store.stage(conn, src) {
Ok(staged) => Some(staged),
Err(e) => {
warn!(error = %e, "could not stage undo snapshot; command proceeds without undo");
None
}
}
}
/// Run a mutating handler with undo bracketing: stage before, then
/// finalise on success / discard on failure — or, inside a batch,
/// just mark the batch dirty so its single boundary snapshot is kept.
/// The command result is always sent; snapshot bookkeeping never
/// fails the user's actual work.
fn snapshot_then<T>(
snap: Option<&SnapshotStore>,
batch: &mut BatchState,
conn: &Connection,
source: Option<&str>,
reply: oneshot::Sender<Result<T, DbError>>,
run: impl FnOnce() -> Result<T, DbError>,
) {
let staged = if batch.active {
None
} else {
stage_pre_mutation(snap, conn, source)
};
let result = run();
let committed = result.is_ok();
if batch.active {
if committed {
batch.dirty = true;
}
} else if let (Some(store), Some(st)) = (snap, staged) {
let outcome = if committed {
store.finalize(st).map(|_| ())
} else {
store.discard(st)
};
if let Err(e) = outcome {
warn!(error = %e, "undo snapshot bookkeeping failed (command already applied)");
}
}
let _ = reply.send(result);
}
/// Open a batch: one boundary snapshot, then suppress per-command
/// snapshots until `end_batch`.
fn begin_batch(
snap: Option<&SnapshotStore>,
conn: &Connection,
batch: &mut BatchState,
source: Option<&str>,
) {
if batch.active {
warn!("BeginBatch while a batch is active; ignoring (no nested batches)");
return;
}
batch.staged = stage_pre_mutation(snap, conn, source);
batch.active = true;
batch.dirty = false;
}
/// Close a batch: keep the boundary snapshot iff a mutation committed
/// during it, else discard it (an all-skips batch leaves no undo step).
fn end_batch(snap: Option<&SnapshotStore>, batch: &mut BatchState) {
if !batch.active {
warn!("EndBatch with no active batch; ignoring");
return;
}
if let (Some(store), Some(st)) = (snap, batch.staged.take()) {
let outcome = if batch.dirty {
store.finalize(st).map(|_| ())
} else {
store.discard(st)
};
if let Err(e) = outcome {
warn!(error = %e, "batch undo snapshot bookkeeping failed");
}
}
batch.active = false;
batch.dirty = false;
}
fn do_undo(
snap: Option<&SnapshotStore>,
conn: &mut Connection,
) -> Result<Option<SnapshotMeta>, DbError> {
snap.map_or(Ok(None), |store| {
store.undo(conn).map_err(snapshot_to_db_error)
})
}
fn do_redo(
snap: Option<&SnapshotStore>,
conn: &mut Connection,
) -> Result<Option<SnapshotMeta>, DbError> {
snap.map_or(Ok(None), |store| {
store.redo(conn).map_err(snapshot_to_db_error)
})
}
fn peek_undo_op(snap: Option<&SnapshotStore>) -> Result<Option<SnapshotMeta>, DbError> {
snap.map_or(Ok(None), |s| s.peek_undo().map_err(snapshot_to_db_error))
}
fn peek_redo_op(snap: Option<&SnapshotStore>) -> Result<Option<SnapshotMeta>, DbError> {
snap.map_or(Ok(None), |s| s.peek_redo().map_err(snapshot_to_db_error))
}
fn handle_request(
conn: &Connection,
persistence: Option<&Persistence>,
snap: Option<&SnapshotStore>,
batch: &mut BatchState,
req: Request,
) {
match req {
Request::CreateTable {
name,
@@ -1343,7 +1634,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_create_table(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_create_table(
conn,
persistence,
source.as_deref(),
@@ -1357,7 +1648,9 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_drop_table(conn, persistence, source.as_deref(), &name));
snapshot_then(snap, batch, conn, source.as_deref(), reply, || {
do_drop_table(conn, persistence, source.as_deref(), &name)
});
}
Request::AddColumn {
table,
@@ -1365,7 +1658,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_add_column(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_add_column(
conn,
persistence,
source.as_deref(),
@@ -1380,7 +1673,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_drop_column(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_drop_column(
conn,
persistence,
source.as_deref(),
@@ -1396,7 +1689,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_rename_column(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_rename_column(
conn,
persistence,
source.as_deref(),
@@ -1413,7 +1706,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_change_column_type(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_change_column_type(
conn,
persistence,
source.as_deref(),
@@ -1450,7 +1743,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_add_relationship(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_add_relationship(
conn,
persistence,
source.as_deref(),
@@ -1469,7 +1762,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_drop_relationship(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_drop_relationship(
conn,
persistence,
source.as_deref(),
@@ -1483,7 +1776,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_add_index(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_add_index(
conn,
persistence,
source.as_deref(),
@@ -1497,7 +1790,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_drop_index(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_drop_index(
conn,
persistence,
source.as_deref(),
@@ -1511,7 +1804,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_add_constraint(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_add_constraint(
conn,
persistence,
source.as_deref(),
@@ -1527,7 +1820,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_drop_constraint(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_drop_constraint(
conn,
persistence,
source.as_deref(),
@@ -1543,7 +1836,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_insert(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_insert(
conn,
persistence,
source.as_deref(),
@@ -1559,7 +1852,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_update(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_update(
conn,
persistence,
source.as_deref(),
@@ -1574,7 +1867,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_delete(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_delete(
conn,
persistence,
source.as_deref(),
@@ -1615,7 +1908,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
returning,
reply,
} => {
let _ = reply.send(do_sql_insert(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_sql_insert(
conn,
persistence,
source.as_deref(),
@@ -1633,7 +1926,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
returning,
reply,
} => {
let _ = reply.send(do_sql_update(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_sql_update(
conn,
persistence,
source.as_deref(),
@@ -1649,7 +1942,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
returning,
reply,
} => {
let _ = reply.send(do_sql_delete(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_sql_delete(
conn,
persistence,
source.as_deref(),
@@ -1663,7 +1956,7 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
source,
reply,
} => {
let _ = reply.send(do_rebuild_from_text(
snapshot_then(snap, batch, conn, source.as_deref(), reply, || do_rebuild_from_text(
conn,
persistence,
source.as_deref(),
@@ -1691,6 +1984,18 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
let result = do_list_names_for(conn, source);
let _ = reply.send(result);
}
// Undo/redo/peek/batch are intercepted in `worker_loop` (they
// need `&mut conn` or persistent batch state) and never reach
// here. Listed explicitly so a new variant still forces a
// decision at compile time.
Request::Undo { .. }
| Request::Redo { .. }
| Request::PeekUndo { .. }
| Request::PeekRedo { .. }
| Request::BeginBatch { .. }
| Request::EndBatch { .. } => {
unreachable!("undo/redo/peek/batch are handled in worker_loop")
}
}
}