feat(sources): post-processing in SourceSender #25563

Open
20agbekodo wants to merge 3 commits into vectordotdev:master from 20agbekodo:josue.agbekodo/post-processing-in-source-sender

@20agbekodo 20agbekodo commented Jun 2, 2026

Contributor

Summary

Add a `PostProcessor` hook to `SourceSender` so that any source can attach a
hard-coded Rust closure that runs on every event immediately after schema metadata is
attached and before the event is placed on the output channel. The enum is kept
with a single variant so future variants can be added without breaking the public API.
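
A minimal sketch of the hook described above, assuming a simplified stand-in `Event` type rather than the real `vector-core` one. The `apply`, `process_batch`, and `tagging_processor` names are hypothetical and only illustrate how a `HardCoded` closure could run on each event before it is sent:

```rust
use std::sync::Arc;

// Stand-in for vector-core's `Event`; the real type carries far more state.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Log(String),
    Metric(f64),
}

// Mirrors the shape described in the PR: one variant wrapping a shared closure.
#[derive(Clone)]
enum PostProcessor {
    HardCoded(Arc<dyn Fn(&mut Event) + Send + Sync>),
}

impl PostProcessor {
    // Hypothetical helper: run the processor on a single event, in place.
    fn apply(&self, event: &mut Event) {
        match self {
            PostProcessor::HardCoded(f) => f(event),
        }
    }
}

// Hypothetical helper: process a batch just before it would be placed on the channel.
// With no processor configured the batch is returned untouched.
fn process_batch(pp: Option<&PostProcessor>, mut events: Vec<Event>) -> Vec<Event> {
    if let Some(pp) = pp {
        for event in &mut events {
            pp.apply(event);
        }
    }
    events
}

// Hypothetical processor that tags every log line and leaves metrics alone.
fn tagging_processor() -> PostProcessor {
    PostProcessor::HardCoded(Arc::new(|event: &mut Event| {
        if let Event::Log(message) = event {
            message.push_str(" [tagged]");
        }
    }))
}
```

Because the closure sits behind an `Arc`, cloning the processor for each output only bumps a reference count.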

What changed

  • `PostProcessor` enum (public, in `mod.rs`): one variant:
    • `HardCoded(Arc<dyn Fn(&mut Event) + Send + Sync>)` — calls an infallible
      Rust closure; no events are ever dropped by this variant.
  • `Builder::with_post_processor(pp: PostProcessor) -> Self`: fluent setter
    that applies the processor globally to all outputs (default + named ports).
  • `Output::new_with_buffer`: new `post_processor: Option<PostProcessor>`
    parameter (all existing call sites pass `None`).
  • `Output::send_inner`: the fast path (no processor) is unchanged; when a
    processor is set, `apply_post_processor` is called, which returns a
    `Vec` of typed sub-arrays, one per contiguous run of same-type
    surviving events, and each sub-array is sent to the channel. This
    handles mixed-type batches without data loss.
  • Changelog fragment: `changelog.d/source_sender_post_processor.feature.md`
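
The "one typed sub-array per contiguous run" behaviour in the `send_inner` item can be sketched as follows. This is an illustration under assumptions, not the PR's code: `Event`, `EventArray`, and `split_into_runs` are simplified stand-ins with only two variants.

```rust
// Stand-in event type with two variants.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Log(String),
    Metric(f64),
}

// Stand-in typed array: every element of one value shares a variant.
#[derive(Debug, PartialEq)]
enum EventArray {
    Logs(Vec<String>),
    Metrics(Vec<f64>),
}

// Group a mixed batch into one typed array per contiguous run of
// same-variant events, preserving the original order.
fn split_into_runs(events: Vec<Event>) -> Vec<EventArray> {
    let mut runs: Vec<EventArray> = Vec::new();
    for event in events {
        match event {
            Event::Log(log) => {
                // Extend the current run if it is already a log run, else start a new one.
                if let Some(EventArray::Logs(logs)) = runs.last_mut() {
                    logs.push(log);
                } else {
                    runs.push(EventArray::Logs(vec![log]));
                }
            }
            Event::Metric(metric) => {
                if let Some(EventArray::Metrics(metrics)) = runs.last_mut() {
                    metrics.push(metric);
                } else {
                    runs.push(EventArray::Metrics(vec![metric]));
                }
            }
        }
    }
    runs
}
```

A batch of log, log, metric, log therefore yields three arrays rather than two, so ordering across types is kept at the cost of smaller sends.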

Vector configuration

No configuration change is exposed to end users in this PR.

How did you test this PR?

Unit tests in `lib/vector-core/src/source_sender/tests.rs`:

  • `post_processor_none_is_noop` — verifies that the absence of a processor
    leaves events untouched.
  • `post_processor_hard_coded_mutates_events` — verifies that a `HardCoded`
    closure mutates every event in the batch.

`make check-clippy` passes (exit 0).

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

@github-actions github-actions Bot added the domain: core Anything related to core crates i.e. vector-core, core-common, etc label Jun 2, 2026
@20agbekodo 20agbekodo force-pushed the josue.agbekodo/post-processing-in-source-sender branch 7 times, most recently from 80b263e to 0274176 Compare June 4, 2026 09:40
@20agbekodo 20agbekodo marked this pull request as ready for review June 4, 2026 12:03
@20agbekodo 20agbekodo requested a review from a team as a code owner June 4, 2026 12:03

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 02741763f9

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread lib/vector-core/src/source_sender/output.rs Outdated
Comment thread lib/vector-core/src/source_sender/builder.rs
Comment thread lib/vector-core/src/source_sender/output.rs Outdated
@20agbekodo 20agbekodo force-pushed the josue.agbekodo/post-processing-in-source-sender branch from 0274176 to c68b368 Compare June 4, 2026 12:18

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c68b368f22

Comment thread lib/vector-core/src/source_sender/output.rs Outdated
Comment thread lib/vector-core/src/source_sender/output.rs Outdated
Comment thread lib/vector-core/src/source_sender/output.rs Outdated
@20agbekodo 20agbekodo marked this pull request as draft June 4, 2026 12:37
@20agbekodo 20agbekodo force-pushed the josue.agbekodo/post-processing-in-source-sender branch from c68b368 to 782877f Compare June 4, 2026 12:39
Add a `PostProcessor` hook to `SourceSender` so sources can attach a VRL
program or a hard-coded Rust closure that runs on every event after schema
metadata is attached, immediately before the event is placed on the output
channel. This provides a first-class, reusable facility (e.g. for HEC token
enrichment) instead of requiring per-source workarounds.
@20agbekodo 20agbekodo force-pushed the josue.agbekodo/post-processing-in-source-sender branch from 782877f to 3b43566 Compare June 4, 2026 13:00
@20agbekodo 20agbekodo marked this pull request as ready for review June 4, 2026 13:36

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3b43566249

.map(|mut event| {
    #[cfg(debug_assertions)]
    let before = std::mem::discriminant(&event);
    f(&mut event);

P2: Preserve metadata across same-variant replacements

Fresh evidence after the variant-preservation fix is that a processor can still replace an event with a new value of the same variant, which passes the discriminant check and is described as supported by the PostProcessor docs. In that case this call discards the original EventMetadata; the code only re-adds schema/upstream afterward, so existing finalizers, secrets, or source event IDs on events from acking sources are lost and downstream delivery will not update the original finalizers. Preserve/merge the original metadata around same-variant replacements, or make whole-event replacement unsupported.

@bruceg bruceg self-requested a review June 4, 2026 23:31
@20agbekodo 20agbekodo marked this pull request as draft June 10, 2026 14:29
…e-variant replacements

A PostProcessor closure that replaces an event with a new value of the
same variant passes the discriminant check but discards the original
EventMetadata, losing finalizers, secrets, and source_event_id. This
breaks downstream delivery for acking sources.

Fix by moving the finalizers out before calling the closure (preventing
double-counting on in-place mutations) and cloning the remaining metadata
snapshot. After the closure, restore via merge() for secrets/source_event_id
and merge_finalizers() for the finalizers.
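
The fix described in this commit message can be sketched roughly as below. `Metadata`, `Event`, and `apply_preserving_metadata` are simplified, hypothetical stand-ins (finalizers are plain integers here), and the `merge` shown only assumes "fill in what is missing"; it is not a copy of `EventMetadata::merge`:

```rust
// Stand-in metadata: finalizers are modelled as plain IDs for illustration.
#[derive(Debug, Clone, Default, PartialEq)]
struct Metadata {
    finalizers: Vec<u64>,
    source_event_id: Option<u64>,
    secrets: Vec<(String, String)>,
}

impl Metadata {
    // Assumed merge semantics: append finalizers, fill in fields missing on `self`.
    fn merge(&mut self, other: Metadata) {
        self.finalizers.extend(other.finalizers);
        if self.source_event_id.is_none() {
            self.source_event_id = other.source_event_id;
        }
        for (key, value) in other.secrets {
            if !self.secrets.iter().any(|(k, _)| *k == key) {
                self.secrets.push((key, value));
            }
        }
    }
}

#[derive(Debug, Clone, Default, PartialEq)]
struct Event {
    message: String,
    metadata: Metadata,
}

fn apply_preserving_metadata(event: &mut Event, f: impl Fn(&mut Event)) {
    // Move the finalizers out first so an in-place mutation cannot end up with them twice.
    let finalizers = std::mem::take(&mut event.metadata.finalizers);
    // Snapshot the rest in case the closure replaces the whole event.
    let snapshot = event.metadata.clone();

    f(event);

    // Restore whatever the closure dropped, then re-attach the finalizers exactly once.
    event.metadata.merge(snapshot);
    event.metadata.finalizers.extend(finalizers);
}
```

Both the whole-event replacement case and the in-place mutation case end with a single copy of the original finalizers attached.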
@20agbekodo 20agbekodo marked this pull request as ready for review June 11, 2026 09:09

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7ef6bfc7bf

// those fields are absent on the post-closure event. merge_finalizers()
// re-attaches the finalizers taken above (original_meta has empty
// finalizers at this point, so merge() itself adds nothing for them).
event.metadata_mut().merge(original_meta);

P2: Preserve metric origin metadata on replacement

Fresh evidence in the current revision is that same-variant whole-event replacement is supported, but this restore path relies on EventMetadata::merge, which does not copy fields such as datadog_origin_metadata or dropped metadata values. If a post-processor replaces a Datadog Agent metric with a new Metric of the same variant, the event passes the variant check but loses the origin metadata set by src/sources/datadog_agent/metrics.rs, so the Datadog metrics encoder later emits it without the pass-through origin information.

Member

That seems like an issue to resolve in EventMetadata::merge not here.

/// before the event is placed on the output channel. See [`PostProcessor`] for the available
/// variants and their error-handling semantics.
///
/// This method may be called before or after [`add_source_output`][Self::add_source_output];

Contributor

"May be called" seems a bit odd here; it's not one or the other for sure?

/// - [`PostProcessor::HardCoded`]: calls an infallible Rust closure. No events are dropped by
/// this variant.
///
/// If per-output post-processing is needed in the future, a `with_post_processor_for_port` API

Contributor

Is this comment necessary? I don't know how often we leave future-feature notes.

@bruceg bruceg left a comment

Member

I'd like to see some discussion on the boxed closure vs trait object alternatives before approving.

Comment on lines +54 to +65
pub enum PostProcessor {
    /// Call a hard-coded Rust function against every event.
    ///
    /// The closure is infallible; no events are dropped by this variant.
    ///
    /// # Contract
    ///
    /// The closure **must not change the event's variant** (log → log, metric → metric, trace →
    /// trace). Changing the variant violates the variant-preservation contract and will panic in
    /// debug builds.
    HardCoded(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>),
}

Member

Since we are usually fine with source-level breaking changes, coding this as an enum instead of a tuple (newtype) struct seems to qualify as YAGNI.

Suggested change
-pub enum PostProcessor {
-    /// Call a hard-coded Rust function against every event.
-    ///
-    /// The closure is infallible; no events are dropped by this variant.
-    ///
-    /// # Contract
-    ///
-    /// The closure **must not change the event's variant** (log → log, metric → metric, trace →
-    /// trace). Changing the variant violates the variant-preservation contract and will panic in
-    /// debug builds.
-    HardCoded(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>),
-}
+pub struct PostProcessor(std::sync::Arc<dyn Fn(&mut crate::event::Event) + Send + Sync>);
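
For comparison, a minimal sketch of how the suggested newtype might be used. The `Event` stand-in and the `new` and `apply` methods are assumptions for illustration; the suggestion itself only gives the struct definition:

```rust
use std::sync::Arc;

// Stand-in for vector-core's `Event`.
#[derive(Debug, PartialEq)]
struct Event(String);

// The newtype form from the suggestion, over the stand-in event type.
#[derive(Clone)]
struct PostProcessor(Arc<dyn Fn(&mut Event) + Send + Sync>);

impl PostProcessor {
    // Hypothetical constructor that hides the `Arc` from callers.
    fn new(f: impl Fn(&mut Event) + Send + Sync + 'static) -> Self {
        Self(Arc::new(f))
    }

    // Hypothetical helper: invoke the wrapped closure on one event.
    fn apply(&self, event: &mut Event) {
        (self.0)(event);
    }
}
```

Call sites lose the `match` on a single variant, and the field can stay private, so the representation can change later without touching callers.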

}

impl Output {
#[expect(clippy::too_many_arguments)]

Member

That is unfortunate. Would it make sense to avoid adding a new parameter and instead add a "builder" function:

    fn with_post_processor(mut self, post_processor: PostProcessor) -> Self;
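
A rough sketch of that builder-style modifier, assuming a pared-down `Output` with a single field and events modelled as `String`. The `new` and `process` methods and the `PostProcessor` alias are hypothetical:

```rust
use std::sync::Arc;

// Simplified processor type: events are plain strings in this sketch.
type PostProcessor = Arc<dyn Fn(&mut String) + Send + Sync>;

// Pared-down stand-in for `Output`; the real struct has many more fields.
struct Output {
    post_processor: Option<PostProcessor>,
}

impl Output {
    // The constructor keeps its existing argument list; no processor by default.
    fn new() -> Self {
        Self { post_processor: None }
    }

    // Builder-style modifier, matching the signature proposed in the review.
    fn with_post_processor(mut self, post_processor: PostProcessor) -> Self {
        self.post_processor = Some(post_processor);
        self
    }

    // Hypothetical helper: run the processor, if any, on one event.
    fn process(&self, mut event: String) -> String {
        if let Some(pp) = &self.post_processor {
            pp(&mut event);
        }
        event
    }
}
```

Existing call sites stay as `Output::new()`, and only callers that want a processor chain `.with_post_processor(...)`, which also avoids growing the constructor's argument list.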


// Move finalizers out before calling the closure to prevent
// double-counting if the closure mutates the event in-place.
// Clone the remaining metadata so we can restore secrets and

Member

Would it make sense to take the metadata here and restore it after instead? While clone is clone-on-write optimized, that clone is still expensive.
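
One way to read "take and restore" is sketched below with stand-in types; `Metadata`, `Event`, and `apply_with_taken_metadata` are hypothetical. Note the trade-off this version assumes: the closure sees empty metadata, and any metadata it sets is overwritten on restore.

```rust
// Stand-in metadata; deliberately not `Clone` to show that no clone is needed.
#[derive(Debug, Default, PartialEq)]
struct Metadata {
    source_event_id: Option<u64>,
    secrets: Vec<String>,
}

#[derive(Debug, Default, PartialEq)]
struct Event {
    message: String,
    metadata: Metadata,
}

fn apply_with_taken_metadata(event: &mut Event, f: impl Fn(&mut Event)) {
    // Move the metadata out, leaving a cheap default value in its place.
    let original = std::mem::take(&mut event.metadata);

    f(event);

    // Put the original back, whether the closure mutated or replaced the event.
    event.metadata = original;
}
```

Whether processors need to read or modify metadata would decide if this simpler form is acceptable or if a merge step is still required.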

Comment on lines +219 to +226
// Contract: closure must not change the event's variant.
#[cfg(debug_assertions)]
debug_assert_eq!(
    before,
    std::mem::discriminant(&event),
    "PostProcessor::HardCoded closure changed the event variant; \
    this violates the variant-preservation contract"
);

@bruceg bruceg Jun 11, 2026

Member

An alternate mechanism would be to make PostProcessor a trait with separate methods for the different event types (i.e. LogEvent etc), each returning the same type, making it impossible at a source-code level for this to happen. In fact, that might even be preferred for processor implementors as they will likely have to match on the event type anyways internally.

Thinking further, as to my objection above, making it a trait avoids the need for an enum entirely as well.
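
A sketch of what such a trait could look like, using stand-in event types. The method names, the default pass-through implementations, and the `apply` and `TagLogs` items are assumptions for illustration, not something the review specifies:

```rust
// Stand-ins for the per-type event structs.
#[derive(Debug, PartialEq)]
struct LogEvent { message: String }
#[derive(Debug, PartialEq)]
struct Metric { value: f64 }
#[derive(Debug, PartialEq)]
struct TraceEvent { span_count: usize }

#[derive(Debug, PartialEq)]
enum Event {
    Log(LogEvent),
    Metric(Metric),
    Trace(TraceEvent),
}

// Each method takes and returns the same type, so a processor cannot
// change an event's variant. Defaults pass the event through unchanged.
trait PostProcessor: Send + Sync {
    fn process_log(&self, log: LogEvent) -> LogEvent { log }
    fn process_metric(&self, metric: Metric) -> Metric { metric }
    fn process_trace(&self, trace: TraceEvent) -> TraceEvent { trace }
}

// The sender dispatches once per event; the variant is fixed by construction.
fn apply(pp: &dyn PostProcessor, event: Event) -> Event {
    match event {
        Event::Log(log) => Event::Log(pp.process_log(log)),
        Event::Metric(metric) => Event::Metric(pp.process_metric(metric)),
        Event::Trace(trace) => Event::Trace(pp.process_trace(trace)),
    }
}

// Hypothetical processor that only cares about logs.
struct TagLogs;

impl PostProcessor for TagLogs {
    fn process_log(&self, mut log: LogEvent) -> LogEvent {
        log.message.push_str(" [tagged]");
        log
    }
}
```

With this shape the debug-only discriminant assertion becomes unnecessary, since a variant change no longer type-checks.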

Comment on lines +389 to +390
pub(super) fn set_post_processor(&mut self, pp: &PostProcessor) {
self.post_processor = Some(pp.clone());

Member

I think I'd rather see methods like this move the clone into the caller by owning the PostProcessor.
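
A small sketch of the owning variant, with a pared-down `Output` and a string-based `PostProcessor` alias as stand-ins. The `set_on_all` caller is hypothetical and only shows where the clones would then happen:

```rust
use std::sync::Arc;

// Simplified processor type: events are plain strings in this sketch.
type PostProcessor = Arc<dyn Fn(&mut String) + Send + Sync>;

// Pared-down stand-in for `Output`.
#[derive(Default)]
struct Output {
    post_processor: Option<PostProcessor>,
}

impl Output {
    // Takes ownership; the setter itself never clones.
    fn set_post_processor(&mut self, pp: PostProcessor) {
        self.post_processor = Some(pp);
    }
}

// Hypothetical caller that fans one processor out to several outputs.
// It clones for all but the last output and moves the original into the last.
fn set_on_all(outputs: &mut [Output], pp: PostProcessor) {
    let Some((last, rest)) = outputs.split_last_mut() else {
        return;
    };
    for output in rest {
        output.set_post_processor(Arc::clone(&pp));
    }
    last.set_post_processor(pp);
}
```

The caller decides how many clones are needed, and a caller with a single output pays for none.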

}

#[cfg(any(test, feature = "test"))]
pub fn new_test_with_post_processor(

Member

Having a separate with_post_processor modifier avoids needing this extra function.

3 participants