Issue
KafkaTransport::new in
src/transport/kafka/mod.rs:227
always allocates BOTH a BaseConsumer<StatsContext> AND a
FutureProducer<StatsContext> from the same ClientConfig, even when
the caller only needs one role.
For producer-only callers, the correct librdkafka config has an empty
group.id (producers have no consumer group). But with empty
group.id, librdkafka treats the instance as producer-only,
rd_kafka_queue_get_consumer returns NULL, and the BaseConsumer
construction fails with:
Client creation error: rdkafka consumer queue not available
Reproduces from dfe-transform-vrl, which builds two rustlib
KafkaTransport instances -- one consumer-side (source), one
producer-side (sink) -- via its own build_consumer_config /
build_producer_config helpers. The producer config correctly sets
group: String::new(), and the transport then fails to construct.
Symptom in dfe-transform-vrl: logs "initialising pipeline" and nothing
further; no consumer group registered on the broker; no messages
consumed. The container reports "healthy" because the HTTP server keeps
responding while the pipeline task has died -- the actual
pipeline shutdown with error log only appears when the container is
sent SIGTERM and graceful shutdown logs the captured error.
Proposed solution
Three viable approaches; pick the cleanest:
- Split
KafkaTransport into separate consumer / producer
constructors (KafkaTransport::consumer(...) /
KafkaTransport::producer(...)). Each only allocates what it needs.
- Skip
BaseConsumer creation in the existing constructor when
topics.is_empty() && group.is_empty() -- treat that combination as
"producer-only mode".
- Document a "producer-only mode" config that explicitly bypasses the
consumer half, and surface a clear error when the consumer half is
asked for without a group.id.
Whichever approach is chosen, the failure mode of "construct silently
in the wrong role and crash at runtime" needs to go.
Additional info
Workaround in dfe-docker: set librdkafka_options.group.id in the sink
block of the VRL config to a dummy non-empty value. Sink overrides are
applied AFTER the hard-coded empty default in KafkaTransport::new, so
the override wins and BaseConsumer constructs successfully even
though the consumer is never used on the producer transport.
# config/transform-vrl/kafka.yaml -- producer-side workaround
sink:
kind: kafka
brokers: kafka:9092
topic: default_load
librdkafka_options:
group.id: "dfe-transform-vrl-producer-noop"
# ^ never used; only present so BaseConsumer doesn't fail to construct
Pairs with the separate API-contract issue
("Producer cannot send to its configured sink topic"). Both need to
land before dfe-transform-vrl can produce messages end-to-end.
Observed: rustlib 2.7.1 + dfe-transform-vrl 1.1.3.
Issue
KafkaTransport::newinsrc/transport/kafka/mod.rs:227
always allocates BOTH a
BaseConsumer<StatsContext>AND aFutureProducer<StatsContext>from the sameClientConfig, even whenthe caller only needs one role.
For producer-only callers, the correct librdkafka config has an empty
group.id(producers have no consumer group). But with emptygroup.id, librdkafka treats the instance as producer-only,rd_kafka_queue_get_consumerreturns NULL, and theBaseConsumerconstruction fails with:
Reproduces from
dfe-transform-vrl, which builds two rustlibKafkaTransportinstances -- one consumer-side (source), oneproducer-side (sink) -- via its own
build_consumer_config/build_producer_confighelpers. The producer config correctly setsgroup: String::new(), and the transport then fails to construct.Symptom in dfe-transform-vrl: logs "initialising pipeline" and nothing
further; no consumer group registered on the broker; no messages
consumed. The container reports "healthy" because the HTTP server keeps
responding while the pipeline task has died -- the actual
pipeline shutdown with errorlog only appears when the container issent SIGTERM and graceful shutdown logs the captured error.
Proposed solution
Three viable approaches; pick the cleanest:
KafkaTransportinto separate consumer / producerconstructors (
KafkaTransport::consumer(...)/KafkaTransport::producer(...)). Each only allocates what it needs.BaseConsumercreation in the existing constructor whentopics.is_empty() && group.is_empty()-- treat that combination as"producer-only mode".
consumer half, and surface a clear error when the consumer half is
asked for without a
group.id.Whichever approach is chosen, the failure mode of "construct silently
in the wrong role and crash at runtime" needs to go.
Additional info
Workaround in dfe-docker: set
librdkafka_options.group.idin the sinkblock of the VRL config to a dummy non-empty value. Sink overrides are
applied AFTER the hard-coded empty default in
KafkaTransport::new, sothe override wins and
BaseConsumerconstructs successfully eventhough the consumer is never used on the producer transport.
Pairs with the separate API-contract issue
("Producer cannot send to its configured sink topic"). Both need to
land before dfe-transform-vrl can produce messages end-to-end.
Observed: rustlib 2.7.1 + dfe-transform-vrl 1.1.3.