Files
rdbms-playground/src/persistence/history.rs
T
claude@clouddev1 e4f2f5fa15 feat: ADR-0034 — history journal records err + replay parses/filters the journal
Replay (§3): run_replay parses <ts>|<status>|<source> journal records — runs ok, skips non-ok — while still accepting bare .commands scripts (prefix-detected so a | inside a bare command isn't misread). Fixes replay history.log, which died on line 1.

Journal failures (§1/§2): failed commands are recorded err via a new Action::JournalFailure, emitted by the pure-sync App for both parse failures and worker-execution failures (runtime appends best-effort, never fatal). Hydration reads all records so typo'd/rejected commands are recallable across sessions.

Amendment 1 — replay filters app-lifecycle commands: a working replay history.log exposed that the journal also records save as/load/new/export/import/rebuild/mode (which would panic the worker dispatch or abort replay). Replay now re-applies only schema/data writes and skips every app-lifecycle command + nested replay, classified by entry word so modal/incomplete forms (save as, bare mode) and quit skip uniformly rather than aborting. All skips continue (reversing the nested-replay refusal); import and nested replay warn. replay.error_nested removed; replay.skipped_import/_replay added; ReplayCompleted carries warnings. requirements.md U3/U4 updated; app-command runtime-failure journalling tracked as a follow-up.

1659 passing / 0 failing / 0 skipped / 1 ignored. Clippy clean.
2026-05-24 18:59:06 +00:00

440 lines
15 KiB
Rust

//! Append-only `history.log` writer (ADR-0015 §5).
//!
//! Format: one record per line, three pipe-separated fields:
//!
//! ```text
//! 2026-05-07T14:30:12Z|ok|create table Customers with pk id(serial)
//! ```
//!
//! Status is always `ok` in v1; failed commands are not
//! recorded. The status field is kept in the line shape so
//! future use cases can carry additional values without a
//! format break.
//!
//! Newlines inside the command (which do not yet appear, but
//! will when multi-line input I1 lands) are escaped to a
//! literal `\n`.
use std::fs::OpenOptions;
use std::io::Write as _;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use super::PersistenceError;
/// Journal-record status tokens (ADR-0034 §1). Kept as named
/// constants so the writer and the readers (hydration + replay)
/// cannot drift on the spelling.
pub(super) const STATUS_OK: &str = "ok";
pub(super) const STATUS_ERR: &str = "err";
/// Format a successful-command record. Pure; no I/O.
pub(super) fn format_record(command_text: &str, timestamp_iso: String) -> String {
format_record_with_status(command_text, timestamp_iso, STATUS_OK)
}
/// Format a record with an explicit status token (ADR-0034 §1).
/// Pure; no I/O.
pub(super) fn format_record_with_status(
command_text: &str,
timestamp_iso: String,
status: &str,
) -> String {
let escaped = escape_command(command_text);
format!("{timestamp_iso}|{status}|{escaped}\n")
}
/// Read the most-recent `max_n` user-issued command sources
/// from `history_log_path`, in chronological order
/// (oldest-first within the returned slice).
///
/// This is the I2-persist hydration helper (ADR-0015 §12):
/// on project open, the runtime seeds the in-memory navigable
/// history from this list so Up/Down recall picks up where
/// the user left off in the previous session.
///
/// Lines that do not match the `<ts>|<status>|<source>` shape
/// are silently skipped — they are likely corruption or a
/// future format extension; either way, refusing to seed at
/// all because of a single bad line would be a worse UX than
/// quietly rejoining the user's history.
///
/// A missing file returns an empty `Vec`; other IO errors
/// are surfaced via `PersistenceError` so the caller can
/// decide how to handle them. In practice the runtime treats
/// hydration failures as non-fatal — the user just gets an
/// empty history and a tracing warning.
pub(super) fn read_recent_sources(
history_log_path: &std::path::Path,
max_n: usize,
) -> Result<Vec<String>, super::PersistenceError> {
use std::io::ErrorKind;
let body = match std::fs::read_to_string(history_log_path) {
Ok(b) => b,
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(source) => {
return Err(super::PersistenceError::Io {
operation: "read",
path: history_log_path.to_path_buf(),
source,
});
}
};
let mut sources: Vec<String> = body
.lines()
.filter_map(parse_record_source)
.collect();
if sources.len() > max_n {
let skip = sources.len() - max_n;
sources.drain(0..skip);
}
Ok(sources)
}
/// Parse one `<ts>|<status>|<source>` line and return the
/// unescaped source. Returns `None` for malformed lines.
fn parse_record_source(line: &str) -> Option<String> {
// Format: timestamp|status|source-with-pipes-allowed
// We split into at most 3 parts so a `|` inside source
// (which append() does NOT escape — pipes are valid SQL
// characters) is preserved.
let mut parts = line.splitn(3, '|');
let _ts = parts.next()?;
let _status = parts.next()?;
let source = parts.next()?;
Some(unescape_command(source))
}
/// A parsed journal record (ADR-0034 §3). `source` is already
/// unescaped.
pub(super) struct JournalRecord {
pub status_is_ok: bool,
pub source: String,
}
/// Classify `line` as a journal record or a bare command
/// (ADR-0034 §3). Returns `Some(JournalRecord)` only when the
/// line begins with a valid `<iso8601-timestamp>|<status>|`
/// prefix — so a bare command containing `|` (e.g.
/// `select 'a|b' from t`) is `None` (treated as bare by the
/// caller) because it does not start with a timestamp. A valid
/// timestamp prefix with a non-`ok` (or unrecognised) status is
/// still a journal record, reported with `status_is_ok = false`
/// so replay skips it rather than mis-running it as a command.
pub(super) fn parse_journal_record(line: &str) -> Option<JournalRecord> {
let mut parts = line.splitn(3, '|');
let ts = parts.next()?;
let status = parts.next()?;
let source = parts.next()?;
if !looks_like_iso8601(ts) {
return None;
}
Some(JournalRecord {
status_is_ok: status == STATUS_OK,
source: unescape_command(source),
})
}
/// True when `s` is exactly an `YYYY-MM-DDTHH:MM:SSZ` timestamp
/// — the shape `utc_iso8601_now` emits. Used to distinguish a
/// journal record's leading field from a bare command that
/// merely contains `|`.
fn looks_like_iso8601(s: &str) -> bool {
let b = s.as_bytes();
if b.len() != 20 {
return false;
}
let digit = |i: usize| b[i].is_ascii_digit();
digit(0) && digit(1) && digit(2) && digit(3) && b[4] == b'-'
&& digit(5) && digit(6) && b[7] == b'-'
&& digit(8) && digit(9) && b[10] == b'T'
&& digit(11) && digit(12) && b[13] == b':'
&& digit(14) && digit(15) && b[16] == b':'
&& digit(17) && digit(18) && b[19] == b'Z'
}
fn unescape_command(s: &str) -> String {
let mut out = String::with_capacity(s.len());
let mut chars = s.chars();
while let Some(c) = chars.next() {
if c != '\\' {
out.push(c);
continue;
}
match chars.next() {
Some('n') => out.push('\n'),
Some('r') => out.push('\r'),
Some('\\') => out.push('\\'),
// Preserve unknown escapes literally so a future
// extension to `escape_command` doesn't corrupt
// entries written before that extension.
Some(other) => {
out.push('\\');
out.push(other);
}
None => out.push('\\'),
}
}
out
}
/// Append `line` (which already ends in `\n`) to the file at
/// `path`. Creates the file if it doesn't exist. fsyncs after
/// the write so a power-cut doesn't lose the latest entry.
pub(super) fn append(path: &Path, line: &str) -> Result<(), PersistenceError> {
let mut f = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.map_err(|source| PersistenceError::Io {
operation: "open",
path: path.to_path_buf(),
source,
})?;
f.write_all(line.as_bytes())
.map_err(|source| PersistenceError::Io {
operation: "write",
path: path.to_path_buf(),
source,
})?;
f.sync_all().map_err(|source| PersistenceError::Io {
operation: "fsync",
path: path.to_path_buf(),
source,
})?;
Ok(())
}
fn escape_command(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for c in s.chars() {
match c {
'\n' => out.push_str("\\n"),
'\r' => out.push_str("\\r"),
'\\' => out.push_str("\\\\"),
_ => out.push(c),
}
}
out
}
/// Current UTC time as ISO-8601 with second precision and a
/// `Z` suffix.
pub(super) fn utc_iso8601_now() -> String {
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
iso8601_from_unix_secs(secs)
}
fn iso8601_from_unix_secs(secs: i64) -> String {
let day_secs = secs.rem_euclid(86_400);
let h = day_secs / 3600;
let m = (day_secs % 3600) / 60;
let s = day_secs % 60;
let (y, mo, d) = ymd_from_unix_secs(secs);
format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z")
}
const fn ymd_from_unix_secs(secs: i64) -> (u32, u32, u32) {
let days = secs.div_euclid(86_400);
let z = days + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = (z - era * 146_097) as u64;
let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
(y as u32, m as u32, d as u32)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
#[test]
fn record_format() {
let line = format_record(
"create table Customers with pk id(serial)",
"2026-05-07T14:30:12Z".to_string(),
);
assert_eq!(
line,
"2026-05-07T14:30:12Z|ok|create table Customers with pk id(serial)\n",
);
}
#[test]
fn newlines_in_command_are_escaped() {
let line = format_record("foo\nbar", "T".to_string());
assert_eq!(line, "T|ok|foo\\nbar\n");
}
// ---- ADR-0034 §3 — journal-record detection for replay ----
#[test]
fn parse_journal_record_ok_extracts_unescaped_source() {
let rec = parse_journal_record(
"2026-05-24T10:00:00Z|ok|create table T with pk id(int)",
)
.expect("valid ok journal record");
assert!(rec.status_is_ok);
assert_eq!(rec.source, "create table T with pk id(int)");
}
#[test]
fn parse_journal_record_err_is_record_but_not_ok() {
// A valid timestamp + `err` status is a journal record (so
// replay treats it as skippable), reported `status_is_ok =
// false`.
let rec = parse_journal_record("2026-05-24T10:00:02Z|err|insert into T values (1)")
.expect("valid err journal record");
assert!(!rec.status_is_ok);
}
#[test]
fn parse_journal_record_unknown_status_is_record_but_not_ok() {
// A valid timestamp + an unrecognised status is still a
// journal record (ADR-0034 §1 "readers ignore values they
// do not recognise"); replay skips it rather than running it.
let rec = parse_journal_record("2026-05-24T10:00:02Z|frobnicate|whatever")
.expect("valid-ts record with unknown status");
assert!(!rec.status_is_ok);
}
#[test]
fn parse_journal_record_rejects_bare_command_with_pipe() {
// A bare command that merely contains `|` must NOT be read
// as a journal record — its first field is not a timestamp.
assert!(parse_journal_record("select 'a|b' from t").is_none());
assert!(parse_journal_record("show data Orders").is_none());
}
#[test]
fn parse_journal_record_rejects_timestamp_ish_but_invalid_prefix() {
// Boundary: looks vaguely date-y but isn't the exact
// `YYYY-MM-DDTHH:MM:SSZ` shape → bare command.
assert!(parse_journal_record("2026-5-24|ok|x").is_none());
assert!(parse_journal_record("2026-05-24 10:00:00|ok|x").is_none());
assert!(parse_journal_record("notatimestamp|ok|x").is_none());
}
#[test]
fn parse_journal_record_preserves_pipe_in_source() {
// `|` is not escaped by the writer (it's a valid SQL char);
// `splitn(3, '|')` keeps everything after the second `|`.
let rec = parse_journal_record("2026-05-24T10:00:00Z|ok|select 'a|b' from t")
.expect("ok record");
assert_eq!(rec.source, "select 'a|b' from t");
}
#[test]
fn parse_journal_record_round_trips_a_written_record() {
// What `format_record` writes, `parse_journal_record` reads
// back to the original command (escape→unescape lossless for
// the awkward cases).
let cmd = "update T set v = 'x\\y' where id = 1";
let line = format_record(cmd, "2026-05-24T10:00:00Z".to_string());
let rec = parse_journal_record(line.trim_end()).expect("round-trip record");
assert!(rec.status_is_ok);
assert_eq!(rec.source, cmd);
}
#[test]
fn backslash_is_escaped() {
let line = format_record("a\\b", "T".to_string());
assert_eq!(line, "T|ok|a\\\\b\n");
}
#[test]
fn iso8601_format_is_well_formed() {
let s = utc_iso8601_now();
// YYYY-MM-DDTHH:MM:SSZ has length 20.
assert_eq!(s.len(), 20);
assert!(s.ends_with('Z'));
assert_eq!(&s[4..5], "-");
assert_eq!(&s[10..11], "T");
}
#[test]
fn iso8601_known_seconds() {
assert_eq!(iso8601_from_unix_secs(0), "1970-01-01T00:00:00Z");
assert_eq!(iso8601_from_unix_secs(1_778_112_000), "2026-05-07T00:00:00Z");
}
#[test]
fn read_recent_sources_returns_empty_when_file_missing() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("nope.log");
let got = read_recent_sources(&path, 10).unwrap();
assert!(got.is_empty());
}
#[test]
fn read_recent_sources_unescapes_newlines_and_backslashes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("history.log");
let line1 = format_record("a\nb", "T1".to_string());
let line2 = format_record("c\\d", "T2".to_string());
std::fs::write(&path, format!("{line1}{line2}")).unwrap();
let got = read_recent_sources(&path, 10).unwrap();
assert_eq!(got, vec!["a\nb".to_string(), "c\\d".to_string()]);
}
#[test]
fn read_recent_sources_caps_at_max_n_keeping_most_recent() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("history.log");
let body: String = (0..10)
.map(|i| format_record(&format!("cmd{i}"), format!("T{i}")))
.collect();
std::fs::write(&path, body).unwrap();
let got = read_recent_sources(&path, 3).unwrap();
assert_eq!(got, vec!["cmd7".to_string(), "cmd8".to_string(), "cmd9".to_string()]);
}
#[test]
fn read_recent_sources_skips_malformed_lines() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("history.log");
// Two valid lines and one garbage line in the middle.
let body = format!(
"{}{}{}",
format_record("good1", "T1".to_string()),
"this is not a record\n",
format_record("good2", "T2".to_string()),
);
std::fs::write(&path, body).unwrap();
let got = read_recent_sources(&path, 10).unwrap();
assert_eq!(got, vec!["good1".to_string(), "good2".to_string()]);
}
#[test]
fn read_recent_sources_preserves_pipes_inside_source() {
// The append-side does NOT escape `|`, so pipes inside
// the source must round-trip through the parser. This
// is what splitn(3) on `|` is supposed to handle.
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("history.log");
std::fs::write(&path, "T1|ok|select 'a|b' from t\n").unwrap();
let got = read_recent_sources(&path, 10).unwrap();
assert_eq!(got, vec!["select 'a|b' from t".to_string()]);
}
#[test]
fn append_creates_and_grows_file() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("history.log");
append(&path, "first|ok|a\n").unwrap();
append(&path, "second|ok|b\n").unwrap();
let body = fs::read_to_string(&path).unwrap();
assert_eq!(body, "first|ok|a\nsecond|ok|b\n");
}
}