Skip to content

feat(connectors): add Apache Doris sink connector#3215

Open
ryankert01 wants to merge 7 commits into
apache:masterfrom
ryankert01:feat/doris-sink-connector
Open

feat(connectors): add Apache Doris sink connector#3215
ryankert01 wants to merge 7 commits into
apache:masterfrom
ryankert01:feat/doris-sink-connector

Conversation

@ryankert01
Copy link
Copy Markdown
Member

@ryankert01 ryankert01 commented May 5, 2026

Which issue does this PR close?

Closes #3112

Rationale

Adds an Apache Doris sink so Iggy streams can be written into Doris for analytical querying.

What changed?

Iggy had no path to land messages in Apache Doris. A new iggy_connector_doris_sink crate consumes JSON payloads and writes them via Doris's HTTP Stream Load API (PUT /api/{db}/{table}/_stream_load).

The non-obvious bits the connector handles: re-attaching Authorization across the FE→BE 307 redirect (which reqwest strips by default), parsing the JSON Status body to classify success / Label Already Exists / transient (Publish Timeout, 5xx) / permanent (Fail, 4xx, unknown), and emitting a deterministic per-batch label so replays are deduplicated by Doris's label-keep window. v1 is sink-only, JSON-only, HTTP Basic auth only, and assumes pre-created tables — no DDL.

Local Execution

  • Passed
  • Pre-commit hooks ran. Pre-push C#/Java hooks skipped (no dotnet/JDK locally; contribution is Rust-only).

AI Usage

  1. Claude Code (Anthropic).
  2. Crate scaffolding against quickwit_sink / influxdb_sink, testcontainer fixture, and iteration on the Stream Load redirect + Status-body classification.
  3. 14 unit tests + 6 integration tests against a real apache/doris:doris-all-in-one-2.1.0 container, covering happy path, 1k-row bulk, max_filter_ratio, label-replay dedupe, missing-target-table, and columns derived expressions; row state verified via the MySQL frontend. help write docs.
  4. Yes.

