
KafkaTransport always allocates both BaseConsumer and FutureProducer and fails for producer-only callers with an empty group.id #36

@kazmosahebi

Issue

KafkaTransport::new in
src/transport/kafka/mod.rs:227
always allocates BOTH a BaseConsumer<StatsContext> AND a
FutureProducer<StatsContext> from the same ClientConfig, even when
the caller only needs one role.

For producer-only callers, the correct librdkafka config has an empty
group.id, because producers do not belong to a consumer group. With an
empty group.id, however, librdkafka creates the client without a
consumer group, rd_kafka_queue_get_consumer returns NULL, and
BaseConsumer construction fails with:

Client creation error: rdkafka consumer queue not available
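To make the failure path concrete, here is a std-only model of the constructor as described above. It is a sketch, not rustlib's code: ClientConfig is modeled as a HashMap, and Producer, Consumer, create_producer and create_consumer are hypothetical stand-ins for FutureProducer<StatsContext> / BaseConsumer<StatsContext>. The construction order inside new is also an assumption. The only librdkafka behaviour modeled is that the consumer half cannot be built without a non-empty group.id.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for rdkafka's ClientConfig: plain key/value settings.
pub type ClientConfig = HashMap<String, String>;

// Hypothetical stand-ins for FutureProducer<StatsContext> / BaseConsumer<StatsContext>.
pub struct Producer;
pub struct Consumer;

fn create_producer(_cfg: &ClientConfig) -> Result<Producer, String> {
    // A producer never needs a consumer group.
    Ok(Producer)
}

fn create_consumer(cfg: &ClientConfig) -> Result<Consumer, String> {
    // Models the reported behaviour: with an empty or missing group.id there is
    // no consumer group, rd_kafka_queue_get_consumer returns NULL, and creation fails.
    match cfg.get("group.id") {
        Some(group) if !group.is_empty() => Ok(Consumer),
        _ => Err("Client creation error: rdkafka consumer queue not available".to_string()),
    }
}

pub struct KafkaTransport {
    pub consumer: Consumer,
    pub producer: Producer,
}

impl KafkaTransport {
    // Both halves are always built from the same config, so a producer-only
    // config (empty group.id) fails on the consumer half it never uses.
    pub fn new(cfg: &ClientConfig) -> Result<Self, String> {
        let consumer = create_consumer(cfg)?;
        let producer = create_producer(cfg)?;
        Ok(KafkaTransport { consumer, producer })
    }
}
```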

Reproduced in dfe-transform-vrl, which builds two rustlib
KafkaTransport instances, one consumer-side (source) and one
producer-side (sink), via its own build_consumer_config /
build_producer_config helpers. The producer config correctly sets
group: String::new(), and the producer-side transport then fails to
construct.

Symptom in dfe-transform-vrl: the service logs "initialising pipeline"
and nothing further; no consumer group is registered on the broker and
no messages are consumed. The container still reports "healthy" because
the HTTP server keeps responding after the pipeline task has died. The
pipeline's shutdown error is only logged when the container receives
SIGTERM and graceful shutdown reports the captured error.
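The "healthy but dead" symptom follows from how a spawned task's error is captured: nothing observes it until the handle is joined. A minimal sketch using std threads (an assumption for self-containment; the real pipeline likely runs on an async runtime with its own task handle) contrasts an HTTP-only health check with one that consults JoinHandle::is_finished (stable since Rust 1.61). spawn_pipeline, http_only_health, pipeline_health and shutdown are hypothetical names.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical pipeline task that fails immediately, like the transport
// construction error described above.
fn spawn_pipeline() -> JoinHandle<Result<(), String>> {
    thread::spawn(|| -> Result<(), String> {
        Err("Client creation error: rdkafka consumer queue not available".to_string())
    })
}

// Stands in for an HTTP health endpoint that only proves the server is up:
// it stays green even after the pipeline thread has exited with an error.
fn http_only_health() -> bool {
    true
}

// A liveness-aware check: the pipeline is expected to run until shutdown,
// so a finished handle means the pipeline has died.
fn pipeline_health(handle: &JoinHandle<Result<(), String>>) -> bool {
    !handle.is_finished()
}

// The captured error only surfaces when the handle is joined,
// e.g. during graceful shutdown after SIGTERM.
fn shutdown(handle: JoinHandle<Result<(), String>>) -> Result<(), String> {
    handle.join().map_err(|_| "pipeline panicked".to_string())?
}
```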

Proposed solution

Three viable approaches; pick the cleanest:

  • Split KafkaTransport into separate consumer / producer
    constructors (KafkaTransport::consumer(...) /
    KafkaTransport::producer(...)). Each only allocates what it needs.
  • Skip BaseConsumer creation in the existing constructor when
    topics.is_empty() && group.is_empty(), treating that combination as
    "producer-only mode".
  • Document a "producer-only mode" config that explicitly bypasses the
    consumer half, and surface a clear error when the consumer half is
    asked for without a group.id.
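A rough sketch of the first approach, combined with the clear error from the third, might look like the following. Everything here is hypothetical: the consumer/producer constructor names come from the proposal, but the stand-in types, the TransportError enum and the accessors are illustrative only. The real constructors would create BaseConsumer<StatsContext> / FutureProducer<StatsContext> from an rdkafka ClientConfig.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for rdkafka's ClientConfig, BaseConsumer and FutureProducer.
pub type ClientConfig = HashMap<String, String>;
pub struct Producer;
pub struct Consumer;

#[derive(Debug, PartialEq)]
pub enum TransportError {
    // Clear, construction-time error instead of librdkafka's NULL-queue message.
    MissingGroupId,
}

pub struct KafkaTransport {
    consumer: Option<Consumer>,
    producer: Option<Producer>,
}

impl KafkaTransport {
    // Producer-only: never touches the consumer half, so an empty group.id is fine.
    pub fn producer(_cfg: &ClientConfig) -> Result<Self, TransportError> {
        Ok(KafkaTransport { consumer: None, producer: Some(Producer) })
    }

    // Consumer-only: reject a missing or empty group.id up front.
    pub fn consumer(cfg: &ClientConfig) -> Result<Self, TransportError> {
        match cfg.get("group.id") {
            Some(group) if !group.is_empty() => {
                Ok(KafkaTransport { consumer: Some(Consumer), producer: None })
            }
            _ => Err(TransportError::MissingGroupId),
        }
    }

    pub fn has_consumer(&self) -> bool {
        self.consumer.is_some()
    }

    pub fn has_producer(&self) -> bool {
        self.producer.is_some()
    }
}
```

With a split like this, a producer-side caller such as dfe-transform-vrl's sink would never reach consumer-queue creation, so a dummy group.id would no longer be needed.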

Whichever approach is chosen, the failure mode of "construct silently
in the wrong role and crash at runtime" needs to go.
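The second approach in the list keeps a single constructor and decides up front which halves to build. A hypothetical sketch, assuming topics holds the consumer subscription list and group.id is read from the config: an empty group with non-empty topics is rejected with a clear message at construction time, and a non-empty group keeps today's behaviour of building both halves. Role, infer_role and the stand-in types are illustrative names, not rustlib API.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for rdkafka's ClientConfig, BaseConsumer and FutureProducer.
pub type ClientConfig = HashMap<String, String>;
pub struct Producer;
pub struct Consumer;

#[derive(Debug, PartialEq)]
pub enum Role {
    ProducerOnly,
    ConsumerAndProducer,
}

pub fn infer_role(topics: &[String], group: &str) -> Result<Role, String> {
    match (topics.is_empty(), group.is_empty()) {
        // No subscription topics and no group: "producer-only mode".
        (true, true) => Ok(Role::ProducerOnly),
        // Topics without a group can never consume; fail here with a clear message.
        (false, true) => Err("consumer topics configured but group.id is empty".to_string()),
        // Any non-empty group keeps today's behaviour of building both halves.
        (_, false) => Ok(Role::ConsumerAndProducer),
    }
}

pub struct KafkaTransport {
    pub consumer: Option<Consumer>,
    pub producer: Producer,
}

impl KafkaTransport {
    pub fn new(cfg: &ClientConfig, topics: &[String]) -> Result<Self, String> {
        let group = cfg.get("group.id").map(String::as_str).unwrap_or("");
        let consumer = match infer_role(topics, group)? {
            Role::ProducerOnly => None,
            // The real constructor would create the BaseConsumer here.
            Role::ConsumerAndProducer => Some(Consumer),
        };
        Ok(KafkaTransport { consumer, producer: Producer })
    }
}
```

Keeping the "group set, no topics" case on the old path means the existing dfe-docker dummy-group.id workaround would keep working unchanged during a migration.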

Additional info

Workaround in dfe-docker: set librdkafka_options.group.id in the sink
block of the VRL config to a dummy non-empty value. Sink overrides are
applied AFTER the hard-coded empty default in KafkaTransport::new, so
the override wins and BaseConsumer constructs successfully even
though the consumer is never used on the producer transport.

```yaml
# config/transform-vrl/kafka.yaml -- producer-side workaround
sink:
  kind: kafka
  brokers: kafka:9092
  topic: default_load
  librdkafka_options:
    group.id: "dfe-transform-vrl-producer-noop"
    # ^ never used; only present so BaseConsumer doesn't fail to construct
```
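Why the workaround works, modeled as a sketch: assuming the transport builds its config by inserting hard-coded defaults first and then applying librdkafka_options on top (the override order stated above), a later insert of the same key replaces the earlier one. build_config is a hypothetical name, and a HashMap stands in for rdkafka's ClientConfig.

```rust
use std::collections::HashMap;

// Hypothetical model of the transport's config assembly:
// hard-coded defaults first, then user-supplied librdkafka_options on top.
pub fn build_config(
    brokers: &str,
    librdkafka_options: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut cfg = HashMap::new();
    cfg.insert("bootstrap.servers".to_string(), brokers.to_string());
    // Hard-coded empty default for producer-side transports.
    cfg.insert("group.id".to_string(), String::new());
    // Overrides are applied last, so any key set here replaces the default.
    for (key, value) in librdkafka_options {
        cfg.insert(key.clone(), value.clone());
    }
    cfg
}
```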

This pairs with the separate API-contract issue
("Producer cannot send to its configured sink topic"). Both fixes need
to land before dfe-transform-vrl can produce messages end-to-end.

Observed with rustlib 2.7.1 and dfe-transform-vrl 1.1.3.
