Issue
TransportSender::send(key, payload) in src/transport/traits.rs
documents the key argument as "topic name" for Kafka.
The Kafka impl in src/transport/kafka/mod.rs passes that arg to
FutureRecord::to(key).payload(payload) -- i.e. the supposed "key" is
the Kafka topic, and there is no separate slot for the per-message
partition key.
This forces every caller into one of two broken shapes:
- Pass the topic name as key -- then there is no way to set a real
  partition key per message; all messages go to the configured topic
  but partition-key-based ordering is gone.
- Pass a partition key as key (the obvious read of the parameter
  name) -- then FutureRecord::to(<partition-key>) either produces to
  an empty/garbage topic (InvalidTopic from the broker) or scatters
  messages across one topic per distinct key value, none of which is
  the configured sink topic.
A caller cannot do "send to topic X with partition key K" using this
trait.
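The two shapes can be shown without a broker. The sketch below is a stand-in, not the rustlib code: the trait is reduced to the one method discussed here, and MockKafkaSender, Record and broken_shapes are hypothetical names introduced for illustration. The mock only mirrors the routing described above (the key argument becomes the produce topic) and treats an empty topic as InvalidTopic.

```rust
/// Hypothetical reduction of the current trait: a single slot named `key`.
trait TransportSender {
    fn send(&mut self, key: &str, payload: &[u8]) -> Result<(), String>;
}

/// What a produced record looks like once it leaves the sender.
#[derive(Debug, PartialEq)]
struct Record {
    topic: String,
    partition_key: Option<Vec<u8>>,
    payload: Vec<u8>,
}

/// Stand-in for the Kafka impl; it records what would have been produced.
#[derive(Default)]
struct MockKafkaSender {
    produced: Vec<Record>,
}

impl TransportSender for MockKafkaSender {
    fn send(&mut self, key: &str, payload: &[u8]) -> Result<(), String> {
        // Mirrors FutureRecord::to(key).payload(payload): `key` is used as the topic.
        if key.is_empty() {
            return Err("InvalidTopic".to_string());
        }
        self.produced.push(Record {
            topic: key.to_string(),
            partition_key: None, // no argument carries a partition key
            payload: payload.to_vec(),
        });
        Ok(())
    }
}

/// Exercises both caller shapes plus the empty-key case.
fn broken_shapes() -> (MockKafkaSender, Result<(), String>) {
    let mut sender = MockKafkaSender::default();
    // Shape 1: the caller passes the topic; the partition key is lost.
    sender.send("default_load", b"event-1").unwrap();
    // Shape 2: the caller passes a partition key; it becomes the topic.
    sender.send("user-42", b"event-2").unwrap();
    // Shape 2 with no key available: empty topic, rejected.
    let empty = sender.send("", b"event-3");
    (sender, empty)
}
```

In neither successful call does the record end up with both the intended topic and a partition key, which is the gap this issue describes.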
Proposed solution
- Replace send(key, payload) with a Kafka-aware signature that
  separates topic from key:
  - Option A: extend TransportSender itself --
    send(target: &str, key: Option<&[u8]>, payload: &[u8]). target
    is "topic" for Kafka, "address" for gRPC, etc. key is optional
    and Kafka-specific (ignored by transports that have no concept of
    partition key).
  - Option B: introduce a separate KafkaSender trait with
    send_keyed(topic, key, payload) and keep the generic
    TransportSender::send(target, payload) for transports without
    partition semantics.
- Either way, the "topic" and "partition key" slots have to be distinct
  in the API so callers cannot pick the wrong one.
- Audit every existing impl and call site after the change; the
  current behaviour is wrong everywhere, not just at one call site.
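A minimal sketch of what Option A could look like, under stated assumptions: the error type is simplified to String, the method is synchronous, and MockKafkaSender, MockGrpcSender, Record and forward are hypothetical names used only to show the shape. It is not the rustlib implementation. In a real Kafka impl the key would presumably be attached via FutureRecord's key setter when it is Some.

```rust
/// Option A, sketched: target and partition key are separate slots.
trait TransportSender {
    fn send(&mut self, target: &str, key: Option<&[u8]>, payload: &[u8])
        -> Result<(), String>;
}

#[derive(Debug, PartialEq)]
struct Record {
    topic: String,
    partition_key: Option<Vec<u8>>,
    payload: Vec<u8>,
}

/// Stand-in for the Kafka transport; records what would be produced.
#[derive(Default)]
struct MockKafkaSender {
    produced: Vec<Record>,
}

impl TransportSender for MockKafkaSender {
    fn send(&mut self, target: &str, key: Option<&[u8]>, payload: &[u8])
        -> Result<(), String> {
        if target.is_empty() {
            return Err("InvalidTopic".to_string());
        }
        self.produced.push(Record {
            topic: target.to_string(),
            partition_key: key.map(|k| k.to_vec()),
            payload: payload.to_vec(),
        });
        Ok(())
    }
}

/// Stand-in for a transport with no partition concept: the key is ignored.
#[derive(Default)]
struct MockGrpcSender {
    sent: Vec<(String, Vec<u8>)>,
}

impl TransportSender for MockGrpcSender {
    fn send(&mut self, target: &str, _key: Option<&[u8]>, payload: &[u8])
        -> Result<(), String> {
        self.sent.push((target.to_string(), payload.to_vec()));
        Ok(())
    }
}

/// Hypothetical call site: the topic comes from sink config, the key from the event.
fn forward<S: TransportSender>(
    sender: &mut S,
    sink_topic: &str,
    event_key: Option<&str>,
    payload: &[u8],
) -> Result<(), String> {
    sender.send(sink_topic, event_key.map(str::as_bytes), payload)
}
```

With the two slots split, an unset key field maps to None rather than to an empty topic, so the configured sink topic is always the produce target.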
Additional info
Reproduction: VRL 1.1.3 + rustlib 2.7.1, default config.
VRL's pipeline extracts a per-message partition key via
extract_key(value, key_field) and passes THAT to
producer.send(key_str, &serialized). So the produce target topic
ends up being either:
- empty string (when sink.key_field is unset, the default) ->
  librdkafka rejects with InvalidTopic; OR
- the value of the per-event partition-key field (when sink.key_field
  is set) -> messages scatter across one topic per distinct key value,
  none of which is the configured sink.topic.
Symptoms after the empty-group-id workaround on the related issue
("KafkaTransport always allocates both BaseConsumer and FutureProducer")
lands: VRL reaches the event loop, consumes from default_land, then
on every batch logs:
    ERROR fatal produce error
      error=transport send error: Message production error:
      InvalidTopic (Broker: Invalid topic)
sink.topic (default_load) is never written to.
There is NO config workaround. The key slot in the rustlib API has
been overloaded for both partition key and topic name. The fix must
be in code -- caller-side workarounds cannot put both values through
a single slot.
Workaround in dfe-docker: bypass VRL entirely. Loader consumes from
default_land directly, so the active data path is
receiver -> default_land -> loader -> ClickHouse and VRL sits idle.
The transform stage can only be restored once this API contract is
fixed.
Cross-ref:
- dfe-transform-vrl pipeline call site: src/pipeline.rs
  (extract_key + producer.send)
- Pairs with the separate "producer-only KafkaTransport" issue;
both need to land before VRL produces messages end-to-end.