
TransportSender::send overloads 'key' arg as Kafka topic name, making producer-side use structurally impossible #37

@kazmosahebi


Issue

TransportSender::send(key, payload) in
src/transport/traits.rs documents the key
argument as "topic name" for Kafka.

The Kafka impl in
src/transport/kafka/mod.rs passes that
arg to FutureRecord::to(key).payload(payload) -- i.e. the supposed
"key" is the Kafka topic, and there is no separate slot for the
per-message partition key.

This forces every caller into one of two broken shapes:

  • Pass the topic name as key -- then there is no way to set a real
    partition key per message; all messages go to the configured topic
    but partition-key-based ordering is gone.
  • Pass a partition key as key (the obvious reading of the parameter
    name) -- then FutureRecord::to(<partition-key>) either produces to an
    empty/garbage topic (InvalidTopic from the broker) or scatters
    messages across one topic per distinct key value, none of which is
    the configured sink topic.

A caller cannot do "send to topic X with partition key K" using this
trait.
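The overload can be reproduced without a broker. This is a minimal sketch under stated assumptions: the trait is reduced to a synchronous method, `ProducedRecord` is a hypothetical stand-in for the topic/key/payload slots of an rdkafka `FutureRecord`, and `MockKafka` only records what the impl in src/transport/kafka/mod.rs would pass to `FutureRecord::to(key).payload(payload)`. Neither type is rustlib code.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the slots an rdkafka FutureRecord carries.
#[derive(Debug, Clone, PartialEq)]
pub struct ProducedRecord {
    pub topic: String,
    pub key: Option<Vec<u8>>,
    pub payload: Vec<u8>,
}

/// Simplified, synchronous version of the current trait shape:
/// a single string slot, documented as "topic name" for Kafka.
pub trait TransportSender {
    fn send(&self, key: &str, payload: &[u8]);
}

/// Mock of the Kafka impl: records what would be handed to the producer.
#[derive(Default)]
pub struct MockKafka {
    pub produced: RefCell<Vec<ProducedRecord>>,
}

impl TransportSender for MockKafka {
    fn send(&self, key: &str, payload: &[u8]) {
        // Mirrors FutureRecord::to(key).payload(payload): the only string
        // argument becomes the topic, and no partition key is ever set.
        self.produced.borrow_mut().push(ProducedRecord {
            topic: key.to_string(),
            key: None,
            payload: payload.to_vec(),
        });
    }
}
```

Calling `send("default_load", ..)` produces to topic `default_load` with no key; calling `send("tenant-42", ..)` produces to a topic named `tenant-42`, also with no key. No argument value yields topic `default_load` with partition key `tenant-42`.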

Proposed solution

  • Replace send(key, payload) with a Kafka-aware signature that
    separates topic from key:
    • Option A: extend TransportSender itself --
      send(target: &str, key: Option<&[u8]>, payload: &[u8]). target
      is "topic" for Kafka, "address" for gRPC, etc. key is optional
      and Kafka-specific (ignored by transports that have no concept of
      partition key).
    • Option B: introduce a separate KafkaSender trait with
      send_keyed(topic, key, payload) and keep the generic
      TransportSender::send(target, payload) for transports without
      partition semantics.
  • Either way, the "topic" and "partition key" slots have to be distinct
    in the API so callers cannot pick the wrong one.
  • Audit every existing impl and call site after the change; the
    current behaviour is wrong everywhere, not just at one call site.
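A hypothetical sketch of both proposed shapes, again using mock transports instead of rdkafka so it runs standalone. The module names `option_a`/`option_b`, the mocks, and the `KafkaSender: TransportSender` supertrait bound are illustrative assumptions; only the method names and argument order come from the proposal above. Error handling, and any async signature the real trait has, is omitted.

```rust
/// Hypothetical stand-in for the topic/key/payload slots of a Kafka record.
#[derive(Debug, Clone, PartialEq)]
pub struct ProducedRecord {
    pub topic: String,
    pub key: Option<Vec<u8>>,
    pub payload: Vec<u8>,
}

/// Option A: one trait whose signature separates target from partition key.
pub mod option_a {
    use super::ProducedRecord;
    use std::cell::RefCell;

    pub trait TransportSender {
        /// `target`: topic for Kafka, address for gRPC, etc.
        /// `key`: partition key; transports without one ignore it.
        fn send(&self, target: &str, key: Option<&[u8]>, payload: &[u8]);
    }

    #[derive(Default)]
    pub struct MockKafka {
        pub produced: RefCell<Vec<ProducedRecord>>,
    }

    impl TransportSender for MockKafka {
        fn send(&self, target: &str, key: Option<&[u8]>, payload: &[u8]) {
            // A real impl would build FutureRecord::to(target).payload(payload)
            // and call .key(k) only when a key is present.
            self.produced.borrow_mut().push(ProducedRecord {
                topic: target.to_string(),
                key: key.map(|k| k.to_vec()),
                payload: payload.to_vec(),
            });
        }
    }

    /// A transport with no partition concept: the key is accepted and dropped.
    #[derive(Default)]
    pub struct MockGrpc {
        pub sent: RefCell<Vec<(String, Vec<u8>)>>,
    }

    impl TransportSender for MockGrpc {
        fn send(&self, target: &str, _key: Option<&[u8]>, payload: &[u8]) {
            self.sent.borrow_mut().push((target.to_string(), payload.to_vec()));
        }
    }
}

/// Option B: the generic trait keeps two arguments; Kafka gets a keyed trait.
pub mod option_b {
    use super::ProducedRecord;
    use std::cell::RefCell;

    pub trait TransportSender {
        fn send(&self, target: &str, payload: &[u8]);
    }

    /// The supertrait bound is a design choice here, not part of the proposal.
    pub trait KafkaSender: TransportSender {
        fn send_keyed(&self, topic: &str, key: &[u8], payload: &[u8]);
    }

    #[derive(Default)]
    pub struct MockKafka {
        pub produced: RefCell<Vec<ProducedRecord>>,
    }

    impl TransportSender for MockKafka {
        fn send(&self, target: &str, payload: &[u8]) {
            // Unkeyed send: target is always the topic, key stays empty.
            self.produced.borrow_mut().push(ProducedRecord {
                topic: target.to_string(),
                key: None,
                payload: payload.to_vec(),
            });
        }
    }

    impl KafkaSender for MockKafka {
        fn send_keyed(&self, topic: &str, key: &[u8], payload: &[u8]) {
            // Topic and partition key arrive in distinct, named slots.
            self.produced.borrow_mut().push(ProducedRecord {
                topic: topic.to_string(),
                key: Some(key.to_vec()),
                payload: payload.to_vec(),
            });
        }
    }
}
```

With Option A, `send("default_load", Some(b"tenant-42"), payload)` records topic `default_load` and key `tenant-42`, while the gRPC mock simply drops the key. With Option B the same pair goes through `send_keyed`, and the plain `send` cannot be mistaken for a keyed call because it has no key slot at all.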

Additional info

Reproduction: VRL 1.1.3 + rustlib 2.7.1, default config.

VRL's pipeline extracts a per-message partition key via
extract_key(value, key_field) and passes THAT to
producer.send(key_str, &serialized). So the produce target topic
ends up being either:

  • empty string (when sink.key_field is unset, the default) ->
    librdkafka rejects with InvalidTopic; OR
  • the value of the per-event partition-key field (when sink.key_field
    is set) -> messages scatter across one topic per distinct key value,
    none of which is the configured sink.topic.
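Both outcomes follow directly from the call site. A minimal model, assuming events are flat string maps and that `extract_key` returns an empty string when `key_field` is unset or the field is missing (both hypothetical simplifications of VRL's real types): `topic_written_today` shows which topic the current call actually targets, and `route_after_fix` shows the (topic, key) pair a caller could pass once the slots are separated.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for VRL's extract_key(value, key_field):
/// returns "" when no key field is configured or the field is absent.
pub fn extract_key(event: &HashMap<String, String>, key_field: Option<&str>) -> String {
    key_field
        .and_then(|field| event.get(field))
        .cloned()
        .unwrap_or_default()
}

/// Today: producer.send(key_str, &serialized) puts the extracted key into
/// the slot the Kafka impl uses as the topic; sink_topic is never consulted.
pub fn topic_written_today(
    _sink_topic: &str,
    event: &HashMap<String, String>,
    key_field: Option<&str>,
) -> String {
    extract_key(event, key_field)
}

/// After either proposed fix: sink topic and partition key travel separately.
pub fn route_after_fix<'a>(
    sink_topic: &'a str,
    event: &HashMap<String, String>,
    key_field: Option<&str>,
) -> (&'a str, Option<String>) {
    let key = extract_key(event, key_field);
    // Empty extracted key is treated as "no partition key", not an empty topic.
    (sink_topic, if key.is_empty() { None } else { Some(key) })
}
```

Treating an empty extracted key as "no partition key" (rather than producing with an empty key) is a design choice in this sketch, not something the issue prescribes.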

Symptoms once the empty-group-id workaround from the related issue
("KafkaTransport always allocates both BaseConsumer and FutureProducer")
lands: VRL reaches the event loop and consumes from default_land, then
logs on every batch:

ERROR fatal produce error
  error=transport send error: Message production error:
  InvalidTopic (Broker: Invalid topic)

sink.topic (default_load) is never written to.

There is NO config workaround. The key slot in the rustlib API has
been overloaded for both partition key and topic name. The fix must
be in code -- caller-side workarounds cannot put both values through
a single slot.

Workaround in dfe-docker: bypass VRL entirely. Loader consumes from
default_land directly, so the active data path is
receiver -> default_land -> loader -> ClickHouse and VRL sits idle.
The transform stage can only be restored once this API contract is
fixed.

Cross-ref:

  • dfe-transform-vrl pipeline call site:
    src/pipeline.rs (extract_key + producer.send)
  • Pairs with the separate "producer-only KafkaTransport" issue;
    both need to land before VRL produces messages end-to-end.
