feat(sources): post-processing in SourceSender#25563
Conversation
80b263e to
0274176
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 02741763f9
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
0274176 to
c68b368
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c68b368f22
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
c68b368 to
782877f
Compare
Add a `PostProcessor` hook to `SourceSender` so sources can attach a VRL program or a hard-coded Rust closure that runs on every event after schema metadata is attached, immediately before the event is placed on the output channel. This provides a first-class, reusable facility (e.g. for HEC token enrichment) instead of requiring per-source workarounds.
782877f to
3b43566
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3b43566249
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| .map(|mut event| { | ||
| #[cfg(debug_assertions)] | ||
| let before = std::mem::discriminant(&event); | ||
| f(&mut event); |
There was a problem hiding this comment.
Preserve metadata across same-variant replacements
Fresh evidence after the variant-preservation fix is that a processor can still replace an event with a new value of the same variant, which passes the discriminant check and is described as supported by the PostProcessor docs. In that case this call discards the original EventMetadata; the code only re-adds schema/upstream afterward, so existing finalizers, secrets, or source event IDs on events from acking sources are lost and downstream delivery will not update the original finalizers. Preserve/merge the original metadata around same-variant replacements, or make whole-event replacement unsupported.
Useful? React with 👍 / 👎.
…e-variant replacements A PostProcessor closure that replaces an event with a new value of the same variant passes the discriminant check but discards the original EventMetadata, losing finalizers, secrets, and source_event_id. This breaks downstream delivery for acking sources. Fix by moving the finalizers out before calling the closure (preventing double-counting on in-place mutations) and cloning the remaining metadata snapshot. After the closure, restore via merge() for secrets/source_event_id and merge_finalizers() for the finalizers.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7ef6bfc7bf
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| // those fields are absent on the post-closure event. merge_finalizers() | ||
| // re-attaches the finalizers taken above (original_meta has empty | ||
| // finalizers at this point, so merge() itself adds nothing for them). | ||
| event.metadata_mut().merge(original_meta); |
There was a problem hiding this comment.
Preserve metric origin metadata on replacement
Fresh evidence in the current revision is that same-variant whole-event replacement is supported, but this restore path relies on EventMetadata::merge, which does not copy fields such as datadog_origin_metadata or dropped metadata values. If a post-processor replaces a Datadog Agent metric with a new Metric of the same variant, the event passes the variant check but loses the origin metadata set by src/sources/datadog_agent/metrics.rs, so the Datadog metrics encoder later emits it without the pass-through origin information.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
That seems like an issue to resolve in EventMetadata::merge not here.
| /// before the event is placed on the output channel. See [`PostProcessor`] for the available | ||
| /// variants and their error-handling semantics. | ||
| /// | ||
| /// This method may be called before or after [`add_source_output`][Self::add_source_output]; |
There was a problem hiding this comment.
may be called seems a bit odd here, its not one or the other for sure?
| /// - [`PostProcessor::HardCoded`]: calls an infallible Rust closure. No events are dropped by | ||
| /// this variant. | ||
| /// | ||
| /// If per-output post-processing is needed in the future, a `with_post_processor_for_port` API |
There was a problem hiding this comment.
is this comment necessary? I dont know how often we leave future feature notes
bruceg
left a comment
There was a problem hiding this comment.
I'd like to see some discussion on the boxed closure vs trait object alternatives before approving.
| pub enum PostProcessor { | ||
| /// Call a hard-coded Rust function against every event. | ||
| /// | ||
| /// The closure is infallible; no events are dropped by this variant. | ||
| /// | ||
| /// # Contract | ||
| /// | ||
| /// The closure **must not change the event's variant** (log → log, metric → metric, trace → | ||
| /// trace). Changing the variant violates the variant-preservation contract and will panic in | ||
| /// debug builds. | ||
| HardCoded(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>), | ||
| } |
There was a problem hiding this comment.
Since we are usually fine with source-level breaking changes, coding this as an enum instead of a tuple (newtype) struct seems to qualify as YAGNI.
| pub enum PostProcessor { | |
| /// Call a hard-coded Rust function against every event. | |
| /// | |
| /// The closure is infallible; no events are dropped by this variant. | |
| /// | |
| /// # Contract | |
| /// | |
| /// The closure **must not change the event's variant** (log → log, metric → metric, trace → | |
| /// trace). Changing the variant violates the variant-preservation contract and will panic in | |
| /// debug builds. | |
| HardCoded(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>), | |
| } | |
| pub struct PostProcessor(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>); |
| } | ||
|
|
||
| impl Output { | ||
| #[expect(clippy::too_many_arguments)] |
There was a problem hiding this comment.
That is unfortunate. Would it make sense to avoid adding a new parameter and instead add a "builder" function:
fn with_post_processor(mut self, post_processor: PostProcessor) -> Self;|
|
||
| // Move finalizers out before calling the closure to prevent | ||
| // double-counting if the closure mutates the event in-place. | ||
| // Clone the remaining metadata so we can restore secrets and |
There was a problem hiding this comment.
Would it make sense to take the metadata here and restore it after instead? While clone is clone-on-write optimized, that clone is still expensive.
| // those fields are absent on the post-closure event. merge_finalizers() | ||
| // re-attaches the finalizers taken above (original_meta has empty | ||
| // finalizers at this point, so merge() itself adds nothing for them). | ||
| event.metadata_mut().merge(original_meta); |
There was a problem hiding this comment.
That seems like an issue to resolve in EventMetadata::merge not here.
| // Contract: closure must not change the event's variant. | ||
| #[cfg(debug_assertions)] | ||
| debug_assert_eq!( | ||
| before, | ||
| std::mem::discriminant(&event), | ||
| "PostProcessor::HardCoded closure changed the event variant; \ | ||
| this violates the variant-preservation contract" | ||
| ); |
There was a problem hiding this comment.
An alternate mechanism would be to make PostProcessor a trait with separate methods for the different event types (i.e. LogEvent etc), each returning the same type, making it impossible at a source-code level for this to happen. In fact, that might even be preferred for processor implementors as they will likely have to match on the event type anyways internally.
Thinking further, as to my objection above, making it a trait avoids the need for an enum entirely as well.
| pub(super) fn set_post_processor(&mut self, pp: &PostProcessor) { | ||
| self.post_processor = Some(pp.clone()); |
There was a problem hiding this comment.
I think I'd rather see methods like this move the clone into the caller by owning the PostProcessor.
| } | ||
|
|
||
| #[cfg(any(test, feature = "test"))] | ||
| pub fn new_test_with_post_processor( |
There was a problem hiding this comment.
Having a separate with_post_processor modifier avoids needing this extra function.
Summary
Add a `PostProcessor` hook to `SourceSender` so that any source can attach a
hard-coded Rust closure that runs on every event immediately after schema metadata is
attached and before the event is placed on the output channel. The enum is kept
with a single variant so future variants can be added without breaking the public API.
What changed
Rust closure; no events are ever dropped by this variant.
that applies the processor globally to all outputs (default + named ports).
parameter (all existing call sites pass `None`).
processor is set, `apply_post_processor` is called, which returns
`Vec` — one typed sub-array per contiguous run of same-type
surviving events — and each sub-array is sent to the channel. This correctly
handles mixed-type batches without data loss.
Vector configuration
No configuration change is exposed to end users in this PR.
How did you test this PR?
Unit tests in `lib/vector-core/src/source_sender/tests.rs`:
leaves events untouched.
closure mutates every event in the batch.
`make check-clippy` passes (exit 0).
Change Type
Is this a breaking change?