Iteration 2: per-command write-through to project.yaml, CSVs, history.log

Every successful user command now persists through to YAML, the
affected CSVs, and history.log inside the same SQLite transaction,
with the commit-db-last ordering from ADR-0015 §6: validate ->
mutate -> stage text + fsync -> atomic rename -> append history ->
commit. A failure in any text-write step rolls back the SQLite tx,
so disk state is unchanged on failure. Persistence failures are
routed through a new AppEvent::PersistenceFatal which sets a
fatal_message on the App, emits Action::Quit, and is printed to
stderr after terminal teardown so the banner remains above the
shell prompt (ADR-0015 §8).

New persistence module owns the file formats: hand-rolled YAML
schema writer, per-type CSV encoder (RFC 4180, NULL distinct from
empty string, base64 blobs), append-only history.log with ISO-8601
timestamps and successful-only entries. Atomic per-file writes via
tmp + fsync + rename.

The db worker holds an Option<Persistence>; tests still use
Database::open(":memory:") with no persistence. Action::ExecuteDsl
gains a source field carrying the user-typed text, threaded
through to history.log.

Tests: 289 passing (256 lib + 7 new integration + 9 lifecycle + 17
walking-skeleton), 0 failing, 0 skipped. Clippy clean with nursery
lints.
This commit is contained in:
claude@clouddev1
2026-05-07 21:09:15 +00:00
parent 601d3b6c51
commit 5c076f6d8f
15 changed files with 2275 additions and 213 deletions
+323
View File
@@ -0,0 +1,323 @@
//! Per-command persistence to `project.yaml`, `data/*.csv`,
//! and `history.log` (ADR-0015 §3–§6).
//!
//! Iteration 2 wiring: every successful user command, after
//! its SQLite mutations are staged but before the transaction
//! commits, asks `Persistence` to write the affected text
//! targets atomically (write-temp + fsync + rename). The
//! commit-db-last ordering (ADR-0015 §6) is enforced in
//! `db.rs`; this module owns the file-format details and the
//! atomic-write primitive.
//!
//! Failure semantics: any write or rename failure produces a
//! `PersistenceError`. The caller (the db worker) is
//! responsible for translating that into a fatal error and
//! letting the SQLite tx roll back.
use std::fs;
use std::io::Write as _;
use std::path::{Path, PathBuf};
use crate::dsl::action::ReferentialAction;
use crate::dsl::types::Type;
use crate::project::{DATA_DIR, HISTORY_LOG, PROJECT_YAML};
mod csv_io;
mod history;
mod yaml;
/// Owns persistence to a single project on disk. Cheap to
/// move; the db worker holds one instance for its lifetime.
#[derive(Debug, Clone)]
pub struct Persistence {
project_path: PathBuf,
}
#[derive(Debug, thiserror::Error)]
pub enum PersistenceError {
#[error("could not {operation} `{path}`: {source}")]
Io {
operation: &'static str,
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("could not encode {kind} for `{path}`: {message}")]
Encode {
kind: &'static str,
path: PathBuf,
message: String,
},
}
impl PersistenceError {
/// Path the failure was associated with.
#[must_use]
pub fn path(&self) -> &Path {
match self {
Self::Io { path, .. } | Self::Encode { path, .. } => path,
}
}
/// Short label for the kind of operation that failed,
/// suitable for the fatal banner.
#[must_use]
pub const fn operation(&self) -> &'static str {
match self {
Self::Io { operation, .. } => operation,
Self::Encode { .. } => "encode",
}
}
}
/// Snapshot of the full schema as it is to be written to
/// `project.yaml`.
///
/// Read from the database after the in-flight mutation has
/// staged its changes (within the same SQLite tx) so the YAML
/// reflects the post-mutation state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaSnapshot {
pub created_at: String,
pub tables: Vec<TableSchema>,
pub relationships: Vec<RelationshipSchema>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableSchema {
pub name: String,
pub primary_key: Vec<String>,
pub columns: Vec<ColumnSchema>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnSchema {
pub name: String,
pub user_type: Type,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationshipSchema {
pub name: String,
pub parent_table: String,
pub parent_column: String,
pub child_table: String,
pub child_column: String,
pub on_delete: ReferentialAction,
pub on_update: ReferentialAction,
}
/// Snapshot of one table's full row data, for writing
/// `data/<table>.csv`. The column order matches the table's
/// declaration order; the row tuples are aligned to it.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSnapshot {
pub name: String,
pub columns: Vec<ColumnSchema>,
pub rows: Vec<Vec<CellValue>>,
}
/// A scalar cell value, in the small ADT understood by the
/// CSV encoder.
///
/// `Null` and `Text("")` are distinct. `Eq` is intentionally
/// NOT derived because `Real(f64)` does not satisfy it (NaN);
/// use `PartialEq` for comparison.
#[derive(Debug, Clone, PartialEq)]
pub enum CellValue {
Null,
Integer(i64),
Real(f64),
Text(String),
Blob(Vec<u8>),
}
impl Persistence {
#[must_use]
pub const fn new(project_path: PathBuf) -> Self {
Self { project_path }
}
/// Project root directory. Used in tests and diagnostics.
#[must_use]
pub fn project_path(&self) -> &Path {
&self.project_path
}
/// Write `project.yaml` from a full schema snapshot.
/// Atomic: writes to `project.yaml.tmp`, fsyncs, then
/// renames over the destination.
pub fn write_schema(&self, schema: &SchemaSnapshot) -> Result<(), PersistenceError> {
let body = yaml::serialize_schema(schema);
atomic_write(&self.project_path.join(PROJECT_YAML), body.as_bytes())
}
/// Write `data/<table>.csv` from a table snapshot. Atomic
/// per file. Creates the `data/` directory if missing
/// (tolerant of fresh projects).
pub fn write_table_data(&self, table: &TableSnapshot) -> Result<(), PersistenceError> {
let data_dir = self.project_path.join(DATA_DIR);
fs::create_dir_all(&data_dir).map_err(|source| PersistenceError::Io {
operation: "create",
path: data_dir.clone(),
source,
})?;
let body =
csv_io::serialize_table(table).map_err(|message| PersistenceError::Encode {
kind: "CSV",
path: data_dir.join(format!("{}.csv", table.name)),
message,
})?;
atomic_write(&data_dir.join(format!("{}.csv", table.name)), &body)
}
/// Remove `data/<table>.csv` if present. Used when a
/// table is dropped so stale CSVs don't linger.
pub fn delete_table_data(&self, table_name: &str) -> Result<(), PersistenceError> {
let path = self
.project_path
.join(DATA_DIR)
.join(format!("{table_name}.csv"));
match fs::remove_file(&path) {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(source) => Err(PersistenceError::Io {
operation: "delete",
path,
source,
}),
}
}
/// Append one record to `history.log`.
pub fn append_history(&self, command_text: &str) -> Result<(), PersistenceError> {
let path = self.project_path.join(HISTORY_LOG);
let line = history::format_record(command_text, history::utc_iso8601_now());
history::append(&path, &line)
}
}
/// Write `body` to `path` atomically via temp file + fsync +
/// rename. The temp file is named `<final>.tmp` in the same
/// directory so the rename stays on the same filesystem.
fn atomic_write(path: &Path, body: &[u8]) -> Result<(), PersistenceError> {
let tmp_path = path.with_extension(extension_with_tmp(path));
{
let mut tmp = fs::File::create(&tmp_path).map_err(|source| PersistenceError::Io {
operation: "create",
path: tmp_path.clone(),
source,
})?;
tmp.write_all(body).map_err(|source| PersistenceError::Io {
operation: "write",
path: tmp_path.clone(),
source,
})?;
tmp.sync_all().map_err(|source| PersistenceError::Io {
operation: "fsync",
path: tmp_path.clone(),
source,
})?;
}
fs::rename(&tmp_path, path).map_err(|source| PersistenceError::Io {
operation: "rename",
path: path.to_path_buf(),
source,
})?;
Ok(())
}
/// Build the `.tmp` extension for a path.
///
/// If the path already has an extension (`project.yaml`), the
/// tmp variant is `project.yaml.tmp`. If the path has no
/// extension, the extension becomes plain `tmp`.
fn extension_with_tmp(path: &Path) -> String {
path.extension().map_or_else(
|| "tmp".to_string(),
|ext| format!("{}.tmp", ext.to_string_lossy()),
)
}
#[cfg(test)]
mod tests {
use super::*;
fn tempdir() -> tempfile::TempDir {
tempfile::tempdir().expect("create tempdir")
}
#[test]
fn extension_with_tmp_appends_to_existing_extension() {
assert_eq!(extension_with_tmp(Path::new("a/b/project.yaml")), "yaml.tmp");
assert_eq!(extension_with_tmp(Path::new("a/b/Customers.csv")), "csv.tmp");
assert_eq!(extension_with_tmp(Path::new("a/b/lockfile")), "tmp");
}
#[test]
fn atomic_write_roundtrips() {
let dir = tempdir();
let target = dir.path().join("file.txt");
atomic_write(&target, b"hello\n").unwrap();
assert_eq!(fs::read_to_string(&target).unwrap(), "hello\n");
// Calling again replaces atomically — no .tmp left behind.
atomic_write(&target, b"world\n").unwrap();
assert_eq!(fs::read_to_string(&target).unwrap(), "world\n");
assert!(!target.with_extension("txt.tmp").exists());
}
#[test]
fn write_schema_writes_yaml() {
let dir = tempdir();
let p = Persistence::new(dir.path().to_path_buf());
let schema = SchemaSnapshot {
created_at: "2026-05-07T14:30:12Z".to_string(),
tables: vec![],
relationships: vec![],
};
p.write_schema(&schema).unwrap();
let body = fs::read_to_string(dir.path().join(PROJECT_YAML)).unwrap();
assert!(body.contains("version: 1"));
assert!(body.contains("created_at:"));
}
#[test]
fn write_and_delete_table_data() {
let dir = tempdir();
let p = Persistence::new(dir.path().to_path_buf());
let table = TableSnapshot {
name: "Customers".to_string(),
columns: vec![ColumnSchema {
name: "Name".to_string(),
user_type: Type::Text,
}],
rows: vec![vec![CellValue::Text("Alice".to_string())]],
};
p.write_table_data(&table).unwrap();
let csv_path = dir.path().join(DATA_DIR).join("Customers.csv");
assert!(csv_path.exists());
let body = fs::read_to_string(&csv_path).unwrap();
assert!(body.contains("Name"));
assert!(body.contains("Alice"));
p.delete_table_data("Customers").unwrap();
assert!(!csv_path.exists());
// Idempotent on a missing file.
p.delete_table_data("Customers").unwrap();
}
#[test]
fn append_history_creates_and_appends() {
let dir = tempdir();
let p = Persistence::new(dir.path().to_path_buf());
p.append_history("create table Foo with pk id:serial").unwrap();
p.append_history("insert into Foo (1)").unwrap();
let body = fs::read_to_string(dir.path().join(HISTORY_LOG)).unwrap();
let lines: Vec<&str> = body.trim_end().lines().collect();
assert_eq!(lines.len(), 2);
assert!(lines[0].ends_with("|ok|create table Foo with pk id:serial"));
assert!(lines[1].ends_with("|ok|insert into Foo (1)"));
}
}