feat(connectors): add Apache Doris sink connector #3215
a9f3652 to b5434dd
Codecov Report❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3215 +/- ##
=============================================
- Coverage 73.78% 51.03% -22.75%
Complexity 943 943
=============================================
Files 1200 1199 -1
Lines 109094 95293 -13801
Branches 85994 72213 -13781
=============================================
- Hits 80492 48631 -31861
- Misses 25866 44094 +18228
+ Partials 2736 2568 -168
7e263c9 to 03ed46c
Sink connector that writes Iggy messages to Apache Doris via the HTTP
Stream Load API. v1 scope: JSON payloads only, HTTP Basic auth,
pre-created tables only (no DDL).
Behaviour:
- Manual 307/308 redirect following (capped at 5) so the Authorization
header survives the FE -> BE hop, which reqwest strips by default.
- Deterministic per-batch label
({prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset})
so replays are deduplicated by Doris within label_keep_max_second.
- Response body Status field drives error classification: Success and
"Label Already Exists" -> Ok; Publish Timeout -> CannotStoreData
(transient); Fail or any unknown status -> PermanentHttpError so the
runtime DLQs the batch instead of looping.
- Optional columns / where / max_filter_ratio / batch_size / timeout
forwarded as Stream Load headers.
- Password held as secrecy::SecretString; auth header wrapped in
SecretString so Debug derivation never leaks the base64 credential.
- Client built in open() with InitError on failure; fe_url validated
there too so a bad config fails at startup rather than first batch.
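The status-driven classification described in the Behaviour list can be sketched as below. This is a minimal illustration, not the connector's code: the `LoadOutcome` enum and the `classify_status_str` function are hypothetical names, and the three outcomes stand in for `Ok`, `CannotStoreData` and `PermanentHttpError` respectively.

```rust
/// Illustrative outcome categories (hypothetical type, not the connector's
/// real error enum). Transient stands in for CannotStoreData, Permanent for
/// PermanentHttpError.
#[derive(Debug, PartialEq, Eq)]
pub enum LoadOutcome {
    Ok,
    Transient,
    Permanent,
}

/// Map the `Status` string from a Stream Load response body to an outcome,
/// following the rules listed in the commit message.
pub fn classify_status_str(status: &str) -> LoadOutcome {
    match status {
        // A replayed label counts as success: Doris already holds the batch.
        "Success" | "Label Already Exists" => LoadOutcome::Ok,
        "Publish Timeout" => LoadOutcome::Transient,
        // "Fail" and anything unrecognized go to the DLQ instead of looping.
        _ => LoadOutcome::Permanent,
    }
}
```

Treating every unknown status as permanent is the conservative choice the commit message describes: an unrecognized status is routed to the DLQ rather than retried indefinitely.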
Tests: 6 integration tests under core/integration/tests/connectors/doris
backed by an apache/doris all-in-one testcontainer (FE HTTP + FE MySQL).
Coverage includes happy path, 1k-row bulk, max_filter_ratio skip path,
label-replay dedupe, missing-target-table (proves no auto-create), and
the columns derived-expression header. The container must bind host:8040
1:1 because the FE 307-redirects to 127.0.0.1:8040; tests are serialized
via a 'doris' nextest test-group (max-threads = 1) so concurrent test
processes don't race for that port.
Addresses review feedback on the Doris sink connector before merge.
Correctness:
- Label format now appends an 8-hex blake3 of the *raw* stream/topic names,
so streams that sanitize identically (e.g. `events.v1` vs `events_v1`)
can no longer collide and silently dedupe against each other in Doris.
Each variable-length segment is also truncated; total label is bounded
under Doris's 128-char cap regardless of input length.
- `build_label` is now a pure `pub` free function. The integration test's
manual label construction (used to verify server-side dedupe) now calls
it directly, so the test cannot drift from the production format.
- `consume` tracks the *most severe* error across chunks via `record_error`:
permanent shadows transient. The previous first-error strategy let a
transient error from chunk N hide a permanent error from chunk M and
caused the runtime to retry forever instead of routing to DLQ.
- HTTP 408 (Request Timeout) and 429 (Too Many Requests) classified as
`CannotStoreData` (transient). They are 4xx but recoverable; the old
code lumped them with all 4xx and DLQ'd retryable conditions.
- Parse failures on the response body now return `PermanentHttpError`.
An unparseable 200-OK is almost always a Doris bug or proxy interference
— retrying the same bytes won't help.
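A rough sketch of the "most severe error wins" rule and the 408/429 carve-out described above. The `Severity` enum, `record_error_sketch` and `classify_http_status` are hypothetical names for illustration; the handling of codes outside 2xx-5xx is an assumption, and a real 200 response would still need its body `Status` checked.

```rust
/// Hypothetical severity ordering: variant order makes Permanent > Transient.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    Transient, // stands in for CannotStoreData
    Permanent, // stands in for PermanentHttpError
}

/// Keep the most severe error seen across chunks, so a later transient
/// error can never hide an earlier permanent one (or vice versa).
pub fn record_error_sketch(worst: &mut Option<Severity>, new: Severity) {
    *worst = Some(match *worst {
        Some(current) => current.max(new),
        None => new,
    });
}

/// Classify a raw HTTP status code. `None` means "no transport-level error".
pub fn classify_http_status(code: u16) -> Option<Severity> {
    match code {
        200..=299 => None,
        // 4xx but recoverable, per the commit message.
        408 | 429 => Some(Severity::Transient),
        400..=499 => Some(Severity::Permanent),
        500..=599 => Some(Severity::Transient),
        // Assumption: anything else is treated as permanent.
        _ => Some(Severity::Permanent),
    }
}
```

Deriving `Ord` on the enum keeps the shadowing rule in one place: adding a new severity level only requires placing the variant at the right position.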
Security:
- `open()` rejects `database`/`table` values outside `[A-Za-z0-9_]+`.
Doris would reject them server-side anyway, but rejecting at config-load
also prevents path traversal in the `/api/{db}/{table}/_stream_load` URL.
- `open()` emits a `warn!` when `fe_url` is `http://` and the host is
not loopback. README's new "Security notes" section spells out the
trust boundary the manual-redirect-following implies (a compromised FE
could exfiltrate credentials via a hostile `Location` header).
- Response body truncated to 4 KB at a UTF-8 boundary before being
formatted into errors or logs, so a misbehaving proxy that returns a
giant body cannot OOM the connector or flood logs.
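The identifier check and the UTF-8-safe truncation from the Security list could look roughly like this. Both function names are hypothetical; the sketch assumes only the `[A-Za-z0-9_]+` rule and the "truncate at a UTF-8 boundary" behaviour stated above.

```rust
/// True when `s` is non-empty and consists only of ASCII letters, digits
/// and underscores, i.e. matches `[A-Za-z0-9_]+`.
pub fn is_valid_identifier(s: &str) -> bool {
    !s.is_empty() && s.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_')
}

/// Return the longest prefix of `s` that is at most `max_bytes` long and
/// ends on a character boundary, so slicing never panics mid-code-point.
pub fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```

With a 4 KB cap, `truncate_at_char_boundary(body, 4096)` would be applied before the body is formatted into an error or log line.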
Robustness:
- Explicit `connect_timeout` (5 s) so an unreachable FE fails fast
instead of consuming the full request timeout on the handshake alone.
- `send_stream_load` takes `bytes::Bytes`; clones inside the redirect
loop are now refcount bumps instead of full `Vec<u8>` copies.
Observability:
- `warn!` when Doris reports `number_filtered_rows > 0` — schema drift
in upstream messages was previously logged at `info!` and easy to miss.
- Per-batch success log demoted from `info!` to `debug!`.
- README documents `Expect: 100-continue`, `label_keep_max_second`
guidance, and the filtered-row alert.
Tests: 21 unit tests pass (was 13, added 8 covering hash-suffix label
collision resistance, label length cap, severity ordering, identifier
validation, and log truncation). All 6 testcontainer integration tests
pass against a real Doris all-in-one image.
03ed46c to 9fd85d6
…308 redirects
Three small README gaps surfaced during a re-read against the post-review code:
- `database` / `table` must match `[A-Za-z0-9_]+`. The connector rejects anything else at startup with `Error::InvalidConfigValue`; surface the constraint where operators look for it (Requirements + Configuration table).
- Non-JSON payloads are dropped with `warn!` and the offset advances past them. That is silent data loss, not a recoverable skip, so the README now spells it out instead of glossing it as "skipped with a warning".
- `308 Permanent Redirect` is followed in addition to `307` (defensive), and the redirect cap of 5 is documented.
There's a performance optimization (which may or may not work) that I want to leave as a follow-up PR:
Four pre-merge check failures from the previous commit, all mechanical:
- typos: `unparseable` → `unparsable` (1 in README, 2 in lib.rs comments).
- markdown lint MD013: README's label-format bullet was 583 chars; split into a parent bullet + 3 sub-bullets, all within the 500-char cap.
- rustfmt: trailing blank line in the integration test after the recent removal of the local `sanitize` helper.
- cargo sort: `iggy_connector_doris_sink` was added under `iggy_connector_sdk` in core/integration/Cargo.toml; reordered so the dependency list stays alphabetical.
No behavior change. 21 unit tests still pass; `cargo fmt --check` and `cargo sort --workspace --check` both clean locally.
We'll check this in the upcoming 2-3 days.
/ready
hubcio left a comment
Besides the findings below, please try to reduce the test time :( Currently, Iggy CI takes 11 minutes in GitHub Actions. If we merge this PR, it'll increase to 19 minutes, which is unacceptable.
// `[A-Za-z0-9_-]`. These caps keep the worst-case label well under that limit.
const MAX_LABEL_PREFIX_LEN: usize = 16;
const MAX_LABEL_NAME_LEN: usize = 16;
const LABEL_HASH_HEX_LEN: usize = 8;
LABEL_HASH_HEX_LEN = 8 truncates blake3 to 32 bits. inside the sanitize-collision class (e.g. events.v1 and events_v1), birthday bound is ~65k. an adversarial multi-tenant namer trivially forces a label collision -- and Doris's server-side label dedupe treats it as Label Already Exists success (lines 376-384), silently dropping one tenant's batch. bump to 16 hex chars (64-bit, birthday ~5B). naive bump alone overflows the 128-char Doris label cap though (P=16 + 2N=32 + 2H=32 + 5 hyphens + 2 underscores + 10 + 20 + 20 = 137 chars), so MAX_LABEL_NAME_LEN needs to drop from 16 to 8 at the same time. with P=16, N=8, H=16 the worst case is 121 chars, under cap. the existing label_stays_under_doris_128_char_cap test at lines 657-674 will catch regressions.
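The length arithmetic in this comment can be checked with a small helper. This is a sketch, not code from the PR: `worst_case_label_len` is a hypothetical name, and reading the `10 + 20 + 20` terms as the decimal widths of a `u32` partition id and two `u64` offsets is an assumption.

```rust
/// Worst-case label length for the format
/// `{prefix}-{stream}_{hash}-{topic}_{hash}-{partition}-{first}-{last}`,
/// given the caps on the prefix, each sanitized name, and each hex hash.
pub const fn worst_case_label_len(prefix: usize, name: usize, hash_hex: usize) -> usize {
    // 5 hyphens between segments plus 2 underscores before the hashes.
    const SEPARATORS: usize = 5 + 2;
    // Assumed: partition as u32 (10 digits), both offsets as u64 (20 digits each).
    const NUMERIC: usize = 10 + 20 + 20;
    prefix + 2 * name + 2 * hash_hex + SEPARATORS + NUMERIC
}
```

Under these assumptions, `worst_case_label_len(16, 16, 16)` gives the 137 quoted for the naive bump, and `worst_case_label_len(16, 8, 16)` gives 121, which fits under the 128-char cap.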
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"
}
};

let first_offset = chunk.first().map(|m| m.offset).unwrap_or(0);
chunk.first().map(|m| m.offset).unwrap_or(0) and chunk.last()...unwrap_or(0) are unreachable today -- chunks(batch_size) with batch_size >= 1 (guaranteed by batch_size().max(1) at lines 137-139) over a non-empty messages (guarded at line 459) never yields an empty chunk. but if any future refactor breaks that invariant, the fabricated offset = 0 collides with the genuine offset-0 batch label and Doris silently dedupes the wrong batch. replace with chunk.first().expect("non-empty chunk").offset -- panic loud, do not silently corrupt.
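A minimal sketch of the suggested `expect` pattern, using plain `u64` offsets instead of the connector's message type. The function name `chunk_offset_ranges` is hypothetical; the `batch_size.max(1)` guard mirrors the invariant the comment relies on, since `chunks(0)` panics.

```rust
/// Return the (first_offset, last_offset) pair of every chunk.
/// `slice::chunks` never yields an empty chunk, so `expect` documents the
/// invariant and fails loudly if a refactor ever breaks it, instead of
/// fabricating offset 0.
pub fn chunk_offset_ranges(offsets: &[u64], batch_size: usize) -> Vec<(u64, u64)> {
    offsets
        .chunks(batch_size.max(1))
        .map(|chunk| {
            let first = *chunk.first().expect("non-empty chunk");
            let last = *chunk.last().expect("non-empty chunk");
            (first, last)
        })
        .collect()
}
```

An empty input simply produces no chunks, so the `expect` calls are never reached in that case.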
2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream}_{hash8}-{topic}_{hash8}-{partition}-{first_offset}-{last_offset}`.
   - Each variable-length segment carries a 32-bit blake3 hash of the raw name, so names that sanitize to the same string (e.g. `events.v1` vs `events_v1`) cannot collide.
   - The total label is bounded under Doris's 128-char cap regardless of input length.
   - Doris dedupes loads by label inside its `label_keep_max_second` window, so a replayed batch (after restart, retry, etc.) is silently absorbed instead of producing duplicates.
this and the label_keep_max_second guidance at line 81 are misleading. the runtime ignores the sink's consume() return value and the iggy consumer commits offset at poll time, so after a crash the next batch picks up past the failed one -- at-most-once, not idempotent replay. the only replay that benefits from server-side label dedupe is in-process retry within a single send_stream_load call, which this connector does not loop. either fix the runtime to honor the rc and disable poll-time auto-commit for the sink path, or reword this section to Idempotent retry within a single consume() invocation and add a Limitations bullet on at-most-once-under-crash.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
// A bounded TCP handshake timeout so an unreachable FE fails fast instead of
// eating the entire request-timeout budget on connect alone.
const CONNECT_TIMEOUT_SECS: u64 = 5;
CONNECT_TIMEOUT_SECS = 5 is hardcoded while timeout_secs is configurable. cross-region operators with cold-start FE will trip the 5s connect under their otherwise-30s budget. either expose a connect_timeout_secs config field or document the 5s hardcode in the README.
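One way the first option could look, as a sketch only: `TimeoutConfigSketch` is a hypothetical stand-in for the relevant part of the sink config, with `connect_timeout_secs` as the new optional field falling back to the current 5 s value.

```rust
use std::time::Duration;

const DEFAULT_TIMEOUT_SECS: u64 = 30;
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;

/// Hypothetical subset of the sink configuration (not the PR's struct).
#[derive(Debug, Default)]
pub struct TimeoutConfigSketch {
    pub timeout_secs: Option<u64>,
    pub connect_timeout_secs: Option<u64>,
}

impl TimeoutConfigSketch {
    /// Overall request timeout, defaulting to 30 s.
    pub fn request_timeout(&self) -> Duration {
        Duration::from_secs(self.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS))
    }

    /// TCP connect timeout, defaulting to the previously hardcoded 5 s.
    pub fn connect_timeout(&self) -> Duration {
        Duration::from_secs(
            self.connect_timeout_secs
                .unwrap_or(DEFAULT_CONNECT_TIMEOUT_SECS),
        )
    }
}
```

Existing configs keep the 5 s behaviour because the field is optional; cross-region operators can raise it without touching the request timeout.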
- **`label_keep_max_second`.** Idempotent replay relies on Doris retaining each label for at least as long as it could take the Iggy runtime to redrive a failed batch. The Doris default is 3 days, which is conservative. If you set this lower on the Doris side, make sure your runtime retry budget fits inside the window; once a label expires, a replay re-loads instead of deduping, producing duplicate rows.
- **Filtered-row alerts.** When Doris reports `number_filtered_rows > 0`, the connector emits a `warn!`. This is your signal that upstream message shapes have drifted from the table schema; alert on it.

## Limitations (todo)
no sibling sink README uses the (todo) suffix on this heading. http_sink/README.md uses plain ### Limitations. rename to ## Limitations or ## Roadmap.
.header("label", label)
.body(body.clone());

if let Some(ratio) = self.config.max_filter_ratio {
ratio.to_string() is rebuilt per batch and per redirect, even though max_filter_ratio is config-static (never mutated post-open). same pattern applies to the columns and where_clause headers at lines 188-193, and to the Authorization HeaderValue reparsed every PUT at line 178. precompute these once in open() as Option<HeaderValue> (with set_sensitive(true) for Authorization). sub-microsecond saves but cheap to fix.
.map_err(|e| Error::InitError(format!("Failed to build Doris HTTP client: {e}")))
}

fn stream_load_url(&self) -> String {
stream_load_url() rebuilds the URL via format! on every send_stream_load call (line 172). fe_url, database, table are config-static and validated in open(). cache the formatted URL string in open() after validate_identifier.
match self.send_stream_load(&label, body).await {
    Ok(response) => match classify_status(&response) {
        Ok(()) => {
            if response.number_filtered_rows > 0 {
number_filtered_rows > 0 emits warn! only. the rows are silently absent from the table -- a semantic data-loss vector. the path is only reachable when the operator opts into max_filter_ratio > 0 (default Doris would return Fail for filtered rows), so warn-only is correct for the opt-in case, but a filtered_rows_total counter metric per {db, table} would let operators alert on schema drift instead of grepping logs.
…on-json or empty json
Which issue does this PR close?
Closes #3112
Rationale
Adds an Apache Doris sink so Iggy streams can be written into Doris for analytical querying.
What changed?
Iggy had no path to land messages in Apache Doris. A new `iggy_connector_doris_sink` crate consumes JSON payloads and writes them via Doris's HTTP Stream Load API (`PUT /api/{db}/{table}/_stream_load`).
The non-obvious bits the connector handles: re-attaching `Authorization` across the FE→BE 307 redirect (which `reqwest` strips by default), parsing the JSON `Status` body to classify success / `Label Already Exists` / transient (`Publish Timeout`, 5xx) / permanent (`Fail`, 4xx, unknown), and emitting a deterministic per-batch label so replays are deduplicated by Doris's label-keep window. v1 is sink-only, JSON-only, HTTP Basic auth only, and assumes pre-created tables (no DDL).
Local Execution
AI Usage
`quickwit_sink`/`influxdb_sink`, testcontainer fixture, and iteration on the Stream Load redirect + `Status`-body classification.
`apache/doris:doris-all-in-one-2.1.0` container, covering happy path, 1k-row bulk, `max_filter_ratio`, label-replay dedupe, missing-target-table, and `columns` derived expressions; row state verified via the MySQL frontend.
help write docs.