perf(seed): single-transaction multi-row insert path (ADR-0048 P1.3d)
do_seed inserted row-by-row through do_insert, re-writing the whole table CSV each time — O(N^2). Extract do_insert's row core into a new insert_one_row (bind + serial/shortid autofill + FK-enriched execute, no tx/persist), shared by: - do_insert: one row in its own transaction (behaviour unchanged). - do_seed: all rows in ONE transaction, with a single finalize_persistence before the single commit — O(N), preserving ADR-0015 §6 commit-db-last. A mid-batch failure now rolls the whole seed back atomically; the capped preview is read back by rowid. A near-max 10000-row seed drops from ~tens of seconds to well under one. do_insert behaviour unchanged (whole suite green: 2346 pass / 0 fail / 0 skip, clippy clean); seed's existing tests exercise the batch path.
This commit is contained in:
@@ -8999,18 +8999,22 @@ fn do_seed(
|
|||||||
const MAX_ATTEMPTS: u32 = 200;
|
const MAX_ATTEMPTS: u32 = 200;
|
||||||
|
|
||||||
let mut rng = seed::make_rng(rng_seed);
|
let mut rng = seed::make_rng(rng_seed);
|
||||||
let mut preview = DataResult {
|
let mut preview_rowids: Vec<i64> = Vec::new();
|
||||||
table_name: table.to_string(),
|
|
||||||
columns: Vec::new(),
|
|
||||||
column_types: Vec::new(),
|
|
||||||
rows: Vec::new(),
|
|
||||||
};
|
|
||||||
let mut accepted: u64 = 0;
|
let mut accepted: u64 = 0;
|
||||||
let mut capped = false;
|
let mut capped = false;
|
||||||
|
|
||||||
|
// All rows insert in a single transaction; persistence (the CSV and
|
||||||
|
// the one history line) is written once, before the single commit —
|
||||||
|
// preserving ADR-0015 §6 commit-db-last while staying O(N) instead
|
||||||
|
// of the O(N^2) of per-row CSV rewrites. A mid-batch failure rolls
|
||||||
|
// the whole seed back (atomic).
|
||||||
|
let tx = conn
|
||||||
|
.unchecked_transaction()
|
||||||
|
.map_err(DbError::from_rusqlite)?;
|
||||||
|
|
||||||
while accepted < n {
|
while accepted < n {
|
||||||
let mut attempt = 0u32;
|
let mut attempt = 0u32;
|
||||||
let inserted = loop {
|
let rowid = loop {
|
||||||
// One sampled parent row per FK for this attempt, so a
|
// One sampled parent row per FK for this attempt, so a
|
||||||
// compound FK's children stay consistent.
|
// compound FK's children stay consistent.
|
||||||
let fk_choice: Vec<usize> = fk_samples
|
let fk_choice: Vec<usize> = fk_samples
|
||||||
@@ -9056,28 +9060,16 @@ fn do_seed(
|
|||||||
for (gi, k) in keys.into_iter().enumerate() {
|
for (gi, k) in keys.into_iter().enumerate() {
|
||||||
used[gi].insert(k);
|
used[gi].insert(k);
|
||||||
}
|
}
|
||||||
// Only the first inserted row carries the `source`, so the
|
let (_rows, rowid) =
|
||||||
// whole seed writes exactly one `history.log` line.
|
insert_one_row(conn, table, &schema, Some(&col_names), &values)?;
|
||||||
let row_source = if accepted == 0 { source } else { None };
|
break Some(rowid);
|
||||||
break Some(do_insert(
|
|
||||||
conn,
|
|
||||||
persistence,
|
|
||||||
row_source,
|
|
||||||
table,
|
|
||||||
Some(&col_names),
|
|
||||||
&values,
|
|
||||||
)?);
|
|
||||||
};
|
};
|
||||||
match inserted {
|
match rowid {
|
||||||
Some(result) => {
|
Some(rowid) => {
|
||||||
// Accumulate the capped preview (D18).
|
// Keep the first `SEED_PREVIEW_CAP` rowids for the
|
||||||
if preview.columns.is_empty() {
|
// capped auto-show (D18).
|
||||||
preview.columns = result.data.columns;
|
if preview_rowids.len() < SEED_PREVIEW_CAP {
|
||||||
preview.column_types = result.data.column_types;
|
preview_rowids.push(rowid);
|
||||||
}
|
|
||||||
if preview.rows.len() < SEED_PREVIEW_CAP {
|
|
||||||
preview.rows.extend(result.data.rows);
|
|
||||||
preview.rows.truncate(SEED_PREVIEW_CAP);
|
|
||||||
}
|
}
|
||||||
accepted += 1;
|
accepted += 1;
|
||||||
}
|
}
|
||||||
@@ -9094,43 +9086,66 @@ fn do_seed(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Persist once (CSV + the single history line), then commit (db-last).
|
||||||
|
let changes = Changes {
|
||||||
|
schema_dirty: false,
|
||||||
|
rewritten_tables: vec![table.to_string()],
|
||||||
|
..Changes::default()
|
||||||
|
};
|
||||||
|
finalize_persistence(conn, persistence, source, &changes)?;
|
||||||
|
tx.commit().map_err(DbError::from_rusqlite)?;
|
||||||
|
|
||||||
|
let data = if preview_rowids.is_empty() {
|
||||||
|
DataResult {
|
||||||
|
table_name: table.to_string(),
|
||||||
|
columns: Vec::new(),
|
||||||
|
column_types: Vec::new(),
|
||||||
|
rows: Vec::new(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
query_rows_by_rowid(conn, table, &preview_rowids)?
|
||||||
|
};
|
||||||
|
|
||||||
Ok(SeedResult {
|
Ok(SeedResult {
|
||||||
table: table.to_string(),
|
table: table.to_string(),
|
||||||
requested: n,
|
requested: n,
|
||||||
produced: accepted,
|
produced: accepted,
|
||||||
data: preview,
|
data,
|
||||||
advisory_columns,
|
advisory_columns,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn do_insert(
|
/// Build and execute a single-row `INSERT` — column resolution, value
|
||||||
|
/// binding, `serial`/`shortid` autofill, and the FK-enriched execute —
|
||||||
|
/// returning `(rows_affected, new rowid)`.
|
||||||
|
///
|
||||||
|
/// It does **not** manage the transaction or persistence: the caller
|
||||||
|
/// owns those. This lets `do_insert` run one row in its own
|
||||||
|
/// transaction while `do_seed` runs N rows in a single transaction and
|
||||||
|
/// persists once (preserving ADR-0015 §6 commit-db-last while staying
|
||||||
|
/// O(N)). **The caller must hold an open transaction.** `table` must
|
||||||
|
/// already be canonical and `schema` already read.
|
||||||
|
fn insert_one_row(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
persistence: Option<&Persistence>,
|
|
||||||
source: Option<&str>,
|
|
||||||
table: &str,
|
table: &str,
|
||||||
|
schema: &ReadSchema,
|
||||||
user_columns: Option<&[String]>,
|
user_columns: Option<&[String]>,
|
||||||
user_values: &[Value],
|
user_values: &[Value],
|
||||||
) -> Result<InsertResult, DbError> {
|
) -> Result<(usize, i64), DbError> {
|
||||||
debug!(table = %table, "insert");
|
// Resolve which columns the user is providing values for. The short
|
||||||
let canonical_table = require_canonical_table(conn, table)?;
|
// form (None) is every non-auto-generated column in schema
|
||||||
let table = canonical_table.as_str();
|
// declaration order; serial and shortid get auto-filled below.
|
||||||
let schema = read_schema(conn, table)?;
|
let user_cols: Vec<String> = user_columns.map_or_else(
|
||||||
|
|| {
|
||||||
// Resolve which columns the user is providing values for.
|
|
||||||
let user_cols: Vec<String> = match user_columns {
|
|
||||||
Some(cols) => cols.to_vec(),
|
|
||||||
None => {
|
|
||||||
// Short form: every non-auto-generated column in
|
|
||||||
// schema declaration order. Serial and shortid both
|
|
||||||
// get auto-filled below.
|
|
||||||
schema
|
schema
|
||||||
.columns
|
.columns
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|c| !matches!(c.user_type, Some(Type::Serial) | Some(Type::ShortId)))
|
.filter(|c| !matches!(c.user_type, Some(Type::Serial) | Some(Type::ShortId)))
|
||||||
.map(|c| c.name.clone())
|
.map(|c| c.name.clone())
|
||||||
.collect()
|
.collect()
|
||||||
}
|
},
|
||||||
};
|
<[String]>::to_vec,
|
||||||
|
);
|
||||||
|
|
||||||
if user_cols.len() != user_values.len() {
|
if user_cols.len() != user_values.len() {
|
||||||
return Err(DbError::InvalidValue(format!(
|
return Err(DbError::InvalidValue(format!(
|
||||||
@@ -9143,7 +9158,7 @@ fn do_insert(
|
|||||||
|
|
||||||
let mut bindings: Vec<(String, Bound)> = Vec::with_capacity(user_cols.len());
|
let mut bindings: Vec<(String, Bound)> = Vec::with_capacity(user_cols.len());
|
||||||
for (col_name, value) in user_cols.iter().zip(user_values.iter()) {
|
for (col_name, value) in user_cols.iter().zip(user_values.iter()) {
|
||||||
let bound = impl_value_for(&schema, col_name, value)?;
|
let bound = impl_value_for(schema, col_name, value)?;
|
||||||
bindings.push((col_name.clone(), bound));
|
bindings.push((col_name.clone(), bound));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -9214,11 +9229,28 @@ fn do_insert(
|
|||||||
debug!(sql = %sql, "insert");
|
debug!(sql = %sql, "insert");
|
||||||
let params: Vec<rusqlite::types::Value> =
|
let params: Vec<rusqlite::types::Value> =
|
||||||
bindings.iter().map(|(_, b)| bound_to_sqlite_value(b)).collect();
|
bindings.iter().map(|(_, b)| bound_to_sqlite_value(b)).collect();
|
||||||
|
let rows_affected = execute_with_fk_enrichment(conn, table, &sql, ¶ms)?;
|
||||||
|
let new_rowid = conn.last_insert_rowid();
|
||||||
|
Ok((rows_affected, new_rowid))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_insert(
|
||||||
|
conn: &Connection,
|
||||||
|
persistence: Option<&Persistence>,
|
||||||
|
source: Option<&str>,
|
||||||
|
table: &str,
|
||||||
|
user_columns: Option<&[String]>,
|
||||||
|
user_values: &[Value],
|
||||||
|
) -> Result<InsertResult, DbError> {
|
||||||
|
debug!(table = %table, "insert");
|
||||||
|
let canonical_table = require_canonical_table(conn, table)?;
|
||||||
|
let table = canonical_table.as_str();
|
||||||
|
let schema = read_schema(conn, table)?;
|
||||||
let tx = conn
|
let tx = conn
|
||||||
.unchecked_transaction()
|
.unchecked_transaction()
|
||||||
.map_err(DbError::from_rusqlite)?;
|
.map_err(DbError::from_rusqlite)?;
|
||||||
let rows_affected = execute_with_fk_enrichment(conn, table, &sql, ¶ms)?;
|
let (rows_affected, new_rowid) =
|
||||||
let new_rowid = conn.last_insert_rowid();
|
insert_one_row(conn, table, &schema, user_columns, user_values)?;
|
||||||
let data = query_rows_by_rowid(conn, table, &[new_rowid])?;
|
let data = query_rows_by_rowid(conn, table, &[new_rowid])?;
|
||||||
let changes = Changes {
|
let changes = Changes {
|
||||||
schema_dirty: false,
|
schema_dirty: false,
|
||||||
@@ -9227,10 +9259,7 @@ fn do_insert(
|
|||||||
};
|
};
|
||||||
finalize_persistence(conn, persistence, source, &changes)?;
|
finalize_persistence(conn, persistence, source, &changes)?;
|
||||||
tx.commit().map_err(DbError::from_rusqlite)?;
|
tx.commit().map_err(DbError::from_rusqlite)?;
|
||||||
Ok(InsertResult {
|
Ok(InsertResult { rows_affected, data })
|
||||||
rows_affected,
|
|
||||||
data,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build the parameterised `UPDATE … SET … WHERE …` statement.
|
/// Build the parameterised `UPDATE … SET … WHERE …` statement.
|
||||||
|
|||||||
Reference in New Issue
Block a user