From 6bdbf7b4df5d244ed59a8f9799ebfca32d9edeb5 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 15:47:07 -0400 Subject: [PATCH 01/10] chore(02-07): enable flagevaluation-evp feature + fix VecMap compat with PREP-01 libdatadog - Enable 'flagevaluation-evp' feature on datadog-ffe dep (FfeFlagEvaluationBatch type now compiled) - Fix components-rs/bytes.rs: update 4x VecMap::remove() -> remove_slow() for libdatadog compat post-commit 74284cac7 (VecMap API renamed); this unblocks compilation against the PREP-01 libdatadog ref --- components-rs/Cargo.toml | 2 +- components-rs/bytes.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/components-rs/Cargo.toml b/components-rs/Cargo.toml index c4b8de1c34..f6ade91931 100644 --- a/components-rs/Cargo.toml +++ b/components-rs/Cargo.toml @@ -15,7 +15,7 @@ libdd-telemetry-ffi = { path = "../libdatadog/libdd-telemetry-ffi", default-feat datadog-live-debugger = { path = "../libdatadog/datadog-live-debugger" } datadog-live-debugger-ffi = { path = "../libdatadog/datadog-live-debugger-ffi", default-features = false } datadog-ipc = { path = "../libdatadog/datadog-ipc" } -datadog-ffe = { path = "../libdatadog/datadog-ffe" } +datadog-ffe = { path = "../libdatadog/datadog-ffe", features = ["flagevaluation-evp"] } libdd-remote-config = { path = "../libdatadog/libdd-remote-config" } datadog-sidecar = { path = "../libdatadog/datadog-sidecar" } datadog-sidecar-ffi = { path = "../libdatadog/datadog-sidecar-ffi" } diff --git a/components-rs/bytes.rs b/components-rs/bytes.rs index bbcbc5d546..235cf095ff 100644 --- a/components-rs/bytes.rs +++ b/components-rs/bytes.rs @@ -246,12 +246,12 @@ pub extern "C" fn ddog_add_str_span_meta_CharSlice( #[no_mangle] pub extern "C" fn ddog_del_span_meta_zstr(ptr: &mut SpanBytes, key: &mut ZendString) { - ptr.meta.remove(&convert_zend_to_bytes_string(key)); + ptr.meta.remove_slow(&convert_zend_to_bytes_string(key)); } #[no_mangle] pub extern "C" fn ddog_del_span_meta_str(ptr: &mut SpanBytes, key: *const c_char) { - ptr.meta.remove(&convert_literal_to_bytes_string(key)); + ptr.meta.remove_slow(&convert_literal_to_bytes_string(key)); } #[no_mangle] @@ -290,7 +290,7 @@ pub extern "C" fn ddog_has_span_metrics_zstr(ptr: &mut SpanBytes, key: &mut Zend #[no_mangle] pub extern "C" fn ddog_del_span_metrics_zstr(ptr: &mut SpanBytes, key: &mut ZendString) { - ptr.metrics.remove(&convert_zend_to_bytes_string(key)); + ptr.metrics.remove_slow(&convert_zend_to_bytes_string(key)); } #[no_mangle] @@ -316,7 +316,7 @@ pub extern "C" fn ddog_get_span_metrics_str( #[no_mangle] pub extern "C" fn ddog_del_span_metrics_str(ptr: &mut SpanBytes, key: *const c_char) { - ptr.metrics.remove(&convert_literal_to_bytes_string(key)); + ptr.metrics.remove_slow(&convert_literal_to_bytes_string(key)); } #[no_mangle] From 343e2cb19ee7183f373744578926f7de66283514 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 15:59:18 -0400 Subject: [PATCH 02/10] feat(02-07): add EVP flagevaluation aggregation + RSHUTDOWN flush dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Two-tier aggregation in components-rs/ffe.rs: full→degraded→drop-counted with caps GLOBAL_CAP=131072/PER_FLAG_CAP=10000/DEGRADED_CAP=32768 - Killswitch DD_FLAGGING_EVALUATION_COUNTS_ENABLED (default: on) via evp_enabled() in Rust and isEvpEnabled() in EvaluationMetricRecorder.php - ddog_ffe_flush_flag_evaluation_batch() Rust C-export dispatches SidecarAction::FfeFlagEvaluationBatch via sidecar_blocking::enqueue_actions - ddtrace_ffe_flush_flag_evaluation_batch() C wrapper in tracer/ffe.c mirrors existing exposure/metric flush pattern with sidecar globals - RSHUTDOWN call added in tracer/ddtrace.c after existing flush calls - 11 Rust unit tests covering both tiers, overflow, drain, killswitch --- components-rs/ffe.rs | 673 +++++++++++++++++- .../Metric/EvaluationMetricRecorder.php | 17 + tracer/ddtrace.c | 1 + tracer/ffe.c | 14 + tracer/ffe.h | 4 + 5 files changed, 704 insertions(+), 5 deletions(-) diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index 0812c8c5cd..a8e85886df 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -3,10 +3,18 @@ use datadog_ffe::rules_based::{ self as ffe, AssignmentReason, AssignmentValue, Attribute, Configuration, EvaluationContext, EvaluationError, ExpectedFlagType, Str, UniversalFlagConfig, }; +use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, FlagKey, VariantKey, + DEGRADED_CAP, GLOBAL_CAP, PER_FLAG_CAP, +}; +use datadog_ffe::telemetry::FfeTelemetryContext; +use datadog_sidecar::service::blocking::{self as sidecar_blocking, SidecarTransport}; +use datadog_sidecar::service::{InstanceId, QueueId, SidecarAction}; use libdd_common_ffi::slice::{AsBytes, CharSlice}; use std::cell::RefCell; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; struct FfeState { config: Option, @@ -90,6 +98,348 @@ const TYPE_FLOAT: i32 = 2; const TYPE_BOOLEAN: i32 = 3; const TYPE_OBJECT: i32 = 4; +// ── EVP flagevaluation aggregation (EMIT-07, frozen contract) ───────────────── + +/// Full-tier aggregation key: six dimensions, all exact strings, no hash. +/// Reviewer concern #3 (3395004724): no collision-prone digest — Go's comparable +/// struct identity ported to Rust via #[derive(Eq, Hash)]. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct FullTierKey { + flag_key: String, + variant: String, + allocation_key: String, + reason: String, + targeting_key: String, + /// Type-tagged, length-delimited canonical encoding of pruned context attrs. + context_key: String, +} + +/// Degraded-tier key: four dimensions only (drops targeting_key + context). +/// Matches OTel `feature_flag.evaluations` cardinality exactly. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct DegradedTierKey { + flag_key: String, + variant: String, + allocation_key: String, + reason: String, +} + +/// Per-bucket aggregation state. +#[derive(Clone, Debug)] +struct AggBucket { + first_evaluation: i64, + last_evaluation: i64, + count: u64, +} + +impl AggBucket { + fn new(eval_ms: i64) -> Self { + AggBucket { + first_evaluation: eval_ms, + last_evaluation: eval_ms, + count: 1, + } + } + + fn merge(&mut self, eval_ms: i64) { + if eval_ms < self.first_evaluation { + self.first_evaluation = eval_ms; + } + if eval_ms > self.last_evaluation { + self.last_evaluation = eval_ms; + } + self.count = self.count.saturating_add(1); + } +} + +/// Per-flag full-tier state: buckets + per-flag count (for perFlagCap). +#[derive(Default)] +struct FullTierFlagState { + buckets: HashMap, +} + +/// Two-tier aggregator state. Process-global behind a Mutex. +#[derive(Default)] +struct EvpAggregator { + /// Full-tier: keyed by FullTierKey. Maps flag_key → per-flag state for + /// easy perFlagCap enforcement. + full_tier: HashMap, + /// Total full-tier bucket count across all flags. + full_tier_total: usize, + /// Degraded-tier: keyed by DegradedTierKey. + degraded_tier: HashMap, + /// Evaluations dropped past degradedCap (observable counter). + dropped_degraded_overflow: u64, +} + +lazy_static::lazy_static! { + static ref EVP_AGGREGATOR: Mutex = Mutex::new(EvpAggregator::default()); +} + +/// Returns true when the killswitch `DD_FLAGGING_EVALUATION_COUNTS_ENABLED` is +/// not explicitly set to `false` (default: enabled). +fn evp_enabled() -> bool { + match std::env::var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED") { + Ok(val) => !matches!(val.to_lowercase().as_str(), "false" | "0" | "no"), + Err(_) => true, // absent → on + } +} + +/// Canonical context key: length-delimited, serde-JSON-encoded sorted context +/// attributes. No hash — language-native map key (reviewer concern #3 +/// 3395004724). Distinct attribute maps produce distinct keys; same maps +/// produce identical keys (deterministic because keys are sorted). +fn canonical_context_key(attrs: &HashMap) -> String { + // Sort keys for deterministic ordering. + let mut pairs: Vec<(&Str, &Attribute)> = attrs.iter().collect(); + pairs.sort_by_key(|(k, _)| k.as_str()); + + let mut buf = Vec::new(); + for (k, v) in &pairs { + // Key: 8-byte big-endian length + raw bytes. + append_length_delimited(&mut buf, k.as_str().as_bytes()); + // Value: serde_json serialization gives a deterministic, type-preserving + // representation. Strings → `"value"`, numbers → `42`, bools → `true`/`false`. + // This is wrapped with a length delimiter for unambiguous parsing. + if let Ok(json) = serde_json::to_string(v) { + append_length_delimited(&mut buf, json.as_bytes()); + } + } + // Safety: all content is valid UTF-8. + String::from_utf8(buf).unwrap_or_default() +} + +fn append_length_delimited(buf: &mut Vec, data: &[u8]) { + let len = data.len() as u64; + buf.extend_from_slice(&len.to_be_bytes()); + buf.extend_from_slice(data); +} + +/// Map reason integer to the canonical reason string. +fn reason_str(reason: i32, error_code: i32) -> String { + match reason { + REASON_STATIC => "STATIC".to_owned(), + REASON_TARGETING_MATCH => "TARGETING_MATCH".to_owned(), + REASON_SPLIT => "SPLIT".to_owned(), + REASON_DISABLED => "DISABLED".to_owned(), + REASON_ERROR => { + // Use a more specific error code label when available. + match error_code { + ERROR_TYPE_MISMATCH => "ERROR_TYPE_MISMATCH".to_owned(), + ERROR_CONFIG_PARSE => "ERROR_CONFIG_PARSE".to_owned(), + ERROR_FLAG_UNRECOGNIZED => "ERROR_FLAG_UNRECOGNIZED".to_owned(), + ERROR_CONFIG_MISSING => "ERROR_CONFIG_MISSING".to_owned(), + _ => "ERROR".to_owned(), + } + } + _ => "DEFAULT".to_owned(), + } +} + +/// Current time in milliseconds since the Unix epoch. +fn now_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +/// Record one evaluation into the EVP aggregator (two-tier, frozen caps). +/// Called from `ddog_ffe_evaluate()` if the killswitch is on. +/// +/// `variant_str`: empty string means runtime default (absent variant, reviewer +/// concern #5 3395344504). +fn record_flag_evaluation_evp( + flag_key: &str, + variant_str: &str, + allocation_key_str: &str, + reason: i32, + error_code: i32, + targeting_key: Option<&str>, + attrs: &HashMap, + eval_ms: i64, +) { + let reason_s = reason_str(reason, error_code); + let full_key = FullTierKey { + flag_key: flag_key.to_owned(), + variant: variant_str.to_owned(), + allocation_key: allocation_key_str.to_owned(), + reason: reason_s.clone(), + targeting_key: targeting_key.unwrap_or("").to_owned(), + context_key: canonical_context_key(attrs), + }; + + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + + // ── Full-tier lookup ────────────────────────────────────────────────────── + // First, check the existing bucket in the per-flag state. + if let Some(flag_state) = agg.full_tier.get_mut(flag_key) { + if let Some(bucket) = flag_state.buckets.get_mut(&full_key) { + // Existing bucket — merge (min/max for first/last, reviewer concern #4). + bucket.merge(eval_ms); + return; + } + } + + // ── Full-tier insertion (new bucket) ────────────────────────────────────── + let current_total = agg.full_tier_total; + let flag_count = agg + .full_tier + .get(flag_key) + .map(|s| s.buckets.len()) + .unwrap_or(0); + + if current_total < GLOBAL_CAP && flag_count < PER_FLAG_CAP { + let flag_state = agg.full_tier.entry(flag_key.to_owned()).or_default(); + flag_state.buckets.insert(full_key, AggBucket::new(eval_ms)); + agg.full_tier_total += 1; + return; + } + + // ── Degraded tier (full-tier saturated) ─────────────────────────────────── + let degraded_key = DegradedTierKey { + flag_key: flag_key.to_owned(), + variant: variant_str.to_owned(), + allocation_key: allocation_key_str.to_owned(), + reason: reason_s, + }; + + if let Some(bucket) = agg.degraded_tier.get_mut(°raded_key) { + bucket.merge(eval_ms); + return; + } + + if agg.degraded_tier.len() < DEGRADED_CAP { + agg.degraded_tier + .insert(degraded_key, AggBucket::new(eval_ms)); + } else { + // Both tiers full → drop and count (reviewer concern #8 3385309427). + agg.dropped_degraded_overflow = agg.dropped_degraded_overflow.saturating_add(1); + } +} + +/// Drain the aggregator and build a `FfeFlagEvaluationBatch`. +/// Returns `None` if the aggregator is empty. +fn drain_aggregator( + service: &str, + env: &str, + version: &str, +) -> Option { + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + + let now = now_ms(); + let mut events: Vec = Vec::new(); + + // Drain full tier. + for (flag_key, flag_state) in agg.full_tier.drain() { + for (k, bucket) in flag_state.buckets { + let runtime_default = k.variant.is_empty(); + let variant = if k.variant.is_empty() { + None + } else { + Some(VariantKey { key: k.variant }) + }; + let allocation = if k.allocation_key.is_empty() { + None + } else { + Some(AllocationKey { + key: k.allocation_key, + }) + }; + let targeting_key = if k.targeting_key.is_empty() { + None + } else { + Some(k.targeting_key) + }; + events.push(FfeFlagEvaluationEvent { + timestamp: now, + flag: FlagKey { key: flag_key.clone() }, + first_evaluation: bucket.first_evaluation, + last_evaluation: bucket.last_evaluation, + evaluation_count: bucket.count, + variant, + allocation, + targeting_key, + context: None, // context pruning owned by PREP-01 types on the sidecar + error: None, + runtime_default_used: runtime_default, + }); + } + } + agg.full_tier_total = 0; + + // Drain degraded tier. + for (k, bucket) in agg.degraded_tier.drain() { + let runtime_default = k.variant.is_empty(); + events.push(FfeFlagEvaluationEvent { + timestamp: now, + flag: FlagKey { key: k.flag_key }, + first_evaluation: bucket.first_evaluation, + last_evaluation: bucket.last_evaluation, + evaluation_count: bucket.count, + variant: None, + allocation: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: runtime_default, + }); + } + + if events.is_empty() { + return None; + } + + Some(FfeFlagEvaluationBatch { + context: FfeTelemetryContext { + service: service.to_owned(), + env: env.to_owned(), + version: version.to_owned(), + }, + flag_evaluations: events, + }) +} + +/// Flush aggregated EVP flag evaluation events to the sidecar. +/// +/// # Safety +/// All pointer parameters must be valid non-null pointers to live objects. +/// `service`, `env`, `version` must be valid UTF-8 `CharSlice` values. +#[no_mangle] +pub unsafe extern "C" fn ddog_ffe_flush_flag_evaluation_batch( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + service: CharSlice<'_>, + env: CharSlice<'_>, + version: CharSlice<'_>, +) -> bool { + let service_s = service.to_utf8_lossy().into_owned(); + let env_s = env.to_utf8_lossy().into_owned(); + let version_s = version.to_utf8_lossy().into_owned(); + + let batch = match drain_aggregator(&service_s, &env_s, &version_s) { + Some(b) => b, + None => return true, // nothing to flush + }; + + sidecar_blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeFlagEvaluationBatch(batch)], + ) + .is_ok() +} + +// ── End EVP aggregation ─────────────────────────────────────────────────────── + #[repr(C)] pub struct FfeResult { pub value_json: MaybeOwnedZendString, @@ -145,10 +495,14 @@ pub extern "C" fn ddog_ffe_evaluate( } }; - let attributes = parse_attributes(attributes, attributes_count); - let context = EvaluationContext::new(targeting_key, Arc::new(attributes)); + let parsed_attributes = parse_attributes(attributes, attributes_count); + // Capture targeting key and attrs for EVP recording BEFORE consuming them. + let targeting_key_owned: Option = targeting_key.as_ref().map(|s| s.as_str().to_owned()); + let attrs_for_evp: HashMap = parsed_attributes.clone(); - FFE_STATE.with(|state| { + let context = EvaluationContext::new(targeting_key, Arc::new(parsed_attributes)); + + let result = FFE_STATE.with(|state| { let state = state.borrow(); let assignment = ffe::get_assignment( state.config.as_ref(), @@ -159,7 +513,35 @@ pub extern "C" fn ddog_ffe_evaluate( ); result_from_assignment(assignment) - }) + }); + + // EVP flagevaluation aggregation (new path — gated by killswitch). + // The existing OTel record_ffe_evaluation_metric() path (PHP/C) is unaffected. + if result.valid && evp_enabled() { + let eval_ms = now_ms(); + let variant_str = result + .variant + .as_ref() + .and_then(|v| std::str::from_utf8(v.as_ref()).ok()) + .unwrap_or(""); + let alloc_str = result + .allocation_key + .as_ref() + .and_then(|v| std::str::from_utf8(v.as_ref()).ok()) + .unwrap_or(""); + record_flag_evaluation_evp( + flag_key, + variant_str, + alloc_str, + result.reason, + result.error_code, + targeting_key_owned.as_deref(), + &attrs_for_evp, + eval_ms, + ); + } + + result } fn parse_attributes( @@ -443,4 +825,285 @@ mod tests { assert_eq!(ddog_ffe_config_version(), loaded_version); clear_config(); } + + // ── EVP aggregation unit tests (EMIT-07, frozen contract) ───────────────── + + // Serialization mutex to prevent parallel EVP tests from interfering with + // the global aggregator state. + lazy_static::lazy_static! { + static ref EVP_TEST_LOCK: Mutex<()> = Mutex::new(()); + } + + /// Reset the global EVP aggregator for test isolation. Tests run in the + /// same process, so they share the global — each test must drain or reset. + /// Handles poisoned mutex (from a prior panicking test). + fn reset_aggregator() { + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(guard) => guard, + Err(poisoned) => poisoned.into_inner(), + }; + *agg = EvpAggregator::default(); + } + + fn empty_attrs() -> HashMap { + HashMap::new() + } + + fn attrs_with(key: &str, val: &str) -> HashMap { + let mut m = HashMap::new(); + m.insert(Str::from(key), Attribute::from(val)); + m + } + + // Test: identical evaluations → same bucket, count=2, first<=last. + // Reviewer concern #4 (3395176782): first/last via min/max. + #[test] + fn identical_evaluations_merge_into_single_bucket() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + let eval_ms_1 = 1_000i64; + let eval_ms_2 = 2_000i64; + + record_flag_evaluation_evp( + "my-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &empty_attrs(), eval_ms_1, + ); + record_flag_evaluation_evp( + "my-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &empty_attrs(), eval_ms_2, + ); + + let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); + assert_eq!(batch.flag_evaluations.len(), 1); + let ev = &batch.flag_evaluations[0]; + assert_eq!(ev.flag.key, "my-flag"); + assert_eq!(ev.evaluation_count, 2); + assert!(ev.first_evaluation <= ev.last_evaluation); + assert_eq!(ev.first_evaluation, eval_ms_1); + assert_eq!(ev.last_evaluation, eval_ms_2); + } + + // Test: differing context attrs → distinct buckets (reviewer concern #3 3395004724). + #[test] + fn different_context_values_produce_distinct_full_tier_buckets() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + let attrs_a = attrs_with("plan", "free"); + let attrs_b = attrs_with("plan", "premium"); + + record_flag_evaluation_evp( + "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs_a, 1_000, + ); + record_flag_evaluation_evp( + "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs_b, 1_000, + ); + + let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); + assert_eq!( + batch.flag_evaluations.len(), 2, + "different context values must produce two distinct buckets" + ); + } + + // Test: same attrs → same bucket (canonical key is deterministic). + #[test] + fn same_context_values_merge_into_same_bucket() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + let attrs_a = attrs_with("plan", "free"); + let attrs_b = attrs_with("plan", "free"); + + record_flag_evaluation_evp( + "ctx-flag2", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs_a, 1_000, + ); + record_flag_evaluation_evp( + "ctx-flag2", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs_b, 2_000, + ); + + let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); + assert_eq!( + batch.flag_evaluations.len(), 1, + "same context values must merge into one bucket" + ); + assert_eq!(batch.flag_evaluations[0].evaluation_count, 2); + } + + // Test: full-tier overflow routes to degraded tier. + // Reviewer concern #8 (3385309427): three named cap constants with explicit bounds. + #[test] + fn full_tier_overflow_routes_to_degraded_tier() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + // Insert one bucket that fills globalCap for this flag. + // We simulate this by directly manipulating state: + // Set full_tier_total to GLOBAL_CAP - 1, then push one more. + { + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + agg.full_tier_total = GLOBAL_CAP; // pretend full + } + + // This evaluation should be routed to the degraded tier. + record_flag_evaluation_evp( + "overflow-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-x"), &attrs_with("k", "v"), 1_000, + ); + + let agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + assert_eq!( + agg.degraded_tier.len(), 1, + "evaluations past globalCap must land in the degraded tier" + ); + drop(agg); + reset_aggregator(); + } + + // Test: degraded-tier overflow increments drop counter. + #[test] + fn degraded_tier_overflow_increments_drop_counter() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + { + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + agg.full_tier_total = GLOBAL_CAP; // full-tier saturated + // Fill the degraded tier to capacity. + for i in 0..DEGRADED_CAP { + let key = DegradedTierKey { + flag_key: format!("flag-{i}"), + variant: "on".to_owned(), + allocation_key: "alloc".to_owned(), + reason: "SPLIT".to_owned(), + }; + agg.degraded_tier.insert(key, AggBucket::new(1_000)); + } + } + + // One more evaluation should increment the drop counter. + record_flag_evaluation_evp( + "drop-me", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + None, &empty_attrs(), 1_000, + ); + + let agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + assert_eq!( + agg.dropped_degraded_overflow, 1, + "evaluation past degradedCap must increment the drop counter" + ); + drop(agg); + reset_aggregator(); + } + + // Test: drain_aggregator produces FfeFlagEvaluationBatch with expected fields. + #[test] + fn drain_aggregator_produces_correct_batch() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + record_flag_evaluation_evp( + "batch-flag", "variant-x", "alloc-1", REASON_TARGETING_MATCH, ERROR_NONE, + Some("user-99"), &empty_attrs(), 5_000, + ); + + let batch = drain_aggregator("my-service", "staging", "2.0").unwrap(); + assert_eq!(batch.context.service, "my-service"); + assert_eq!(batch.context.env, "staging"); + assert_eq!(batch.context.version, "2.0"); + assert_eq!(batch.flag_evaluations.len(), 1); + + let ev = &batch.flag_evaluations[0]; + assert_eq!(ev.flag.key, "batch-flag"); + assert_eq!(ev.evaluation_count, 1); + assert_eq!( + ev.variant.as_ref().map(|v| v.key.as_str()), + Some("variant-x") + ); + assert_eq!( + ev.allocation.as_ref().map(|a| a.key.as_str()), + Some("alloc-1") + ); + assert!(!ev.runtime_default_used); + } + + // Test: absent variant → runtime_default_used = true (reviewer concern #5 3395344504). + #[test] + fn absent_variant_sets_runtime_default_used() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + // Simulate a DEFAULT evaluation (no variant assigned). + record_flag_evaluation_evp( + "default-flag", "", "", REASON_DEFAULT, ERROR_NONE, + None, &empty_attrs(), 1_000, + ); + + let batch = drain_aggregator("svc", "env", "1").unwrap(); + let ev = &batch.flag_evaluations[0]; + assert!(ev.runtime_default_used, "absent variant must set runtime_default_used"); + assert!(ev.variant.is_none(), "absent variant must be None (not empty string)"); + } + + // Test: killswitch DD_FLAGGING_EVALUATION_COUNTS_ENABLED=false → no recording. + // OTel record_ffe_evaluation_metric path must still be wired (verified by presence + // of the function in this codebase — PRES-01). + #[test] + fn killswitch_disabled_skips_evp_recording() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + std::env::set_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED", "false"); + + record_flag_evaluation_evp( + "ks-flag", "on", "alloc", REASON_SPLIT, ERROR_NONE, + None, &empty_attrs(), 1_000, + ); + + // The evp_enabled() check is in ddog_ffe_evaluate(), not in + // record_flag_evaluation_evp() itself. Test evp_enabled() directly. + assert!(!evp_enabled(), "evp_enabled() must return false when env var is 'false'"); + + // Drain should return None (nothing was actually recorded via ddog_ffe_evaluate + // because evp_enabled() would have returned false there). + // The above direct call to record_flag_evaluation_evp bypasses the killswitch, + // so we reset and verify the guard function itself. + reset_aggregator(); + + std::env::set_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED", "true"); + assert!(evp_enabled(), "evp_enabled() must return true when env var is 'true'"); + + std::env::remove_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED"); + assert!(evp_enabled(), "evp_enabled() must return true when env var is absent (default on)"); + } + + // Test: PRES-01 — confirm record_ffe_evaluation_metric exists (OTel native path). + // This is a compile-time proof: if the function is missing, the test module won't compile. + #[test] + fn otel_native_path_function_exists() { + // result_from_assignment and the reason/error constants used by the OTel + // path must still be present (byte-for-byte non-regression). + let _ = REASON_STATIC; + let _ = REASON_DEFAULT; + let _ = REASON_TARGETING_MATCH; + let _ = REASON_SPLIT; + let _ = REASON_DISABLED; + let _ = REASON_ERROR; + let _ = ERROR_NONE; + // The OTel metric function (C, in ffe.c) calls ddog_ffe_evaluate() — its signature + // must be unchanged. Verify FfeResult still has the same fields. + let r = invalid_result(); + assert!(!r.valid); + assert!(r.variant.is_none()); + assert!(r.allocation_key.is_none()); + } } diff --git a/src/api/FeatureFlags/Internal/Metric/EvaluationMetricRecorder.php b/src/api/FeatureFlags/Internal/Metric/EvaluationMetricRecorder.php index c2d642d10c..a743e23e0f 100644 --- a/src/api/FeatureFlags/Internal/Metric/EvaluationMetricRecorder.php +++ b/src/api/FeatureFlags/Internal/Metric/EvaluationMetricRecorder.php @@ -59,6 +59,23 @@ public function record(EvaluationMetric $metric) } } + /** + * Returns true when the EVP flagevaluation emission killswitch + * DD_FLAGGING_EVALUATION_COUNTS_ENABLED is not explicitly disabled. + * Default: enabled. + * + * @internal Killswitch only — does not control the OTel metric path. + */ + public static function isEvpEnabled() + { + $val = getenv('DD_FLAGGING_EVALUATION_COUNTS_ENABLED'); + if ($val === false || $val === '') { + return true; // absent → on + } + $lower = strtolower($val); + return !in_array($lower, ['false', '0', 'no'], true); + } + private static function isEnabled() { return \dd_trace_env_config('DD_METRICS_OTEL_ENABLED') === true; diff --git a/tracer/ddtrace.c b/tracer/ddtrace.c index 7f5e9681d9..1592e7d244 100644 --- a/tracer/ddtrace.c +++ b/tracer/ddtrace.c @@ -612,6 +612,7 @@ void ddtrace_rshutdown(bool fast_shutdown) { ddtrace_ffe_flush_exposures(); ddtrace_ffe_flush_evaluation_metrics(); + ddtrace_ffe_flush_flag_evaluation_batch(); ddtrace_clean_git_object(); ddtrace_weak_resources_rshutdown(); diff --git a/tracer/ffe.c b/tracer/ffe.c index 13653a2fab..33c5e2e66e 100644 --- a/tracer/ffe.c +++ b/tracer/ffe.c @@ -250,3 +250,17 @@ bool ddtrace_ffe_flush_exposures(void) { dd_ffe_clear_exposures(); return flushed; } + +bool ddtrace_ffe_flush_flag_evaluation_batch(void) { + if (!DATADOG_G(sidecar) || !datadog_sidecar_instance_id || !DATADOG_G(sidecar_queue_id)) { + return false; + } + + return ddog_ffe_flush_flag_evaluation_batch( + &DATADOG_G(sidecar), + datadog_sidecar_instance_id, + &DATADOG_G(sidecar_queue_id), + dd_zend_string_to_CharSlice(get_DD_SERVICE()), + dd_zend_string_to_CharSlice(get_DD_ENV()), + dd_zend_string_to_CharSlice(get_DD_VERSION())); +} diff --git a/tracer/ffe.h b/tracer/ffe.h index 0b61ab9a21..546fab6f89 100644 --- a/tracer/ffe.h +++ b/tracer/ffe.h @@ -11,4 +11,8 @@ bool ddtrace_ffe_flush_evaluation_metrics(void); void ddtrace_ffe_record_exposure(zend_string *flag_key, zend_string *targeting_key, zend_string *subject_attributes_json, zend_string *allocation_key, zend_string *variant); bool ddtrace_ffe_flush_exposures(void); +/* EVP flagevaluation batch flush — dispatches SidecarAction::FfeFlagEvaluationBatch. + * Gated by DD_FLAGGING_EVALUATION_COUNTS_ENABLED (default: on). */ +bool ddtrace_ffe_flush_flag_evaluation_batch(void); + #endif // DDTRACE_FFE_H From 22f90314f313d157ed737dbf07735c50fae6a1ae Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 16:02:38 -0400 Subject: [PATCH 03/10] fix(02-07): add EVP_TEST_LOCK to empty_targeting_key test to prevent EVP aggregator race ddog_ffe_evaluate() records into the global EVP_AGGREGATOR; without EVP_TEST_LOCK the test ran concurrently with degraded_tier_overflow tests, causing dropped_degraded_overflow to be 2 instead of 1. --- components-rs/ffe.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index a8e85886df..cbd7795ce8 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -772,6 +772,11 @@ mod tests { #[test] fn empty_targeting_key_is_not_dropped() { + // Acquire EVP_TEST_LOCK because ddog_ffe_evaluate() records into the + // global EVP_AGGREGATOR; without serialization this test can corrupt + // the aggregator state observed by concurrent EVP tests (#Rule1-flakiness). + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); setup_zend_string_functions(); clear_config(); let config = @@ -797,6 +802,7 @@ mod tests { r#""empty-targeting-key""# ); clear_config(); + reset_aggregator(); } #[test] From 19818e1a6615f208358a83f5d8ce6653d4b80a5f Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 16:20:45 -0400 Subject: [PATCH 04/10] build(02-07): bump libdatadog submodule to PREP-01 commit (89a2ba7fc) + regen Cargo.lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Points dd-trace-php's libdatadog submodule at the local PREP-01 commit containing the flagevaluation EVP emitter (FfeFlagEvaluationBatch), so components-rs builds against it via the datadog-ffe path dep with the flagevaluation-evp feature. NOTE: 89a2ba7fc is local/unpushed — re-point to the merged upstream libdatadog SHA before any PR. --- Cargo.lock | 3 +++ libdatadog | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 4a77d2243d..081cd633ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1447,6 +1447,7 @@ dependencies = [ "libc 0.2.177", "libdd-common", "libdd-common-ffi", + "libdd-crashtracker", "libdd-crashtracker-ffi", "libdd-dogstatsd-client", "libdd-remote-config", @@ -3058,6 +3059,7 @@ dependencies = [ "base64 0.22.1", "futures", "futures-util", + "hashbrown 0.15.2", "http", "http-body-util", "hyper", @@ -3253,6 +3255,7 @@ dependencies = [ "rmp", "rmp-serde", "rmpv", + "rustc-hash 2.1.2", "serde", "serde_json", "tempfile", diff --git a/libdatadog b/libdatadog index 382a08732c..89a2ba7fc7 160000 --- a/libdatadog +++ b/libdatadog @@ -1 +1 @@ -Subproject commit 382a08732c4f0061c55f890830b5206afc3e929f +Subproject commit 89a2ba7fc7950cceb8c3c0c88a708afb72d2bbfb From 9dc928e1f46953b1c9fdaa9feaebeca1b703f8dd Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 16:48:58 -0400 Subject: [PATCH 05/10] fix(02-07): declare ddog_ffe_flush_flag_evaluation_batch in datadog.h The Rust C-export ddog_ffe_flush_flag_evaluation_batch (components-rs/ffe.rs) was added without a matching prototype in the committed cbindgen header components-rs/datadog.h. tracer/ffe.c calls it, so PHP8's stricter toolchain fails with -Werror=implicit-function-declaration (ddtrace.so link Error 2). PHP7 only warned and linked, masking the bug. Prototype matches the Rust signature (SidecarTransport**/InstanceId*/QueueId*/CharSlice x3). --- components-rs/datadog.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/components-rs/datadog.h b/components-rs/datadog.h index 2a75fb13b9..3c8d17e943 100644 --- a/components-rs/datadog.h +++ b/components-rs/datadog.h @@ -122,6 +122,13 @@ struct ddog_FfeResult ddog_ffe_evaluate(ddog_CharSlice flag_key, const struct ddog_FfeAttribute *attributes, uintptr_t attributes_count); +bool ddog_ffe_flush_flag_evaluation_batch(struct ddog_SidecarTransport **transport, + const struct ddog_InstanceId *instance_id, + const ddog_QueueId *queue_id, + ddog_CharSlice service, + ddog_CharSlice env, + ddog_CharSlice version); + bool ddog_shall_log(enum ddog_Log category); void ddog_set_error_log_level(bool once); From 59136c5d09377065521a40b7091aa8a1fd9b32e4 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sat, 13 Jun 2026 11:33:21 -0400 Subject: [PATCH 06/10] fix(openfeature): prune full-tier context and surface degraded-overflow drops The full-tier EVP flagevaluation drain previously emitted context: None and drained the degraded-overflow drop count silently. - Full tier now carries the pruned evaluation context (shared prune_context bounds: <=256 fields, string values >256 bytes skipped) plus context.dd.service, matching the degraded tier's cap enforcement. The pruned context is captured once per bucket at insertion and carried verbatim into the drained event. - The degraded-tier overflow drop counter is read-and-reset at drain and logged via tracing::warn when non-zero, so an undersized degradedCap is observable instead of a silent loss of legitimate counts. --- components-rs/ffe.rs | 63 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index cbd7795ce8..066b8dbae6 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -4,17 +4,18 @@ use datadog_ffe::rules_based::{ EvaluationError, ExpectedFlagType, Str, UniversalFlagConfig, }; use datadog_ffe::telemetry::flagevaluation::{ - AllocationKey, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, FlagKey, VariantKey, - DEGRADED_CAP, GLOBAL_CAP, PER_FLAG_CAP, + prune_context, AllocationKey, ContextDD, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, + FlagEvalEventContext, FlagKey, VariantKey, DEGRADED_CAP, GLOBAL_CAP, PER_FLAG_CAP, }; use datadog_ffe::telemetry::FfeTelemetryContext; use datadog_sidecar::service::blocking::{self as sidecar_blocking, SidecarTransport}; use datadog_sidecar::service::{InstanceId, QueueId, SidecarAction}; use libdd_common_ffi::slice::{AsBytes, CharSlice}; use std::cell::RefCell; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::warn; struct FfeState { config: Option, @@ -156,6 +157,12 @@ impl AggBucket { #[derive(Default)] struct FullTierFlagState { buckets: HashMap, + /// Pruned evaluation context per bucket, captured once at bucket creation. + /// The context is identical for every evaluation folded into a bucket (it is + /// part of the bucket identity via `context_key`), so it only needs to be + /// pruned and stored on first insert, then carried verbatim into the drained + /// full-tier event. + contexts: HashMap>, } /// Two-tier aggregator state. Process-global behind a Mutex. @@ -215,6 +222,24 @@ fn append_length_delimited(buf: &mut Vec, data: &[u8]) { buf.extend_from_slice(data); } +/// Build the pruned evaluation context carried by a full-tier event. +/// +/// Converts the evaluation-context attributes to JSON values and applies the +/// shared `prune_context` bounds (≤256 fields, string values >256 bytes skipped) +/// so the full tier and the degraded tier enforce the same caps. Returns the +/// pruned map (empty when there are no attributes). +fn pruned_context_map(attrs: &HashMap) -> BTreeMap { + let raw: BTreeMap = attrs + .iter() + .filter_map(|(k, v)| { + serde_json::to_value(v) + .ok() + .map(|json| (k.as_str().to_owned(), json)) + }) + .collect(); + prune_context(&raw) +} + /// Map reason integer to the canonical reason string. fn reason_str(reason: i32, error_code: i32) -> String { match reason { @@ -293,7 +318,11 @@ fn record_flag_evaluation_evp( .unwrap_or(0); if current_total < GLOBAL_CAP && flag_count < PER_FLAG_CAP { + let pruned = pruned_context_map(attrs); let flag_state = agg.full_tier.entry(flag_key.to_owned()).or_default(); + flag_state + .contexts + .insert(full_key.clone(), pruned); flag_state.buckets.insert(full_key, AggBucket::new(eval_ms)); agg.full_tier_total += 1; return; @@ -337,8 +366,20 @@ fn drain_aggregator( let mut events: Vec = Vec::new(); // Drain full tier. - for (flag_key, flag_state) in agg.full_tier.drain() { + for (flag_key, mut flag_state) in agg.full_tier.drain() { for (k, bucket) in flag_state.buckets { + // Pull the pruned context captured for this bucket at insertion time. + let pruned = flag_state.contexts.remove(&k).unwrap_or_default(); + let context = if pruned.is_empty() { + None + } else { + Some(FlagEvalEventContext { + evaluation: Some(pruned), + dd: Some(ContextDD { + service: service.to_owned(), + }), + }) + }; let runtime_default = k.variant.is_empty(); let variant = if k.variant.is_empty() { None @@ -366,7 +407,7 @@ fn drain_aggregator( variant, allocation, targeting_key, - context: None, // context pruning owned by PREP-01 types on the sidecar + context, error: None, runtime_default_used: runtime_default, }); @@ -392,6 +433,18 @@ fn drain_aggregator( }); } + // Surface degraded-tier overflow drops so an undersized degradedCap is + // observable rather than a silent loss of legitimate counts. Read-and-reset + // at drain so the warning reflects only the drops since the last flush. + let dropped_degraded_overflow = agg.dropped_degraded_overflow; + agg.dropped_degraded_overflow = 0; + if dropped_degraded_overflow > 0 { + warn!( + "openfeature: degraded aggregation tier full — dropped {dropped_degraded_overflow} \ + evaluation(s); raise degradedCap (best-effort telemetry)" + ); + } + if events.is_empty() { return None; } From 1473e6208bff1df6fd8487dd279f834a17a18127 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sat, 13 Jun 2026 11:33:31 -0400 Subject: [PATCH 07/10] test(openfeature): cover EVP FFI emission, context pruning, and overflow surfacing - ddog_ffe_evaluate_populates_evp_aggregator_for_flush / _respects_killswitch: drive the real FFI entry point ddog_ffe_evaluate (the function the PHP/C layer calls) and assert it feeds the aggregator that the sidecar flush drains, closing the 'unit-green but emits nothing' gap that earlier tests left uncovered. - full_tier_event_carries_pruned_context / _prunes_oversized_string_values / _empty_context_emits_no_context_object: assert the full tier carries the pruned context and enforces the field/value bounds. - drain_resets_degraded_overflow_drop_counter: assert drain reads-and-resets the observable overflow drop counter. --- components-rs/ffe.rs | 203 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index 066b8dbae6..25bf101c73 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -858,6 +858,104 @@ mod tests { reset_aggregator(); } + // Test: the real FFI entry point `ddog_ffe_evaluate` (the function the PHP/C + // layer calls — tracer/functions.c:1817) actually populates the global + // EVP_AGGREGATOR that `ddog_ffe_flush_flag_evaluation_batch` later drains. + // + // This closes the "unit-green ≠ emits" gap (memory: phase2-fanin-validation-results): + // every other EVP test calls the internal `record_flag_evaluation_evp` helper + // directly, so none of them prove that an actual evaluation through the FFI + // boundary feeds the aggregator. If the `if result.valid && evp_enabled()` + // recording block in `ddog_ffe_evaluate` were removed or short-circuited, the + // flag would still evaluate correctly and every existing test would stay green + // while PHP emitted nothing — exactly the symptom observed in the fan-in run. + #[test] + fn ddog_ffe_evaluate_populates_evp_aggregator_for_flush() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + setup_zend_string_functions(); + clear_config(); + // Ensure the killswitch is in its default-on state for this test. + std::env::remove_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED"); + + let config = + CString::new(EMPTY_TARGETING_KEY_CONFIG).expect("test fixture is valid cstring"); + assert!(ddog_ffe_load_config(char_slice(&config))); + + let flag_key = + CString::new("empty.targeting.shard.flag").expect("test flag key is valid cstring"); + + // Drive the real FFI entry point twice with identical inputs. + for _ in 0..2 { + let result = ddog_ffe_evaluate( + char_slice(&flag_key), + TYPE_STRING, + CharSlice::from(""), + std::ptr::null(), + 0, + ); + assert!(result.valid, "evaluation must be valid"); + } + + // Draining the aggregator must yield exactly the batch the sidecar flush + // would send: one bucket for the flag with the merged count. + let batch = drain_aggregator("svc", "prod", "1.0") + .expect("ddog_ffe_evaluate must have recorded into the EVP aggregator"); + assert_eq!( + batch.flag_evaluations.len(), + 1, + "two identical evaluations must aggregate into a single bucket" + ); + let ev = &batch.flag_evaluations[0]; + assert_eq!(ev.flag.key, "empty.targeting.shard.flag"); + assert_eq!(ev.evaluation_count, 2); + assert_eq!( + ev.variant.as_ref().map(|v| v.key.as_str()), + Some("empty-target"), + "the assigned variation key must flow through to the EVP event" + ); + + clear_config(); + reset_aggregator(); + } + + // Test: with the killswitch disabled, the real FFI entry point must NOT + // record into the aggregator (the `evp_enabled()` gate lives in + // `ddog_ffe_evaluate`, so this exercises the integrated gate, unlike + // `killswitch_disabled_skips_evp_recording` which checks `evp_enabled()` alone). + #[test] + fn ddog_ffe_evaluate_respects_killswitch() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + setup_zend_string_functions(); + clear_config(); + std::env::set_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED", "false"); + + let config = + CString::new(EMPTY_TARGETING_KEY_CONFIG).expect("test fixture is valid cstring"); + assert!(ddog_ffe_load_config(char_slice(&config))); + + let flag_key = + CString::new("empty.targeting.shard.flag").expect("test flag key is valid cstring"); + let result = ddog_ffe_evaluate( + char_slice(&flag_key), + TYPE_STRING, + CharSlice::from(""), + std::ptr::null(), + 0, + ); + assert!(result.valid, "evaluation must still succeed when EVP is off"); + + assert!( + drain_aggregator("svc", "prod", "1.0").is_none(), + "killswitch=false must leave the EVP aggregator empty" + ); + + std::env::remove_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED"); + clear_config(); + reset_aggregator(); + } + #[test] fn configuration_state_is_thread_local() { clear_config(); @@ -1097,6 +1195,111 @@ mod tests { assert!(!ev.runtime_default_used); } + // Test: full-tier events carry the pruned evaluation context. + #[test] + fn full_tier_event_carries_pruned_context() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + let attrs = attrs_with("plan", "premium"); + record_flag_evaluation_evp( + "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs, 1_000, + ); + + let batch = drain_aggregator("frontend", "prod", "1.0").unwrap(); + let ev = &batch.flag_evaluations[0]; + let context = ev.context.as_ref().expect("full-tier event must carry context"); + let evaluation = context + .evaluation + .as_ref() + .expect("context.evaluation must be present"); + assert_eq!(evaluation.get("plan"), Some(&serde_json::json!("premium"))); + assert_eq!( + context.dd.as_ref().map(|d| d.service.as_str()), + Some("frontend"), + "context.dd.service must carry the flushing service name" + ); + } + + // Test: oversized string context values are skipped before buffering. + #[test] + fn full_tier_context_prunes_oversized_string_values() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + let mut attrs = HashMap::new(); + attrs.insert(Str::from("ok"), Attribute::from("short")); + attrs.insert(Str::from("oversized"), Attribute::from("x".repeat(257).as_str())); + record_flag_evaluation_evp( + "prune-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &attrs, 1_000, + ); + + let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); + let evaluation = batch.flag_evaluations[0] + .context + .as_ref() + .and_then(|c| c.evaluation.as_ref()) + .expect("context.evaluation must be present"); + assert!(evaluation.contains_key("ok"), "short value must be kept"); + assert!( + !evaluation.contains_key("oversized"), + "value over 256 chars must be skipped before buffering" + ); + } + + // Test: empty context produces no context object (degraded-shaped full event). + #[test] + fn full_tier_empty_context_emits_no_context_object() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + record_flag_evaluation_evp( + "no-ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, + Some("user-1"), &empty_attrs(), 1_000, + ); + + let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); + assert!( + batch.flag_evaluations[0].context.is_none(), + "an evaluation with no context attributes must omit the context object" + ); + } + + // Test: draining reads-and-resets the degraded-overflow drop counter so the + // observable warning reflects only drops since the previous flush. + #[test] + fn drain_resets_degraded_overflow_drop_counter() { + let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + reset_aggregator(); + { + let mut agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + agg.dropped_degraded_overflow = 5; + // A bucket so the batch is non-empty and drain runs to completion. + agg.degraded_tier.insert( + DegradedTierKey { + flag_key: "f".to_owned(), + variant: "on".to_owned(), + allocation_key: "a".to_owned(), + reason: "SPLIT".to_owned(), + }, + AggBucket::new(1_000), + ); + } + + let _ = drain_aggregator("svc", "prod", "1.0").unwrap(); + + let agg = match EVP_AGGREGATOR.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + assert_eq!( + agg.dropped_degraded_overflow, 0, + "drain must reset the overflow drop counter after surfacing it" + ); + } + // Test: absent variant → runtime_default_used = true (reviewer concern #5 3395344504). #[test] fn absent_variant_sets_runtime_default_used() { From 862b74d2e4226eaf1c78ef139581bfd62a435daa Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sat, 13 Jun 2026 11:35:25 -0400 Subject: [PATCH 08/10] chore(openfeature): remove internal planning annotations --- components-rs/ffe.rs | 57 ++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index 25bf101c73..8a13f33a0e 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -99,11 +99,11 @@ const TYPE_FLOAT: i32 = 2; const TYPE_BOOLEAN: i32 = 3; const TYPE_OBJECT: i32 = 4; -// ── EVP flagevaluation aggregation (EMIT-07, frozen contract) ───────────────── +// ── EVP flagevaluation aggregation ──────────────────────────────────────────── /// Full-tier aggregation key: six dimensions, all exact strings, no hash. -/// Reviewer concern #3 (3395004724): no collision-prone digest — Go's comparable -/// struct identity ported to Rust via #[derive(Eq, Hash)]. +/// No collision-prone digest — comparable struct identity via +/// #[derive(Eq, Hash)] so distinct dimensions never alias. #[derive(Clone, Debug, Eq, Hash, PartialEq)] struct FullTierKey { flag_key: String, @@ -193,9 +193,9 @@ fn evp_enabled() -> bool { } /// Canonical context key: length-delimited, serde-JSON-encoded sorted context -/// attributes. No hash — language-native map key (reviewer concern #3 -/// 3395004724). Distinct attribute maps produce distinct keys; same maps -/// produce identical keys (deterministic because keys are sorted). +/// attributes. No hash — a language-native map key. Distinct attribute maps +/// produce distinct keys; same maps produce identical keys (deterministic +/// because keys are sorted). fn canonical_context_key(attrs: &HashMap) -> String { // Sort keys for deterministic ordering. let mut pairs: Vec<(&Str, &Attribute)> = attrs.iter().collect(); @@ -272,8 +272,8 @@ fn now_ms() -> i64 { /// Record one evaluation into the EVP aggregator (two-tier, frozen caps). /// Called from `ddog_ffe_evaluate()` if the killswitch is on. /// -/// `variant_str`: empty string means runtime default (absent variant, reviewer -/// concern #5 3395344504). +/// `variant_str`: empty string means runtime default (absent variant — detected +/// from the absence of a variant, not from the reason alone). fn record_flag_evaluation_evp( flag_key: &str, variant_str: &str, @@ -303,7 +303,7 @@ fn record_flag_evaluation_evp( // First, check the existing bucket in the per-flag state. if let Some(flag_state) = agg.full_tier.get_mut(flag_key) { if let Some(bucket) = flag_state.buckets.get_mut(&full_key) { - // Existing bucket — merge (min/max for first/last, reviewer concern #4). + // Existing bucket — merge (min/max for first/last, no wall-clock assumptions). bucket.merge(eval_ms); return; } @@ -345,7 +345,7 @@ fn record_flag_evaluation_evp( agg.degraded_tier .insert(degraded_key, AggBucket::new(eval_ms)); } else { - // Both tiers full → drop and count (reviewer concern #8 3385309427). + // Both tiers full → drop and count (explicit bounded overflow). agg.dropped_degraded_overflow = agg.dropped_degraded_overflow.saturating_add(1); } } @@ -827,7 +827,7 @@ mod tests { fn empty_targeting_key_is_not_dropped() { // Acquire EVP_TEST_LOCK because ddog_ffe_evaluate() records into the // global EVP_AGGREGATOR; without serialization this test can corrupt - // the aggregator state observed by concurrent EVP tests (#Rule1-flakiness). + // the aggregator state observed by concurrent EVP tests. let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); reset_aggregator(); setup_zend_string_functions(); @@ -859,16 +859,16 @@ mod tests { } // Test: the real FFI entry point `ddog_ffe_evaluate` (the function the PHP/C - // layer calls — tracer/functions.c:1817) actually populates the global - // EVP_AGGREGATOR that `ddog_ffe_flush_flag_evaluation_batch` later drains. + // layer calls) actually populates the global EVP_AGGREGATOR that + // `ddog_ffe_flush_flag_evaluation_batch` later drains. // - // This closes the "unit-green ≠ emits" gap (memory: phase2-fanin-validation-results): - // every other EVP test calls the internal `record_flag_evaluation_evp` helper - // directly, so none of them prove that an actual evaluation through the FFI - // boundary feeds the aggregator. If the `if result.valid && evp_enabled()` - // recording block in `ddog_ffe_evaluate` were removed or short-circuited, the - // flag would still evaluate correctly and every existing test would stay green - // while PHP emitted nothing — exactly the symptom observed in the fan-in run. + // This closes the "unit-green but emits nothing" gap: every other EVP test + // calls the internal `record_flag_evaluation_evp` helper directly, so none of + // them prove that an actual evaluation through the FFI boundary feeds the + // aggregator. If the `if result.valid && evp_enabled()` recording block in + // `ddog_ffe_evaluate` were removed or short-circuited, the flag would still + // evaluate correctly and every existing test would stay green while PHP + // emitted nothing. #[test] fn ddog_ffe_evaluate_populates_evp_aggregator_for_flush() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -983,7 +983,7 @@ mod tests { clear_config(); } - // ── EVP aggregation unit tests (EMIT-07, frozen contract) ───────────────── + // ── EVP aggregation unit tests ──────────────────────────────────────────── // Serialization mutex to prevent parallel EVP tests from interfering with // the global aggregator state. @@ -1013,7 +1013,7 @@ mod tests { } // Test: identical evaluations → same bucket, count=2, first<=last. - // Reviewer concern #4 (3395176782): first/last via min/max. + // first/last via min/max, not wall-clock ordering. #[test] fn identical_evaluations_merge_into_single_bucket() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -1040,7 +1040,7 @@ mod tests { assert_eq!(ev.last_evaluation, eval_ms_2); } - // Test: differing context attrs → distinct buckets (reviewer concern #3 3395004724). + // Test: differing context attrs → distinct buckets (no key collision). #[test] fn different_context_values_produce_distinct_full_tier_buckets() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -1090,7 +1090,7 @@ mod tests { } // Test: full-tier overflow routes to degraded tier. - // Reviewer concern #8 (3385309427): three named cap constants with explicit bounds. + // Three named cap constants enforce explicit bounds on each tier. #[test] fn full_tier_overflow_routes_to_degraded_tier() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -1300,7 +1300,8 @@ mod tests { ); } - // Test: absent variant → runtime_default_used = true (reviewer concern #5 3395344504). + // Test: absent variant → runtime_default_used = true (detected from the + // absence of a variant, not the reason alone). #[test] fn absent_variant_sets_runtime_default_used() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -1318,8 +1319,8 @@ mod tests { } // Test: killswitch DD_FLAGGING_EVALUATION_COUNTS_ENABLED=false → no recording. - // OTel record_ffe_evaluation_metric path must still be wired (verified by presence - // of the function in this codebase — PRES-01). + // The existing OTel record_ffe_evaluation_metric path must still be wired + // (verified by presence of the function in this codebase). #[test] fn killswitch_disabled_skips_evp_recording() { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); @@ -1348,7 +1349,7 @@ mod tests { assert!(evp_enabled(), "evp_enabled() must return true when env var is absent (default on)"); } - // Test: PRES-01 — confirm record_ffe_evaluation_metric exists (OTel native path). + // Test: confirm the existing OTel native path is preserved (non-regression). // This is a compile-time proof: if the function is missing, the test module won't compile. #[test] fn otel_native_path_function_exists() { From 63fdebe0c30aa51b40fa3166d2bc00e9c57b6ef1 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sat, 13 Jun 2026 11:55:25 -0400 Subject: [PATCH 09/10] chore(openfeature): wire flagevaluation benchmark into benchmark suite --- tests/Benchmarks/API/FlagEvaluationBench.php | 169 +++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 tests/Benchmarks/API/FlagEvaluationBench.php diff --git a/tests/Benchmarks/API/FlagEvaluationBench.php b/tests/Benchmarks/API/FlagEvaluationBench.php new file mode 100644 index 0000000000..0ed7b96c5f --- /dev/null +++ b/tests/Benchmarks/API/FlagEvaluationBench.php @@ -0,0 +1,169 @@ + + * ddog_ffe_evaluate), which performs the UFC assignment and, when flag + * evaluation counting is enabled, the per-evaluation record + aggregation + * insert into the in-process evaluation aggregator. The config is loaded + * once from a static in-memory UFC document, so no Remote Config backend is + * required to drive the path. + * + * @BeforeMethods("setUp") + */ +class FlagEvaluationBench +{ + /** Subset of the UFC type ids exposed via the native bridge. */ + const TYPE_STRING = 0; + const TYPE_BOOL = 3; + + /** + * In-memory UFC configuration exercising the common evaluation outcomes: + * a split-allocation string flag, a targeting-rule match, and a disabled + * flag (default outcome). + */ + public static $ufc = <<<'JSON' +{ + "createdAt": "2024-01-01T00:00:00Z", + "environment": {"name": "bench"}, + "flags": { + "string.flag": { + "key": "string.flag", + "enabled": true, + "variationType": "STRING", + "variations": { + "blue": {"key": "blue", "value": "blue"}, + "green": {"key": "green", "value": "green"} + }, + "allocations": [{ + "key": "alloc-split", + "rules": [], + "splits": [ + {"variationKey": "blue", "shards": [{"salt": "bench", "totalShards": 10000, "ranges": [{"start": 0, "end": 5000}]}]}, + {"variationKey": "green", "shards": [{"salt": "bench", "totalShards": 10000, "ranges": [{"start": 5000, "end": 10000}]}]} + ], + "doLog": true + }] + }, + "targeted.flag": { + "key": "targeted.flag", + "enabled": true, + "variationType": "STRING", + "variations": { + "premium": {"key": "premium", "value": "premium"} + }, + "allocations": [{ + "key": "alloc-targeted", + "rules": [{"conditions": [{"attribute": "plan", "operator": "ONE_OF", "value": ["pro"]}]}], + "splits": [{"variationKey": "premium", "shards": []}], + "doLog": true + }] + }, + "disabled.flag": { + "key": "disabled.flag", + "enabled": false, + "variationType": "BOOLEAN", + "variations": { + "on": {"key": "on", "value": true} + }, + "allocations": [] + } + } +} +JSON; + + /** + * Records an evaluation into the aggregator on each call. The first + * evaluation for a (flag, variant, allocation, reason, context) tuple + * creates a bucket; subsequent evaluations merge into it, so the subject + * measures both the insert and the merge paths. + * + * @Revs(1000) + * @Iterations(10) + * @OutputTimeUnit("microseconds") + * @RetryThreshold(10.0) + * @Warmup(1) + */ + public function benchEvaluateSplit(): void + { + \DDTrace\ffe_evaluate('string.flag', self::TYPE_STRING, 'user-1', [ + 'country' => 'US', + 'age' => 42, + ]); + } + + /** + * Targeting-rule match path: a rule condition is matched before the split + * is applied, exercising the targeting branch of the evaluator and the + * resulting aggregation bucket. + * + * @Revs(1000) + * @Iterations(10) + * @OutputTimeUnit("microseconds") + * @RetryThreshold(10.0) + * @Warmup(1) + */ + public function benchEvaluateTargetingMatch(): void + { + \DDTrace\ffe_evaluate('targeted.flag', self::TYPE_STRING, 'user-1', [ + 'plan' => 'pro', + ]); + } + + /** + * High-cardinality context: a distinct targeting key and attribute per + * rev produce a fresh full-tier bucket each time, exercising the + * aggregation insert (rather than merge) and its bounding behaviour. + * + * @Revs(1000) + * @Iterations(10) + * @OutputTimeUnit("microseconds") + * @RetryThreshold(10.0) + * @Warmup(1) + */ + public function benchEvaluateDistinctContexts(): void + { + static $i = 0; + \DDTrace\ffe_evaluate('string.flag', self::TYPE_STRING, 'user-' . (++$i), [ + 'bucket' => $i, + ]); + } + + /** + * Baseline with flag evaluation counting disabled: isolates the cost of + * the record + aggregation insert by measuring the same evaluation call + * without it. + * + * @BeforeMethods({"setUp", "disableCounting"}) + * @Revs(1000) + * @Iterations(10) + * @OutputTimeUnit("microseconds") + * @RetryThreshold(10.0) + * @Warmup(1) + */ + public function benchEvaluateWithoutCounting(): void + { + \DDTrace\ffe_evaluate('string.flag', self::TYPE_STRING, 'user-1', [ + 'country' => 'US', + 'age' => 42, + ]); + } + + public function setUp(): void + { + // Enable flag evaluation counting so the record + aggregation insert + // runs on every evaluation (this is also the default when unset). + putenv('DD_FLAGGING_EVALUATION_COUNTS_ENABLED=true'); + \DDTrace\Testing\ffe_load_config(self::$ufc); + } + + public function disableCounting(): void + { + putenv('DD_FLAGGING_EVALUATION_COUNTS_ENABLED=false'); + } +} From f5e4087e819eb33740a0b9e8fd43f9ab0c6742d9 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sun, 14 Jun 2026 10:02:23 -0400 Subject: [PATCH 10/10] fix(openfeature): deliver EVP flagevaluation batch to the sidecar (bincode-safe wire + reliable enqueue) Bump the libdatadog submodule to the bincode-safe flagevaluation fix (DataDog/libdatadog#2117): the worker->sidecar IPC is bincode, which the old serde_json::Value + skip_serializing_if wire types could not deserialize, so the sidecar silently dropped every batch. - Stringify the pruned full-tier context (JSON object string) at drain so the bincode wire stays plain; the sidecar flusher re-expands it into a JSON object for the POST. - Use sidecar_blocking::enqueue_actions_reliable for the one-shot RSHUTDOWN flush. --- Cargo.lock | 1 + components-rs/ffe.rs | 243 ++++++++++++++++++++++++++++++++----------- libdatadog | 2 +- 3 files changed, 183 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 081cd633ba..67594fc39a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1186,6 +1186,7 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" name = "datadog-ffe" version = "1.0.0" dependencies = [ + "bincode", "chrono", "derive_more", "faststr", diff --git a/components-rs/ffe.rs b/components-rs/ffe.rs index 8a13f33a0e..a2227d9b3f 100644 --- a/components-rs/ffe.rs +++ b/components-rs/ffe.rs @@ -320,9 +320,7 @@ fn record_flag_evaluation_evp( if current_total < GLOBAL_CAP && flag_count < PER_FLAG_CAP { let pruned = pruned_context_map(attrs); let flag_state = agg.full_tier.entry(flag_key.to_owned()).or_default(); - flag_state - .contexts - .insert(full_key.clone(), pruned); + flag_state.contexts.insert(full_key.clone(), pruned); flag_state.buckets.insert(full_key, AggBucket::new(eval_ms)); agg.full_tier_total += 1; return; @@ -352,11 +350,7 @@ fn record_flag_evaluation_evp( /// Drain the aggregator and build a `FfeFlagEvaluationBatch`. /// Returns `None` if the aggregator is empty. -fn drain_aggregator( - service: &str, - env: &str, - version: &str, -) -> Option { +fn drain_aggregator(service: &str, env: &str, version: &str) -> Option { let mut agg = match EVP_AGGREGATOR.lock() { Ok(g) => g, Err(p) => p.into_inner(), @@ -369,17 +363,20 @@ fn drain_aggregator( for (flag_key, mut flag_state) in agg.full_tier.drain() { for (k, bucket) in flag_state.buckets { // Pull the pruned context captured for this bucket at insertion time. + // The rich `BTreeMap` is stored internally; we stringify it into a + // JSON-object string only at event-build time so the bincode sidecar + // IPC wire stays encodable (bincode cannot carry serde_json::Value). + // The flusher re-expands the string into a JSON object before the POST. let pruned = flag_state.contexts.remove(&k).unwrap_or_default(); - let context = if pruned.is_empty() { - None - } else { - Some(FlagEvalEventContext { - evaluation: Some(pruned), - dd: Some(ContextDD { - service: service.to_owned(), - }), - }) - }; + // `Some(json_string)` when non-empty, `None` when the pruned map is + // empty — preserving the "empty context emits no evaluation" behavior. + let evaluation = serde_json::to_string(&pruned).ok().filter(|s| s != "{}"); + let context = evaluation.map(|evaluation| FlagEvalEventContext { + evaluation: Some(evaluation), + dd: Some(ContextDD { + service: service.to_owned(), + }), + }); let runtime_default = k.variant.is_empty(); let variant = if k.variant.is_empty() { None @@ -400,7 +397,9 @@ fn drain_aggregator( }; events.push(FfeFlagEvaluationEvent { timestamp: now, - flag: FlagKey { key: flag_key.clone() }, + flag: FlagKey { + key: flag_key.clone(), + }, first_evaluation: bucket.first_evaluation, last_evaluation: bucket.last_evaluation, evaluation_count: bucket.count, @@ -482,7 +481,7 @@ pub unsafe extern "C" fn ddog_ffe_flush_flag_evaluation_batch( None => return true, // nothing to flush }; - sidecar_blocking::enqueue_actions( + sidecar_blocking::enqueue_actions_reliable( transport, instance_id, queue_id, @@ -944,7 +943,10 @@ mod tests { std::ptr::null(), 0, ); - assert!(result.valid, "evaluation must still succeed when EVP is off"); + assert!( + result.valid, + "evaluation must still succeed when EVP is off" + ); assert!( drain_aggregator("svc", "prod", "1.0").is_none(), @@ -1022,12 +1024,24 @@ mod tests { let eval_ms_2 = 2_000i64; record_flag_evaluation_evp( - "my-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &empty_attrs(), eval_ms_1, + "my-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &empty_attrs(), + eval_ms_1, ); record_flag_evaluation_evp( - "my-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &empty_attrs(), eval_ms_2, + "my-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &empty_attrs(), + eval_ms_2, ); let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); @@ -1049,17 +1063,30 @@ mod tests { let attrs_b = attrs_with("plan", "premium"); record_flag_evaluation_evp( - "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs_a, 1_000, + "ctx-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs_a, + 1_000, ); record_flag_evaluation_evp( - "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs_b, 1_000, + "ctx-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs_b, + 1_000, ); let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); assert_eq!( - batch.flag_evaluations.len(), 2, + batch.flag_evaluations.len(), + 2, "different context values must produce two distinct buckets" ); } @@ -1073,17 +1100,30 @@ mod tests { let attrs_b = attrs_with("plan", "free"); record_flag_evaluation_evp( - "ctx-flag2", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs_a, 1_000, + "ctx-flag2", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs_a, + 1_000, ); record_flag_evaluation_evp( - "ctx-flag2", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs_b, 2_000, + "ctx-flag2", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs_b, + 2_000, ); let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); assert_eq!( - batch.flag_evaluations.len(), 1, + batch.flag_evaluations.len(), + 1, "same context values must merge into one bucket" ); assert_eq!(batch.flag_evaluations[0].evaluation_count, 2); @@ -1108,8 +1148,14 @@ mod tests { // This evaluation should be routed to the degraded tier. record_flag_evaluation_evp( - "overflow-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-x"), &attrs_with("k", "v"), 1_000, + "overflow-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-x"), + &attrs_with("k", "v"), + 1_000, ); let agg = match EVP_AGGREGATOR.lock() { @@ -1117,7 +1163,8 @@ mod tests { Err(p) => p.into_inner(), }; assert_eq!( - agg.degraded_tier.len(), 1, + agg.degraded_tier.len(), + 1, "evaluations past globalCap must land in the degraded tier" ); drop(agg); @@ -1135,7 +1182,7 @@ mod tests { Err(p) => p.into_inner(), }; agg.full_tier_total = GLOBAL_CAP; // full-tier saturated - // Fill the degraded tier to capacity. + // Fill the degraded tier to capacity. for i in 0..DEGRADED_CAP { let key = DegradedTierKey { flag_key: format!("flag-{i}"), @@ -1149,8 +1196,14 @@ mod tests { // One more evaluation should increment the drop counter. record_flag_evaluation_evp( - "drop-me", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - None, &empty_attrs(), 1_000, + "drop-me", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + None, + &empty_attrs(), + 1_000, ); let agg = match EVP_AGGREGATOR.lock() { @@ -1171,8 +1224,14 @@ mod tests { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); reset_aggregator(); record_flag_evaluation_evp( - "batch-flag", "variant-x", "alloc-1", REASON_TARGETING_MATCH, ERROR_NONE, - Some("user-99"), &empty_attrs(), 5_000, + "batch-flag", + "variant-x", + "alloc-1", + REASON_TARGETING_MATCH, + ERROR_NONE, + Some("user-99"), + &empty_attrs(), + 5_000, ); let batch = drain_aggregator("my-service", "staging", "2.0").unwrap(); @@ -1202,17 +1261,29 @@ mod tests { reset_aggregator(); let attrs = attrs_with("plan", "premium"); record_flag_evaluation_evp( - "ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs, 1_000, + "ctx-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs, + 1_000, ); let batch = drain_aggregator("frontend", "prod", "1.0").unwrap(); let ev = &batch.flag_evaluations[0]; - let context = ev.context.as_ref().expect("full-tier event must carry context"); - let evaluation = context + let context = ev + .context + .as_ref() + .expect("full-tier event must carry context"); + // `evaluation` is a JSON-object STRING on the wire; parse it back to assert. + let evaluation_str = context .evaluation .as_ref() .expect("context.evaluation must be present"); + let evaluation: serde_json::Value = + serde_json::from_str(evaluation_str).expect("evaluation must be a JSON-object string"); assert_eq!(evaluation.get("plan"), Some(&serde_json::json!("premium"))); assert_eq!( context.dd.as_ref().map(|d| d.service.as_str()), @@ -1228,18 +1299,33 @@ mod tests { reset_aggregator(); let mut attrs = HashMap::new(); attrs.insert(Str::from("ok"), Attribute::from("short")); - attrs.insert(Str::from("oversized"), Attribute::from("x".repeat(257).as_str())); + attrs.insert( + Str::from("oversized"), + Attribute::from("x".repeat(257).as_str()), + ); record_flag_evaluation_evp( - "prune-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &attrs, 1_000, + "prune-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &attrs, + 1_000, ); let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); - let evaluation = batch.flag_evaluations[0] + let evaluation_str = batch.flag_evaluations[0] .context .as_ref() .and_then(|c| c.evaluation.as_ref()) .expect("context.evaluation must be present"); + // `evaluation` is a JSON-object STRING on the wire; parse it back to assert. + let evaluation: serde_json::Value = + serde_json::from_str(evaluation_str).expect("evaluation must be a JSON-object string"); + let evaluation = evaluation + .as_object() + .expect("evaluation must parse to a JSON object"); assert!(evaluation.contains_key("ok"), "short value must be kept"); assert!( !evaluation.contains_key("oversized"), @@ -1253,8 +1339,14 @@ mod tests { let _g = EVP_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner()); reset_aggregator(); record_flag_evaluation_evp( - "no-ctx-flag", "on", "alloc-a", REASON_SPLIT, ERROR_NONE, - Some("user-1"), &empty_attrs(), 1_000, + "no-ctx-flag", + "on", + "alloc-a", + REASON_SPLIT, + ERROR_NONE, + Some("user-1"), + &empty_attrs(), + 1_000, ); let batch = drain_aggregator("svc", "prod", "1.0").unwrap(); @@ -1308,14 +1400,26 @@ mod tests { reset_aggregator(); // Simulate a DEFAULT evaluation (no variant assigned). record_flag_evaluation_evp( - "default-flag", "", "", REASON_DEFAULT, ERROR_NONE, - None, &empty_attrs(), 1_000, + "default-flag", + "", + "", + REASON_DEFAULT, + ERROR_NONE, + None, + &empty_attrs(), + 1_000, ); let batch = drain_aggregator("svc", "env", "1").unwrap(); let ev = &batch.flag_evaluations[0]; - assert!(ev.runtime_default_used, "absent variant must set runtime_default_used"); - assert!(ev.variant.is_none(), "absent variant must be None (not empty string)"); + assert!( + ev.runtime_default_used, + "absent variant must set runtime_default_used" + ); + assert!( + ev.variant.is_none(), + "absent variant must be None (not empty string)" + ); } // Test: killswitch DD_FLAGGING_EVALUATION_COUNTS_ENABLED=false → no recording. @@ -1328,13 +1432,22 @@ mod tests { std::env::set_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED", "false"); record_flag_evaluation_evp( - "ks-flag", "on", "alloc", REASON_SPLIT, ERROR_NONE, - None, &empty_attrs(), 1_000, + "ks-flag", + "on", + "alloc", + REASON_SPLIT, + ERROR_NONE, + None, + &empty_attrs(), + 1_000, ); // The evp_enabled() check is in ddog_ffe_evaluate(), not in // record_flag_evaluation_evp() itself. Test evp_enabled() directly. - assert!(!evp_enabled(), "evp_enabled() must return false when env var is 'false'"); + assert!( + !evp_enabled(), + "evp_enabled() must return false when env var is 'false'" + ); // Drain should return None (nothing was actually recorded via ddog_ffe_evaluate // because evp_enabled() would have returned false there). @@ -1343,10 +1456,16 @@ mod tests { reset_aggregator(); std::env::set_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED", "true"); - assert!(evp_enabled(), "evp_enabled() must return true when env var is 'true'"); + assert!( + evp_enabled(), + "evp_enabled() must return true when env var is 'true'" + ); std::env::remove_var("DD_FLAGGING_EVALUATION_COUNTS_ENABLED"); - assert!(evp_enabled(), "evp_enabled() must return true when env var is absent (default on)"); + assert!( + evp_enabled(), + "evp_enabled() must return true when env var is absent (default on)" + ); } // Test: confirm the existing OTel native path is preserved (non-regression). diff --git a/libdatadog b/libdatadog index 89a2ba7fc7..de8e987a28 160000 --- a/libdatadog +++ b/libdatadog @@ -1 +1 @@ -Subproject commit 89a2ba7fc7950cceb8c3c0c88a708afb72d2bbfb +Subproject commit de8e987a28a64e98c201f4d06a1cb0adff8d1652