Iteration 3: existence-only load + rebuild from text on missing .db
When the runtime opens a project whose playground.db is missing,
it now rebuilds the database from project.yaml + data/<table>.csv
per ADR-0015 §7. The rebuild path:
1. Parses project.yaml (serde_yml). Unknown versions / types /
actions surface as PersistenceFatal.
2. Recreates each user table with FK constraints inline
(PRAGMA foreign_keys=OFF), then populates the column-type,
relationship, and project metadata tables.
3. Loads each table's CSV via a hand-rolled reader that
preserves the NULL-vs-empty distinction (the csv crate
doesn't expose whether a field was quoted; ours does).
4. Runs PRAGMA foreign_key_check before commit; any violation
aborts.
5. Restores foreign_keys=ON regardless of success.
Row-level failures get DbError::RebuildRowFailed with row
number, file, table, and a friendly per-type detail. They land
in the runtime as a fatal stderr message ("unable to load row N
from `data/T.csv` into table `T`: ...") before the alternate
screen is entered.
created_at from project.yaml overwrites the configure-time
placeholder so timestamps round-trip stably.
Tests: 307 passing (267 lib + 9 + 5 new + 9 + 17), 0 failing,
0 skipped. Clippy clean with nursery lints.
This commit is contained in:
@@ -39,8 +39,9 @@ use crate::dsl::types::Type;
|
||||
use crate::dsl::value::{Bound, Value, ValueError};
|
||||
use crate::persistence::{
|
||||
CellValue, ColumnSchema, Persistence, PersistenceError, RelationshipSchema, SchemaSnapshot,
|
||||
TableSchema, TableSnapshot,
|
||||
TableSchema, TableSnapshot, decode_cell, parse_csv, parse_schema,
|
||||
};
|
||||
use crate::project::{DATA_DIR, PROJECT_YAML};
|
||||
|
||||
/// Inbox capacity. The worker is fast enough that this rarely
|
||||
/// matters; `64` is a generous head-room for bursts.
|
||||
@@ -117,6 +118,16 @@ pub enum DbError {
|
||||
path: std::path::PathBuf,
|
||||
message: String,
|
||||
},
|
||||
#[error(
|
||||
"unable to load row {row_number} from `{}` into table `{table}`: {detail}",
|
||||
csv_path.display()
|
||||
)]
|
||||
RebuildRowFailed {
|
||||
table: String,
|
||||
csv_path: std::path::PathBuf,
|
||||
row_number: usize,
|
||||
detail: String,
|
||||
},
|
||||
#[error("database worker is no longer available")]
|
||||
WorkerGone,
|
||||
#[error("io error: {0}")]
|
||||
@@ -213,7 +224,7 @@ impl DbError {
|
||||
/// surfaces these as fatal banners.
|
||||
#[must_use]
|
||||
pub const fn is_fatal(&self) -> bool {
|
||||
matches!(self, Self::PersistenceFatal { .. })
|
||||
matches!(self, Self::PersistenceFatal { .. } | Self::RebuildRowFailed { .. })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,6 +320,14 @@ enum Request {
|
||||
source: Option<String>,
|
||||
reply: oneshot::Sender<Result<DataResult, DbError>>,
|
||||
},
|
||||
/// Rebuild the database from `project.yaml` + `data/`
|
||||
/// (ADR-0015 §7). Used by the runtime when the `.db` file
|
||||
/// is missing on project open. Iteration 4's `rebuild`
|
||||
/// app-level command will reuse the same request.
|
||||
RebuildFromText {
|
||||
project_path: std::path::PathBuf,
|
||||
reply: oneshot::Sender<Result<(), DbError>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Database {
|
||||
@@ -523,6 +542,23 @@ impl Database {
|
||||
recv.await.map_err(|_| DbError::WorkerGone)?
|
||||
}
|
||||
|
||||
/// Rebuild the database from `project.yaml` + `data/`
|
||||
/// (ADR-0015 §7). Called by the runtime on a missing `.db`
|
||||
/// at startup; Iteration 4 will also expose this via the
|
||||
/// `rebuild` app-level command.
|
||||
pub async fn rebuild_from_text(
|
||||
&self,
|
||||
project_path: std::path::PathBuf,
|
||||
) -> Result<(), DbError> {
|
||||
let (reply, recv) = oneshot::channel();
|
||||
self.send(Request::RebuildFromText {
|
||||
project_path,
|
||||
reply,
|
||||
})
|
||||
.await?;
|
||||
recv.await.map_err(|_| DbError::WorkerGone)?
|
||||
}
|
||||
|
||||
pub async fn query_data(
|
||||
&self,
|
||||
table: String,
|
||||
@@ -785,6 +821,9 @@ fn handle_request(conn: &Connection, persistence: Option<&Persistence>, req: Req
|
||||
&table,
|
||||
));
|
||||
}
|
||||
Request::RebuildFromText { project_path, reply } => {
|
||||
let _ = reply.send(do_rebuild_from_text(conn, &project_path));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2393,6 +2432,278 @@ fn read_relationships_inbound(
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Rebuild the database from `project.yaml` + `data/<table>.csv`
|
||||
/// (ADR-0015 §7).
|
||||
///
|
||||
/// The on-disk text is the authoritative source: this function
|
||||
/// recreates schema, metadata, and rows so the resulting `.db`
|
||||
/// reflects them exactly. Persistence callbacks are NOT invoked;
|
||||
/// we're loading, not changing user-visible state.
|
||||
///
|
||||
/// FK enforcement is disabled for the load and re-enabled at
|
||||
/// the end (regardless of success). A `foreign_key_check`
|
||||
/// before commit verifies the loaded data is consistent — any
|
||||
/// violation aborts with a fatal error.
|
||||
fn do_rebuild_from_text(conn: &Connection, project_path: &Path) -> Result<(), DbError> {
|
||||
let yaml_path = project_path.join(PROJECT_YAML);
|
||||
let data_dir = project_path.join(DATA_DIR);
|
||||
|
||||
let yaml_body =
|
||||
std::fs::read_to_string(&yaml_path).map_err(|e| DbError::PersistenceFatal {
|
||||
operation: "read",
|
||||
path: yaml_path.clone(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
let snapshot = parse_schema(&yaml_body).map_err(|e| DbError::PersistenceFatal {
|
||||
operation: "parse",
|
||||
path: yaml_path.clone(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
conn.execute_batch("PRAGMA foreign_keys = OFF;")
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
|
||||
let result = (|| -> Result<(), DbError> {
|
||||
let tx = conn
|
||||
.unchecked_transaction()
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
|
||||
// 1. Recreate user tables with FK constraints inline.
|
||||
for table in &snapshot.tables {
|
||||
let read_schema = build_read_schema(table, &snapshot.relationships);
|
||||
let ddl = schema_to_ddl(&table.name, &read_schema);
|
||||
tx.execute_batch(&ddl).map_err(DbError::from_rusqlite)?;
|
||||
}
|
||||
|
||||
// 2. Column-type metadata.
|
||||
{
|
||||
let mut stmt = tx
|
||||
.prepare(&format!(
|
||||
"INSERT INTO {META_TABLE} (table_name, column_name, user_type) \
|
||||
VALUES (?1, ?2, ?3);"
|
||||
))
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
for table in &snapshot.tables {
|
||||
for col in &table.columns {
|
||||
stmt.execute([
|
||||
table.name.as_str(),
|
||||
col.name.as_str(),
|
||||
col.user_type.keyword(),
|
||||
])
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Relationship metadata.
|
||||
{
|
||||
let mut stmt = tx
|
||||
.prepare(&format!(
|
||||
"INSERT INTO {REL_TABLE} \
|
||||
(name, parent_table, parent_column, child_table, child_column, \
|
||||
on_delete, on_update) \
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7);"
|
||||
))
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
for rel in &snapshot.relationships {
|
||||
stmt.execute([
|
||||
rel.name.as_str(),
|
||||
rel.parent_table.as_str(),
|
||||
rel.parent_column.as_str(),
|
||||
rel.child_table.as_str(),
|
||||
rel.child_column.as_str(),
|
||||
rel.on_delete.keyword(),
|
||||
rel.on_update.keyword(),
|
||||
])
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Project metadata: overwrite the configure-time
|
||||
// `created_at` with the YAML's authoritative value.
|
||||
tx.execute(
|
||||
&format!(
|
||||
"INSERT INTO {META_PROJECT_TABLE} (key, value) VALUES ('created_at', ?1) \
|
||||
ON CONFLICT(key) DO UPDATE SET value = excluded.value;"
|
||||
),
|
||||
[snapshot.created_at.as_str()],
|
||||
)
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
|
||||
// 5. Load each table's rows (if a CSV is present).
|
||||
for table in &snapshot.tables {
|
||||
let csv_path = data_dir.join(format!("{}.csv", table.name));
|
||||
if !csv_path.exists() {
|
||||
continue;
|
||||
}
|
||||
load_table_csv(&tx, table, &csv_path)?;
|
||||
}
|
||||
|
||||
// 6. Verify FK consistency before committing.
|
||||
{
|
||||
let mut check = tx
|
||||
.prepare("PRAGMA foreign_key_check;")
|
||||
.map_err(DbError::from_rusqlite)?;
|
||||
let mut rows = check.query([]).map_err(DbError::from_rusqlite)?;
|
||||
if rows.next().map_err(DbError::from_rusqlite)?.is_some() {
|
||||
return Err(DbError::PersistenceFatal {
|
||||
operation: "rebuild",
|
||||
path: yaml_path.clone(),
|
||||
message: "rebuilt data violates foreign-key constraints".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit().map_err(DbError::from_rusqlite)?;
|
||||
Ok(())
|
||||
})();
|
||||
|
||||
let pragma_result = conn
|
||||
.execute_batch("PRAGMA foreign_keys = ON;")
|
||||
.map_err(DbError::from_rusqlite);
|
||||
result.and(pragma_result)
|
||||
}
|
||||
|
||||
/// Build a `ReadSchema` for `table` that includes any
|
||||
/// relationships from the snapshot in which `table` is the
|
||||
/// child. The output drives `schema_to_ddl` so the resulting
|
||||
/// CREATE TABLE has the FKs inline.
|
||||
fn build_read_schema(table: &TableSchema, relationships: &[RelationshipSchema]) -> ReadSchema {
|
||||
let columns: Vec<ReadColumn> = table
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| ReadColumn {
|
||||
name: c.name.clone(),
|
||||
sqlite_type: c.user_type.sqlite_strict_type().to_string(),
|
||||
notnull: false,
|
||||
primary_key: table.primary_key.contains(&c.name),
|
||||
user_type: Some(c.user_type),
|
||||
})
|
||||
.collect();
|
||||
let foreign_keys: Vec<ReadForeignKey> = relationships
|
||||
.iter()
|
||||
.filter(|r| r.child_table == table.name)
|
||||
.map(|r| ReadForeignKey {
|
||||
parent_table: r.parent_table.clone(),
|
||||
parent_column: r.parent_column.clone(),
|
||||
child_column: r.child_column.clone(),
|
||||
on_delete: r.on_delete,
|
||||
on_update: r.on_update,
|
||||
})
|
||||
.collect();
|
||||
ReadSchema {
|
||||
columns,
|
||||
primary_key: table.primary_key.clone(),
|
||||
foreign_keys,
|
||||
}
|
||||
}
|
||||
|
||||
/// Read `csv_path` and INSERT each row into `table.name`.
|
||||
/// Failures are wrapped in `DbError::RebuildRowFailed` with
|
||||
/// row number and table name per ADR-0015 §7.
|
||||
fn load_table_csv(
|
||||
tx: &rusqlite::Transaction<'_>,
|
||||
table: &TableSchema,
|
||||
csv_path: &Path,
|
||||
) -> Result<(), DbError> {
|
||||
let body = std::fs::read_to_string(csv_path).map_err(|e| DbError::PersistenceFatal {
|
||||
operation: "read",
|
||||
path: csv_path.to_path_buf(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
let parsed = parse_csv(&body).map_err(|e| DbError::PersistenceFatal {
|
||||
operation: "parse",
|
||||
path: csv_path.to_path_buf(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if parsed.rows.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Header sanity check: column names must match the YAML
|
||||
// schema's column order. A mismatch is a hand-edit hazard;
|
||||
// surfacing it as a fatal error is better than silently
|
||||
// mis-aligning columns.
|
||||
let expected: Vec<&str> = table.columns.iter().map(|c| c.name.as_str()).collect();
|
||||
let header_strs: Vec<&str> = parsed.header.iter().map(String::as_str).collect();
|
||||
if header_strs != expected {
|
||||
return Err(DbError::PersistenceFatal {
|
||||
operation: "validate",
|
||||
path: csv_path.to_path_buf(),
|
||||
message: format!(
|
||||
"CSV header {:?} does not match table columns {:?}",
|
||||
parsed.header, expected,
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let cols_csv = table
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| quote_ident(&c.name))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let placeholders = (1..=table.columns.len())
|
||||
.map(|i| format!("?{i}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let sql = format!(
|
||||
"INSERT INTO {ident} ({cols_csv}) VALUES ({placeholders});",
|
||||
ident = quote_ident(&table.name),
|
||||
);
|
||||
let mut stmt = tx.prepare(&sql).map_err(DbError::from_rusqlite)?;
|
||||
|
||||
for (idx, raw_row) in parsed.rows.iter().enumerate() {
|
||||
// Row number reported as a 1-based file line: header
|
||||
// is line 1, so the first data row is line 2.
|
||||
let row_number = idx + 2;
|
||||
if raw_row.len() != table.columns.len() {
|
||||
return Err(DbError::RebuildRowFailed {
|
||||
table: table.name.clone(),
|
||||
csv_path: csv_path.to_path_buf(),
|
||||
row_number,
|
||||
detail: format!(
|
||||
"row has {} field(s) but table has {} column(s)",
|
||||
raw_row.len(),
|
||||
table.columns.len(),
|
||||
),
|
||||
});
|
||||
}
|
||||
let mut params: Vec<rusqlite::types::Value> = Vec::with_capacity(raw_row.len());
|
||||
for (col, raw_cell) in table.columns.iter().zip(raw_row.iter()) {
|
||||
let cell = decode_cell(col.user_type, raw_cell).map_err(|detail| {
|
||||
DbError::RebuildRowFailed {
|
||||
table: table.name.clone(),
|
||||
csv_path: csv_path.to_path_buf(),
|
||||
row_number,
|
||||
detail: format!("column `{}`: {detail}", col.name),
|
||||
}
|
||||
})?;
|
||||
params.push(cell_value_to_sqlite(&cell));
|
||||
}
|
||||
stmt.execute(rusqlite::params_from_iter(params.iter()))
|
||||
.map_err(|e| DbError::RebuildRowFailed {
|
||||
table: table.name.clone(),
|
||||
csv_path: csv_path.to_path_buf(),
|
||||
row_number,
|
||||
detail: e.to_string(),
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cell_value_to_sqlite(cell: &CellValue) -> rusqlite::types::Value {
|
||||
use rusqlite::types::Value;
|
||||
match cell {
|
||||
CellValue::Null => Value::Null,
|
||||
CellValue::Integer(n) => Value::Integer(*n),
|
||||
CellValue::Real(f) => Value::Real(*f),
|
||||
CellValue::Text(s) => Value::Text(s.clone()),
|
||||
CellValue::Blob(b) => Value::Blob(b.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user