Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
## 2026-04-08 - [Performance: Defer Allocation during Traversal]
**Learning:** During DAG traversals, creating owned variants of identifiers (like `file.to_path_buf()`) *before* checking `visited` HashSets results in heap allocations (O(E)) for every edge instead of every visited node (O(V)). By moving the `&PathBuf` allocation strictly *after* all HashSet `contains` checks using the borrowed reference (`&Path`), we drastically reduce memory churn.
**Action:** Always check `HashSet::contains` with a borrowed reference *before* creating the owned version required by `HashSet::insert`, especially in performance-critical graph traversal paths.

## 2026-04-10 - [Dynamic SQL Generation Allocation Overhead]
**Learning:** For dynamic SQL generation (e.g., in Cloudflare D1 targets), using intermediate `Vec` allocations and `format!` or `join()` causes unnecessary O(n) heap allocations and memory churn.
**Action:** Always construct queries directly using `String::with_capacity` and the `write!` macro (via `std::fmt::Write`) to minimize memory allocation overhead and improve latency.
Comment on lines +6 to +8
94 changes: 64 additions & 30 deletions crates/flow/src/targets/d1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,40 +300,67 @@ impl D1ExportContext {
key: &KeyValue,
values: &FieldValues,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut columns = vec![];
let mut placeholders = vec![];
let mut params = vec![];
let mut update_clauses = vec![];
use std::fmt::Write;

Comment on lines +303 to +304
// ⚡ Bolt: Optimize SQL generation by pre-allocating string capacity and avoiding
// intermediate `Vec<String>` allocations for columns, placeholders, and updates.
let mut sql = String::with_capacity(
128 + self.key_fields_schema.len() * 16 + self.value_fields_schema.len() * 32,
);
let mut params =
Vec::with_capacity(self.key_fields_schema.len() + self.value_fields_schema.len());
Comment on lines +305 to +311

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Revisit the String::with_capacity sizing heuristics for clarity and maintainability.

The preallocation constants (128 + keys * 16 + values * 32) are opaque and tightly coupled to current assumptions about identifier and SQL boilerplate length, which may drift over time.

Consider either:

  • documenting how you derived these constants, or
  • simplifying to a more generic, field-count-based estimate and relying on reallocation.

This would keep the optimization while making it less fragile to future SQL changes.

Suggested change
// ⚡ Bolt: Optimize SQL generation by pre-allocating string capacity and avoiding
// intermediate `Vec<String>` allocations for columns, placeholders, and updates.
let mut sql = String::with_capacity(
128 + self.key_fields_schema.len() * 16 + self.value_fields_schema.len() * 32,
);
let mut params =
Vec::with_capacity(self.key_fields_schema.len() + self.value_fields_schema.len());
// ⚡ Bolt: Optimize SQL generation by pre-allocating string capacity and avoiding
// intermediate `Vec<String>` allocations for columns, placeholders, and updates.
//
// Heuristic: use a simple, field-count-based estimate instead of tightly coupling the
// capacity to specific SQL layout details. The factor (~32 bytes per field) is meant
// to comfortably cover typical identifier, punctuation, and placeholder overhead.
// Occasional reallocations are acceptable if this estimate is low.
let num_key_fields = self.key_fields_schema.len();
let num_value_fields = self.value_fields_schema.len();
let estimated_sql_len = 64 + (num_key_fields + num_value_fields) * 32;
let mut sql = String::with_capacity(estimated_sql_len);
let mut params = Vec::with_capacity(num_key_fields + num_value_fields);


write!(sql, "INSERT INTO {} (", self.table_name)
.map_err(|e| RecocoError::internal_msg(e.to_string()))?;

let mut first = true;
let mut num_cols = 0;

// Extract key parts - KeyValue is a wrapper around Box<[KeyPart]>
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
columns.push(self.key_fields_schema[idx].name.clone());
placeholders.push("?".to_string());
if !first {
write!(sql, ", ").unwrap();
}
first = false;
write!(sql, "{}", key_field.name).unwrap();
params.push(key_part_to_json(key_part)?);
num_cols += 1;
}
}

// Add value fields
for (idx, value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
columns.push(value_field.name.clone());
placeholders.push("?".to_string());
if !first {
write!(sql, ", ").unwrap();
}
first = false;
write!(sql, "{}", value_field.name).unwrap();
params.push(value_to_json(value)?);
update_clauses.push(format!(
"{} = excluded.{}",
value_field.name, value_field.name
));
num_cols += 1;
}
}

let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
self.table_name,
columns.join(", "),
placeholders.join(", "),
update_clauses.join(", ")
);
write!(sql, ") VALUES (").unwrap();
for i in 0..num_cols {
if i > 0 {
write!(sql, ", ").unwrap();
}
write!(sql, "?").unwrap();
}

write!(sql, ") ON CONFLICT DO UPDATE SET ").unwrap();
let mut first_update = true;
for (idx, _value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
if !first_update {
write!(sql, ", ").unwrap();
}
first_update = false;
write!(sql, "{0} = excluded.{0}", value_field.name).unwrap();
}
}

Ok((sql, params))
}
Expand All @@ -342,22 +369,29 @@ impl D1ExportContext {
&self,
key: &KeyValue,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut where_clauses = vec![];
let mut params = vec![];
use std::fmt::Write;

Comment on lines +372 to +373
// ⚡ Bolt: Optimize SQL generation by pre-allocating string capacity and avoiding
// intermediate `Vec<String>` allocations for the WHERE clauses.
let mut sql = String::with_capacity(64 + self.key_fields_schema.len() * 16);
let mut params = Vec::with_capacity(self.key_fields_schema.len());

for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
write!(sql, "DELETE FROM {} WHERE ", self.table_name)
.map_err(|e| RecocoError::internal_msg(e.to_string()))?;

let mut first = true;

for (idx, key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
if !first {
write!(sql, " AND ").unwrap();
}
Comment on lines +379 to +388

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Use consistent error propagation instead of unwrap() in the DELETE SQL builder.

After the first write! maps errors into RecocoError, the later write! calls use unwrap(), which can panic. Instead, either propagate fmt::Error for all write! calls using the same map_err pattern, or add a small helper around write! that does the conversion. This keeps error handling aligned with the function’s return type and avoids panics.

Suggested implementation:

                if !first {
                    write!(sql, " AND ")
                        .map_err(|e| RecocoError::internal_msg(e.to_string()))?;
                }
                first = false;
                write!(sql, "{} = ?", key_field.name)
                    .map_err(|e| RecocoError::internal_msg(e.to_string()))?;
                params.push(key_part_to_json(key_part)?);
            }
                first = false;
                write!(sql, "{}", key_field.name)
                    .map_err(|e| RecocoError::internal_msg(e.to_string()))?;
                params.push(key_part_to_json(key_part)?);
                num_cols += 1;

If there are any other write!(...) calls in this file that currently use .unwrap(), they should be updated in the same way to keep error handling consistent:

write!(sql, "...", args...).map_err(|e| RecocoError::internal_msg(e.to_string()))?;

This ensures all formatting errors are converted into RecocoError and propagated instead of causing panics.

first = false;
write!(sql, "{} = ?", key_field.name).unwrap();
params.push(key_part_to_json(key_part)?);
}
}

let sql = format!(
"DELETE FROM {} WHERE {}",
self.table_name,
where_clauses.join(" AND ")
);

Ok((sql, params))
}

Expand Down