@ryankert01 ryankert01 marked this pull request as draft May 5, 2026 17:58
@ryankert01 ryankert01 changed the title feat(connectors): add Apache Doris sink connector (#2753) feat(connectors): add Apache Doris sink connector May 5, 2026
@ryankert01 ryankert01 force-pushed the feat/doris-sink-connector branch from a9f3652 to b5434dd Compare May 5, 2026 18:17
@codecov
Copy link
Copy Markdown

codecov Bot commented May 5, 2026

Codecov Report

❌ Patch coverage is 89.29336% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 51.03%. Comparing base (1ae123f) to head (fd99d7f).

Files with missing lines Patch % Lines
core/connectors/sinks/doris_sink/src/lib.rs 89.29% 37 Missing and 13 partials ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3215       +/-   ##
=============================================
- Coverage     73.78%   51.03%   -22.75%     
  Complexity      943      943               
=============================================
  Files          1200     1199        -1     
  Lines        109094    95293    -13801     
  Branches      85994    72213    -13781     
=============================================
- Hits          80492    48631    -31861     
- Misses        25866    44094    +18228     
+ Partials       2736     2568      -168     
Components Coverage Δ
Rust Core 44.74% <89.29%> (-30.18%) ⬇️
Java SDK 58.44% <ø> (ø)
C# SDK 69.13% <ø> (-0.35%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.41% <ø> (-0.04%) ⬇️
Go SDK 39.80% <ø> (ø)
Files with missing lines Coverage Δ
core/connectors/sinks/doris_sink/src/lib.rs 89.29% <89.29%> (ø)

... and 317 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ryankert01 ryankert01 force-pushed the feat/doris-sink-connector branch 5 times, most recently from 7e263c9 to 03ed46c Compare May 11, 2026 12:58
Sink connector that writes Iggy messages to Apache Doris via the HTTP
Stream Load API. v1 scope: JSON payloads only, HTTP Basic auth,
pre-created tables only (no DDL).

Behaviour:
- Manual 307/308 redirect following (capped at 5) so the Authorization
  header survives the FE -> BE hop, which reqwest strips by default.
- Deterministic per-batch label
  ({prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset})
  so replays are deduplicated by Doris within label_keep_max_second.
- Response body Status field drives error classification: Success and
  "Label Already Exists" -> Ok; Publish Timeout -> CannotStoreData
  (transient); Fail or any unknown status -> PermanentHttpError so the
  runtime DLQs the batch instead of looping.
- Optional columns / where / max_filter_ratio / batch_size / timeout
  forwarded as Stream Load headers.
- Password held as secrecy::SecretString; auth header wrapped in
  SecretString so Debug derivation never leaks the base64 credential.
- Client built in open() with InitError on failure; fe_url validated
  there too so a bad config fails at startup rather than first batch.

Tests: 6 integration tests under core/integration/tests/connectors/doris
backed by an apache/doris all-in-one testcontainer (FE HTTP + FE MySQL).
Coverage includes happy path, 1k-row bulk, max_filter_ratio skip path,
label-replay dedupe, missing-target-table (proves no auto-create), and
the columns derived-expression header. The container must bind host:8040
1:1 because the FE 307-redirects to 127.0.0.1:8040; tests are serialized
via a 'doris' nextest test-group (max-threads = 1) so concurrent test
processes don't race for that port.
Addresses review feedback on the Doris sink connector before merge.

Correctness:
- Label format now appends an 8-hex blake3 of the *raw* stream/topic names,
  so streams that sanitize identically (e.g. `events.v1` vs `events_v1`)
  can no longer collide and silently dedupe against each other in Doris.
  Each variable-length segment is also truncated; total label is bounded
  under Doris's 128-char cap regardless of input length.
- `build_label` is now a pure `pub` free function. The integration test's
  manual label construction (used to verify server-side dedupe) now calls
  it directly, so the test cannot drift from the production format.
- `consume` tracks the *most severe* error across chunks via `record_error`:
  permanent shadows transient. The previous first-error strategy let a
  transient error from chunk N hide a permanent error from chunk M and
  caused the runtime to retry forever instead of routing to DLQ.
- HTTP 408 (Request Timeout) and 429 (Too Many Requests) classified as
  `CannotStoreData` (transient). They are 4xx but recoverable; the old
  code lumped them with all 4xx and DLQ'd retryable conditions.
- Parse failures on the response body now return `PermanentHttpError`.
  An unparseable 200-OK is almost always a Doris bug or proxy interference
  — retrying the same bytes won't help.

Security:
- `open()` rejects `database`/`table` values outside `[A-Za-z0-9_]+`.
  Doris would reject them server-side anyway, but rejecting at config-load
  also prevents path traversal in the `/api/{db}/{table}/_stream_load` URL.
- `open()` emits a `warn!` when `fe_url` is `http://` and the host is
  not loopback. README's new "Security notes" section spells out the
  trust boundary the manual-redirect-following implies (a compromised FE
  could exfiltrate credentials via a hostile `Location` header).
- Response body truncated to 4 KB at a UTF-8 boundary before being
  formatted into errors or logs, so a misbehaving proxy that returns a
  giant body cannot OOM the connector or flood logs.

Robustness:
- Explicit `connect_timeout` (5 s) so an unreachable FE fails fast
  instead of consuming the full request timeout on the handshake alone.
- `send_stream_load` takes `bytes::Bytes`; clones inside the redirect
  loop are now refcount bumps instead of full `Vec<u8>` copies.

Observability:
- `warn!` when Doris reports `number_filtered_rows > 0` — schema drift
  in upstream messages was previously logged at `info!` and easy to miss.
- Per-batch success log demoted from `info!` to `debug!`.
- README documents `Expect: 100-continue`, `label_keep_max_second`
  guidance, and the filtered-row alert.

Tests: 21 unit tests pass (was 13, added 8 covering hash-suffix label
collision resistance, label length cap, severity ordering, identifier
validation, and log truncation). All 6 testcontainer integration tests
pass against a real Doris all-in-one image.
@ryankert01 ryankert01 force-pushed the feat/doris-sink-connector branch from 03ed46c to 9fd85d6 Compare May 11, 2026 12:59
@ryankert01 ryankert01 marked this pull request as ready for review May 11, 2026 13:01
@ryankert01 ryankert01 marked this pull request as draft May 11, 2026 13:18
…308 redirects

Three small README gaps surfaced during a re-read against the post-review
code:

- `database` / `table` must match `[A-Za-z0-9_]+`. The connector rejects
  anything else at startup with `Error::InvalidConfigValue` — surface the
  constraint where operators look for it (Requirements + Configuration
  table).
- Non-JSON payloads are dropped with `warn!` and the offset advances past
  them. That is silent data loss, not a recoverable skip, so the README
  now spells it out instead of glossing it as "skipped with a warning".
- `308 Permanent Redirect` is followed in addition to `307` (defensive),
  and the redirect cap of 5 is documented.
@ryankert01 ryankert01 marked this pull request as ready for review May 11, 2026 13:50
@ryankert01
Copy link
Copy Markdown
Member Author

There's a performance optimization(may or may not works) that I want to leave as a follow-up PR:
multi-output-format support: currently use json, but json seems to be slow in Doris. So I can test out csv & parquet.

Four pre-merge check failures from the previous commit, all mechanical:

- typos: `unparseable` → `unparsable` (1 in README, 2 in lib.rs comments).
- markdown lint MD013: README's label-format bullet was 583 chars; split
  into a parent bullet + 3 sub-bullets, all within the 500-char cap.
- rustfmt: trailing blank line in the integration test after the recent
  removal of the local `sanitize` helper.
- cargo sort: `iggy_connector_doris_sink` was added under
  `iggy_connector_sdk` in core/integration/Cargo.toml; reordered so the
  dependency list stays alphabetical.

No behavior change. 21 unit tests still pass; `cargo fmt --check` and
`cargo sort --workspace --check` both clean locally.
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 12, 2026

we'll check this in upcoming 2-3 days.

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

/ready

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label May 14, 2026
Copy link
Copy Markdown
Contributor

@hubcio hubcio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides below findings, please try to improve the tests time :( currently, iggy CI takes 11 minutes in github actions CI. if we merge this PR, it'll increase to 19 minutes, which is unacceptable.

Comment thread core/connectors/sinks/doris_sink/src/lib.rs
// `[A-Za-z0-9_-]`. These caps keep the worst-case label well under that limit.
const MAX_LABEL_PREFIX_LEN: usize = 16;
const MAX_LABEL_NAME_LEN: usize = 16;
const LABEL_HASH_HEX_LEN: usize = 8;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LABEL_HASH_HEX_LEN = 8 truncates blake3 to 32 bits. inside the sanitize-collision class (e.g. events.v1 and events_v1), birthday bound is ~65k. an adversarial multi-tenant namer trivially forces a label collision -- and Doris's server-side label dedupe treats it as Label Already Exists success (lines 376-384), silently dropping one tenant's batch. bump to 16 hex chars (64-bit, birthday ~5B). naive bump alone overflows the 128-char Doris label cap though (P=16 + 2N=32 + 2H=32 + 5 hyphens + 2 underscores + 10 + 20 + 20 = 137 chars), so MAX_LABEL_NAME_LEN needs to drop from 16 to 8 at the same time. with P=16, N=8, H=16 the worst case is 121 chars, under cap. the existing label_stays_under_doris_128_char_cap test at lines 657-674 will catch regressions.

homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing publish = false

}
};

