feat(proxy): per-credential and global concurrency caps (#210)#211
feat(proxy): per-credential and global concurrency caps (#210)#211hashedone wants to merge 4 commits into
Conversation
Add migration 025 with `max_concurrent INTEGER NOT NULL DEFAULT 8` (CHECK 1..=256) on user_anthropic_keys. Extend UserAnthropicKeyRepo: * upsert() now takes Optional<i32> cap; COALESCE preserves existing value on update, falls back to DB default 8 on insert. * get_credential() returns the cap alongside ciphertext+nonce so the proxy hot path stays one DB round-trip. * status() (renames the old configured_at()) returns the cap to the GET /me/anthropic-key endpoint, which now exposes it in the JSON response. * PUT /me/anthropic-key accepts an optional max_concurrent with client-side bounds matching the DB CHECK. Repo-layer tests cover: default-applied-on-insert, explicit-set, preserve-on-rotate-without-new-cap, override-on-rotate-with-new-cap. Existing tests updated to pass the new None for the optional cap. Part of #210.
Add two semaphores to AppState and wire them into the proxy handler:
* `proxy_per_credential_semaphores` — `DashMap<Uuid, Arc<Semaphore>>`
keyed by user_anthropic_keys.user_id. Lazily created on first request
per credential, sized to the row's max_concurrent at that moment.
Update semantics are intentionally lazy: a PUT changes the DB row but
not the in-memory semaphore until restart, avoiding atomic-swap edge
cases on cap changes.
* `proxy_global_semaphore` — `Option<Arc<Semaphore>>` from the new
`PROXY_MAX_GLOBAL_CONCURRENT` env var. Unset = unlimited (the right
default for current single-team deployments). Operator turns it on
after capacity testing.
Acquisition order is global → per-credential so the per-credential
permit drops first (RAII), keeping the global cap free for other users
the moment a credential finishes.
Permits ride with the response body via a tiny PermitHoldingStream
wrapper. This is structurally critical for SSE: if permits dropped on
handler return rather than stream completion, the cap would only bound
'time to first byte' instead of 'time to last byte' — agents streaming
long generations would not be counted against the cap they're really
using.
Rejection paths return Anthropic-shaped 429 with `overloaded_error`,
matching what real Anthropic returns under load. Per-credential message
names the cap ("cap: N") so users can debug from their /me/proxy/ UI.
Telemetry: structured warn-level log on every reject with reason,
cap_value, user_id, path.
Adds dashmap = "6" and futures-util = "0.3" (already transitive) as
direct deps.
Three integration tests:
* proxy_rejects_when_per_credential_cap_exceeded — 3rd in-flight gets
429, message names cap.
* proxy_frees_permit_when_request_completes — sequential request
succeeds after first completes (proves permits release).
* proxy_rejects_when_global_cap_exceeded — second user blocked by
global cap even with own per-credential budget free.
Closes #210 (backend half).
Add a numeric input next to the Anthropic key field. Saved together with the key in one PUT, defaulting to 8 (the DB default) for fresh keys, pre-filled with the current value on rotation. Client-side bounds (1..=256) match the DB CHECK constraint so the user sees a clear inline error instead of a 400 from the API. Shows the current cap alongside the 'Configured / last set' line so users can see what they've stored at a glance, and a help note explaining that changes apply on next server restart (per the lazy in-memory update semantics). Closes #210 (frontend half).
| /// decrypt the key. Returns `(plaintext, max_concurrent)` on success or an | ||
| /// Anthropic-shaped error envelope on any failure (no key configured, no | ||
| /// master key on this server, ciphertext corrupted, DB error). | ||
| async fn load_credential(state: &AppState, user_id: Uuid) -> Result<(String, i32), Response> { |
There was a problem hiding this comment.
The match over UserAnthropicKeyRepo::get_credential introduces nesting around error/none cases; prefer explicit guard-style early returns for Ok(None)/Err to flatten the happy path and improve readability.
Details
✨ AI Reasoning
The new load_credential() code performs a DB fetch then handles Ok(Some), Ok(None), and Err cases in a single match. Although error cases already return, the match wraps the main-path credential extraction. Rewriting with guard-style checks (e.g., early-return on Ok(None) and Err before continuing to the happy path) would reduce the visual nesting and make the successful-path logic stand out more clearly.
🔧 How do I fix it?
Place parameter validation and guard clauses at the function start. Use early returns to reduce nesting levels and improve readability.
Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info
| // we ship to today. Operators turn this on after capacity testing; a | ||
| // sensible starting value is 256. | ||
| let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> = | ||
| match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") { |
There was a problem hiding this comment.
The nested match used to parse and validate PROXY_MAX_GLOBAL_CONCURRENT is dense; consider simplifying the parsing/validation into a small helper to reduce nesting and aid readability.
Details
✨ AI Reasoning
A match expression is nested inside another match to parse and validate an environment variable and log on failure; this increases the mental steps to follow the success and multiple failure branches all at the declaration site. The code tries to parse PROXY_MAX_GLOBAL_CONCURRENT, logs on parse failure, and returns different Option states depending on nested matches — it's a multi-branch decision tree introduced in this change.
🔧 How do I fix it?
Break long lines to enhance clarity. Aim for a maximum of 80-120 characters per line, depending on the context and coding standards.
Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info
| // max_concurrent is NULL when the caller didn't specify one and | ||
| // the DB default kicks in for the column. | ||
| sqlx::query( | ||
| "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce) | ||
| VALUES ($1, $2, $3) | ||
| "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce, max_concurrent) | ||
| VALUES ($1, $2, $3, COALESCE($4, 8)) |
There was a problem hiding this comment.
Comment says DB default applies on insert, but VALUES (..., COALESCE($4, 8)) always supplies 8 directly. The documented control path is impossible under this query.
| // max_concurrent is NULL when the caller didn't specify one and | |
| // the DB default kicks in for the column. | |
| sqlx::query( | |
| "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce) | |
| VALUES ($1, $2, $3) | |
| "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce, max_concurrent) | |
| VALUES ($1, $2, $3, COALESCE($4, 8)) | |
| // max_concurrent is NULL when the caller didn't specify one and | |
| VALUES ($1, $2, $3, $4) |
Details
✨ AI Reasoning
The code intends to preserve existing caps when omitted and use a default on inserts. However, the inserted value is always computed in SQL with a fallback literal, so the described fallback mechanism is not what executes. This creates a direct contradiction between behavior and explanatory text, which can mislead future maintenance and debugging around cap semantics.
Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info
| // Optional global concurrency cap across all proxy requests. Unset = no | ||
| // global limit; this is the right default for the small-team deployments | ||
| // we ship to today. Operators turn this on after capacity testing; a | ||
| // sensible starting value is 256. | ||
| let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> = | ||
| match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") { | ||
| Ok(s) => match s.parse::<usize>() { | ||
| Ok(n) if n > 0 => { | ||
| tracing::info!(cap = n, "proxy global concurrency cap enabled"); | ||
| Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n))) | ||
| } | ||
| _ => { | ||
| tracing::warn!( | ||
| value = %s, | ||
| "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring" | ||
| ); | ||
| None | ||
| } | ||
| }, | ||
| Err(_) => None, | ||
| }; |
There was a problem hiding this comment.
Nested match expressions parse PROXY_MAX_GLOBAL_CONCURRENT; prefer early-return/guard-style parsing to flatten control flow (e.g., if let Ok(s) = env::var(...) { if let Ok(n) = s.parse::() { ... } } or return/continue on parse failure).
Show fix
| // Optional global concurrency cap across all proxy requests. Unset = no | |
| // global limit; this is the right default for the small-team deployments | |
| // we ship to today. Operators turn this on after capacity testing; a | |
| // sensible starting value is 256. | |
| let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> = | |
| match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") { | |
| Ok(s) => match s.parse::<usize>() { | |
| Ok(n) if n > 0 => { | |
| tracing::info!(cap = n, "proxy global concurrency cap enabled"); | |
| Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n))) | |
| } | |
| _ => { | |
| tracing::warn!( | |
| value = %s, | |
| "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring" | |
| ); | |
| None | |
| } | |
| }, | |
| Err(_) => None, | |
| }; | |
| // Optional global concurrency cap across all proxy requests. Unset = no | |
| let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> = (|| { | |
| let s = std::env::var("PROXY_MAX_GLOBAL_CONCURRENT").ok()?; | |
| let n = s.parse::<usize>().ok()?; | |
| if n == 0 { | |
| tracing::warn!( | |
| value = %s, | |
| "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring" | |
| ); | |
| return None; | |
| } | |
| tracing::info!(cap = n, "proxy global concurrency cap enabled"); | |
| Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n))) | |
| })(); |
Details
✨ AI Reasoning
The added code in main() parses an optional environment variable and uses a match nesting (match std::env::var { Ok(s) => match s.parse() { ... } Err(_) => None }). This is input validation and initialization logic that is currently nested two levels. Converting the inner branches to be handled with early returns/guards (or using if-let chains) would flatten the control flow and make the initialization intent clearer, reducing visual indentation and easing future edits.
Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info
Previously the PUT /me/anthropic-key endpoint required a new key on every request, which made 'just bump the concurrency cap' awkward — rotating the key just to nudge max_concurrent felt wrong, and the user couldn't do it from the UI at all without saving a new key. Backend: * PUT now accepts an optional `key` field. At least one of `key`/`max_concurrent` must be present; both-absent returns 400. * Settings-only PUT (cap without key) requires an existing row — we refuse to insert a half-row with cap but no ciphertext. * New `UserAnthropicKeyRepo::update_max_concurrent` does the cap-only UPDATE without touching key_encrypted / key_nonce. * On any successful PUT, drop the in-memory per-credential semaphore from the AppState DashMap. The next proxy request rebuilds it against the freshly-persisted cap. In-flight requests keep their permits on the now-orphaned Arc<Semaphore> until they finish — effective cap is briefly (old + new) but only for the duration of the in-flight responses, which is the natural quiet point for the change to take effect. Frontend: * Field label: 'Rotate key (optional)' when configured, 'Set key' otherwise. * Button label: 'Update' / 'Save' instead of 'Replace'. * Submit gated on a `hasUnsavedChange` derived: typed key OR cap changed from stored value. * Body only includes `key` when the user typed one — preserves the ciphertext on cap-only updates. * Help-text updated: 'New value applies on the next proxy request' (was 'on the next server restart' — outdated by this change). Three new integration tests: * me_anthropic_key_put_updates_cap_only — proves ciphertext preserved, cap updated, semaphore dropped. * me_anthropic_key_put_rejects_cap_only_when_unconfigured — 400 when no row exists. * me_anthropic_key_put_rejects_empty_body — 400 on neither-present. Part of #210.
| /// True when the user has either typed a new key or changed the cap | ||
| /// away from what's stored. The submit button is gated on this — it | ||
| /// prevents the user from submitting a no-op request. | ||
| const hasUnsavedChange = $derived.by(() => { |
There was a problem hiding this comment.
Anonymous arrow function passed to $derived.by is multi-step and non-trivial; extract into a named function to improve readability and testability (e.g., computeHasUnsavedChange()).
Details
✨ AI Reasoning
An anonymous arrow callback with several distinct steps was added as the argument to a reactive derived helper. It computes keyTyped, capChanged, then applies conditional logic depending on status.configured. This mixes multiple related but separate decisions in an unnamed lambda, which reduces readability and testability. Extracting or naming the callback would clarify its purpose and make unit testing or reuse easier.
🔧 How do I fix it?
Extract complex anonymous functions into named functions with descriptive names, or add explanatory comments for their purpose.
Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info
Closes #210. Sub-issue of #181.
What this PR ships
Schema (
migrations/025_user_anthropic_keys_max_concurrent.sql)Existing rows pick up the default 8 via the migration. Bounds mirror the API/UI validation so an out-of-range row is structurally impossible.
API
GET /api/v1/me/anthropic-keynow returnsmax_concurrent: i32 | null.PUT /api/v1/me/anthropic-keyaccepts an optionalmax_concurrent.Nonepreserves the existing value on rotation, falls back to the DB default8on first insert. Out-of-range values rejected with a clear 400 before hitting the DB CHECK.Proxy handler
Two semaphores acquired on every request, both held for the full lifetime of the response stream:
Option<Arc<Semaphore>>fromPROXY_MAX_GLOBAL_CONCURRENT. Unset = unlimited (default; correct for current small-team deployments). Operator turns it on after capacity testing.DashMap<Uuid, Arc<Semaphore>>on AppState, keyed byuser_anthropic_keys.user_id. Lazily created on first request per credential, sized to the row'smax_concurrentat that moment.Acquisition order is global → per-credential. Permits drop in reverse order via RAII (per-credential first), which is exception-safe regardless of where a request fails.
Critical detail: permits ride with the response body stream (
PermitHoldingStreamwrapper) rather than with the handler stack frame. Without this, SSE responses would release the permit the moment the upstream headers came back, breaking the cap for long generations. Tested.Update semantics
Lazy. A PUT that changes
max_concurrentupdates the DB row but does not invalidate the in-memory semaphore. The new cap applies on the next process restart, or after the entry is evicted. Avoids atomic-swap edge cases on cap changes. Documented in code and surfaced in the UI ("New value applies on the next server restart").Rejection behavior
Anthropic-shaped 429 with
error.type = "overloaded_error"— sametypethe real upstream returns under load, so unmodified clients (Claude Code, GSD2) already know to back off."Too many concurrent requests against this credential (cap: N). Retry shortly.""Server is at capacity. Retry shortly."Every reject logged via
tracing::warn!with structured fields (reason = per_credential_cap | global_cap,cap_value,user_id,path) so post-merge we can distinguish "cap too low" from "someone is attacking."Web
Numeric input on
/me/proxy/next to the key field, saved together. Pre-filled with the current value on rotation, defaults to 8 for fresh keys. Client-side bounds match the DB CHECK so the user sees an inline error instead of a 400. Existing config row shows the current cap inline ("· cap 8") next to the "Configured / last set" line.Out of scope
proxy_tokenstable.Verification
cargo fmt --checkcargo clippy --workspace --tests --all-targets -- -D warningscargo test --workspaceproxy_integrationtestsrepo_user_anthropic_keys_testpnpm checkNew tests
proxy_rejects_when_per_credential_cap_exceeded— 2 in-flight succeed, 3rd returns 429 withcap: 2in message.proxy_frees_permit_when_request_completes— sequential request after first completes proves the permit drops correctly when the streaming body finishes.proxy_rejects_when_global_cap_exceeded— second user blocked by global cap even with their own per-credential budget free.upsert_persists_explicit_max_concurrent— cap roundtrip through repo.upsert_without_cap_preserves_existing_value— key rotation without cap argument keeps existing value.upsert_with_new_cap_overrides_existing_value— explicit cap on update wins.status_reflects_presence— GET endpoint exposes the cap.Files
crates/tracevault-server/migrations/025_user_anthropic_keys_max_concurrent.sql(new)crates/tracevault-server/src/api/proxy.rs— permits, RAII stream wrapper, 429 pathscrates/tracevault-server/src/api/me.rs—max_concurrentin request/responsecrates/tracevault-server/src/repo/user_anthropic_keys.rs—StoredCredential,StoredStatus, COALESCE-based upsertcrates/tracevault-server/src/lib.rs— two new AppState fieldscrates/tracevault-server/src/main.rs— env-driven global semaphore, DashMap constructioncrates/tracevault-server/Cargo.toml—dashmap = "6",futures-util = "0.3"web/src/routes/me/proxy/+page.svelte— numeric input + status display