let first_offset = chunk.first().map(|m| m.offset).unwrap_or(0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk.first().map(|m| m.offset).unwrap_or(0) and chunk.last()...unwrap_or(0) are unreachable today -- chunks(batch_size) with batch_size >= 1 (guaranteed by batch_size().max(1) at lines 137-139) over a non-empty messages (guarded at line 459) never yields an empty chunk. but if any future refactor breaks that invariant, the fabricated offset = 0 collides with the genuine offset-0 batch label and Doris silently dedupes the wrong batch. replace with chunk.first().expect("non-empty chunk").offset -- panic loud, do not silently corrupt.

2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream}_{hash8}-{topic}_{hash8}-{partition}-{first_offset}-{last_offset}`.
- Each variable-length segment carries a 32-bit blake3 hash of the raw name, so names that sanitize to the same string (e.g. `events.v1` vs `events_v1`) cannot collide.
- The total label is bounded under Doris's 128-char cap regardless of input length.
- Doris dedupes loads by label inside its `label_keep_max_second` window, so a replayed batch (after restart, retry, etc.) is silently absorbed instead of producing duplicates.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and the label_keep_max_second guidance at line 81 are misleading. the runtime ignores the sink's consume() return value and the iggy consumer commits offset at poll time, so after a crash the next batch picks up past the failed one -- at-most-once, not idempotent replay. the only replay that benefits from server-side label dedupe is in-process retry within a single send_stream_load call, which this connector does not loop. either fix the runtime to honor the rc and disable poll-time auto-commit for the sink path, or reword this section to Idempotent retry within a single consume() invocation and add a Limitations bullet on at-most-once-under-crash.

const DEFAULT_TIMEOUT_SECS: u64 = 30;
// A bounded TCP handshake timeout so an unreachable FE fails fast instead of
// eating the entire request-timeout budget on connect alone.
const CONNECT_TIMEOUT_SECS: u64 = 5;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONNECT_TIMEOUT_SECS = 5 is hardcoded while timeout_secs is configurable. cross-region operators with cold-start FE will trip the 5s connect under their otherwise-30s budget. either expose a connect_timeout_secs config field or document the 5s hardcode in the README.

- **`label_keep_max_second`.** Idempotent replay relies on Doris retaining each label for at least as long as it could take the Iggy runtime to redrive a failed batch. The Doris default is 3 days, which is conservative. If you set this lower on the Doris side, make sure your runtime retry budget fits inside the window — once a label expires, a replay re-loads instead of deduping, producing duplicate rows.
- **Filtered-row alerts.** When Doris reports `number_filtered_rows > 0`, the connector emits a `warn!`. This is your signal that upstream message shapes have drifted from the table schema; alert on it.

## Limitations (todo)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no sibling sink README uses the (todo) suffix on this heading. http_sink/README.md uses plain ### Limitations. rename to ## Limitations or ## Roadmap.

.header("label", label)
.body(body.clone());

if let Some(ratio) = self.config.max_filter_ratio {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ratio.to_string() is rebuilt per batch and per redirect, even though max_filter_ratio is config-static (never mutated post-open). same pattern applies to the columns and where_clause headers at lines 188-193, and to the Authorization HeaderValue reparsed every PUT at line 178. precompute these once in open() as Option<HeaderValue> (with set_sensitive(true) for Authorization). sub-microsecond saves but cheap to fix.

.map_err(|e| Error::InitError(format!("Failed to build Doris HTTP client: {e}")))
}

fn stream_load_url(&self) -> String {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_load_url() rebuilds the URL via format! on every send_stream_load call (line 172). fe_url, database, table are config-static and validated in open(). cache the formatted URL string in open() after validate_identifier.

match self.send_stream_load(&label, body).await {
Ok(response) => match classify_status(&response) {
Ok(()) => {
if response.number_filtered_rows > 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

number_filtered_rows > 0 emits warn! only. the rows are silently absent from the table -- a semantic data-loss vector. the path is only reachable when the operator opts into max_filter_ratio > 0 (default Doris would return Fail for filtered rows), so warn-only is correct for the opt-in case, but a filtered_rows_total counter metric per {db, table} would let operators alert on schema drift instead of grepping logs.

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels May 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Apache Doris connector

2 participants