Add correlation metadata and multi-collector workflow support across processes
Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115, #116, #117
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.8
Design Goals (from Parent #114)
Scope: Single-system correlation tracking (NOT distributed tracing)
- Performance: Correlation overhead <5% of base local IPC latency
- Cross-Platform: Windows, Linux, macOS, FreeBSD
- Security: Local process tracking only (no network exposure)
- Code Safety: Zero unsafe code in correlation logic
- Dependencies: Only use interprocess, tokio, prost, dashmap
Critical Architectural Context
Cross-process correlation tracking:
- daemoneye-agent (separate process): hosts broker and correlates events from local processes
- Collector processes (procmond, netmond, etc.): separate OS processes on same machine publishing events via IPC
- Correlation metadata tracks events across local process boundaries
- EventBus envelopes carry correlation data through local cross-process IPC
This is for local process coordination, NOT distributed tracing across networks
Overview
This task implements correlation metadata and multi-collector workflow support within daemoneye-eventbus, enabling the agent to track and correlate events across multiple local collector processes for complex detection workflows and forensic investigations.
Correlation Architecture
Correlation Metadata (Carried Across Local IPC)
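/// Correlation metadata for tracking workflows across local processes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationMetadata {
    /// Unique workflow identifier
    pub correlation_id: Uuid,
    /// Parent correlation for hierarchical workflows
    pub parent_correlation_id: Option<Uuid>,
    /// Root workflow identifier
    pub root_correlation_id: Uuid,
    /// Sequence number within workflow
    pub sequence_number: u64,
    /// Current workflow stage
    pub workflow_stage: String,
    /// Originating local collector process
    pub source_process: String,
    /// Flexible tagging for forensic correlation
    pub correlation_tags: HashMap<String, String>,
    /// Timestamp when correlation started
    pub started_at: SystemTime,
}

// The workflow examples use `..Default::default()`. `SystemTime` has no
// `Default` impl, so it cannot be derived and is written by hand here.
impl Default for CorrelationMetadata {
    fn default() -> Self {
        Self {
            correlation_id: Uuid::nil(),
            parent_correlation_id: None,
            root_correlation_id: Uuid::nil(),
            sequence_number: 0,
            workflow_stage: String::new(),
            source_process: String::new(),
            correlation_tags: HashMap::new(),
            started_at: SystemTime::now(),
        }
    }
}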
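The relationship between `correlation_id`, `parent_correlation_id`, and `root_correlation_id` can be sketched with two small constructors. This is an illustration only, not part of the specified API: `new_root` and `child` are hypothetical helper names, `CorrelationId` is a `u128` stand-in for `Uuid` so the sketch compiles with the standard library alone, and incrementing `sequence_number` once per stage is an assumption about how that field is meant to be used.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

/// Stand-in for `uuid::Uuid` (assumption, keeps the sketch std-only).
pub type CorrelationId = u128;

#[derive(Debug, Clone)]
pub struct CorrelationMetadata {
    pub correlation_id: CorrelationId,
    pub parent_correlation_id: Option<CorrelationId>,
    pub root_correlation_id: CorrelationId,
    pub sequence_number: u64,
    pub workflow_stage: String,
    pub source_process: String,
    pub correlation_tags: HashMap<String, String>,
    pub started_at: SystemTime,
}

impl CorrelationMetadata {
    /// Hypothetical helper: the first stage is its own root and has no parent.
    pub fn new_root(id: CorrelationId, stage: &str, source: &str) -> Self {
        Self {
            correlation_id: id,
            parent_correlation_id: None,
            root_correlation_id: id,
            sequence_number: 0,
            workflow_stage: stage.to_string(),
            source_process: source.to_string(),
            correlation_tags: HashMap::new(),
            started_at: SystemTime::now(),
        }
    }

    /// Hypothetical helper: a follow-up stage gets a fresh ID, points at the
    /// current stage as its parent, and inherits the root ID and tags.
    pub fn child(&self, id: CorrelationId, stage: &str) -> Self {
        Self {
            correlation_id: id,
            parent_correlation_id: Some(self.correlation_id),
            root_correlation_id: self.root_correlation_id,
            sequence_number: self.sequence_number + 1,
            workflow_stage: stage.to_string(),
            source_process: self.source_process.clone(),
            correlation_tags: self.correlation_tags.clone(),
            started_at: SystemTime::now(),
        }
    }
}
```

Keeping the root ID constant across stages is what lets a timeline query collect every stage of one workflow with a single comparison.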
EventBus Envelope (Transmitted via Local IPC)
/// Event envelope carrying payload and correlation metadata via local IPC
// `Clone` is required because the tracker stores a copy of each envelope in its history
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEnvelope {
    /// Topic this event was published to
    pub topic: String,
    /// Serialized event payload
    pub payload: Vec<u8>,
    /// Timestamp when published
    pub timestamp: SystemTime,
    /// Local publisher process ID
    pub publisher_process_id: ProcessId,
    /// Correlation metadata (optional)
    pub correlation_metadata: Option<CorrelationMetadata>,
}
Correlation Tracker (In Broker Process)
/// Tracks correlation workflows across local collector processes
pub struct CorrelationTracker {
    // Active workflows indexed by correlation ID
    active_workflows: Arc<DashMap<Uuid, WorkflowState>>,
    // Event history for forensic analysis
    event_history: Arc<RwLock<VecDeque<CorrelatedEvent>>>,
    // Maximum history size
    max_history_size: usize,
}

pub struct WorkflowState {
    correlation_id: Uuid,
    started_at: Instant,
    stages: HashMap<String, StageInfo>,
    participating_processes: HashSet<ProcessId>,
    total_events: usize,
}

pub struct StageInfo {
    stage_name: String,
    started_at: Instant,
    completed_at: Option<Instant>,
    events_count: usize,
}

pub struct CorrelatedEvent {
    correlation_id: Uuid,
    event: EventEnvelope,
    received_at: Instant,
}
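The tracking code later in this issue calls `WorkflowState::new`, `StageInfo::new`, and `is_complete()`, none of which are defined here. A minimal sketch of what they might look like follows. Everything in it is an assumption: `u128` and `u32` stand in for `Uuid` and `ProcessId`, `record_event` and `complete_stage` are hypothetical helpers, and "complete" is taken to mean that at least one stage exists and every stage has a `completed_at` timestamp.

```rust
use std::collections::{HashMap, HashSet};
use std::time::Instant;

pub type CorrelationId = u128; // stand-in for `Uuid` (assumption)
pub type ProcessId = u32; // stand-in for the real process ID type (assumption)

#[derive(Debug)]
pub struct StageInfo {
    pub stage_name: String,
    pub started_at: Instant,
    pub completed_at: Option<Instant>,
    pub events_count: usize,
}

impl StageInfo {
    pub fn new(stage_name: &str) -> Self {
        Self {
            stage_name: stage_name.to_string(),
            started_at: Instant::now(),
            completed_at: None,
            events_count: 0,
        }
    }
}

#[derive(Debug)]
pub struct WorkflowState {
    pub correlation_id: CorrelationId,
    pub started_at: Instant,
    pub stages: HashMap<String, StageInfo>,
    pub participating_processes: HashSet<ProcessId>,
    pub total_events: usize,
}

impl WorkflowState {
    pub fn new(correlation_id: CorrelationId) -> Self {
        Self {
            correlation_id,
            started_at: Instant::now(),
            stages: HashMap::new(),
            participating_processes: HashSet::new(),
            total_events: 0,
        }
    }

    /// Count one event for `stage`, creating the stage on first use.
    pub fn record_event(&mut self, stage: &str, publisher: ProcessId) {
        self.total_events += 1;
        self.participating_processes.insert(publisher);
        self.stages
            .entry(stage.to_string())
            .or_insert_with(|| StageInfo::new(stage))
            .events_count += 1;
    }

    /// Mark a stage finished; returns false if the stage is unknown.
    pub fn complete_stage(&mut self, stage: &str) -> bool {
        match self.stages.get_mut(stage) {
            Some(info) => {
                info.completed_at = Some(Instant::now());
                true
            }
            None => false,
        }
    }

    /// Assumed rule: complete once every known stage has finished.
    pub fn is_complete(&self) -> bool {
        !self.stages.is_empty() && self.stages.values().all(|s| s.completed_at.is_some())
    }
}
```

How a stage actually gets marked complete (an explicit completion message from the collector, an expected event count, or a timeout) is not stated in this issue and would need to be decided before `wait_for_correlation_results` can return.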
Workflow Scenarios
1. Simple Single-Stage Workflow (Across Local Processes)
// Agent initiates workflow
let correlation_id = Uuid::new_v4();
let metadata = CorrelationMetadata {
    correlation_id,
    root_correlation_id: correlation_id,
    workflow_stage: "enumeration".to_string(),
    source_process: "daemoneye-agent".to_string(),
    ..Default::default()
};

// Publish task with correlation to local collectors
broker.publish_with_correlation(
    "control.collector.task.process",
    task,
    metadata.clone()
).await?;

// Collector processes respond with same correlation_id
// Results automatically grouped by correlation tracker
2. Multi-Stage Workflow (Across Local Processes)
// Stage 1: Enumeration (local procmond processes)
let root_id = Uuid::new_v4();
let stage1 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    root_correlation_id: root_id,
    workflow_stage: "enumeration".to_string(),
    ..Default::default()
};
// Copy the ID first: `stage1` is moved into the publish call
let stage1_id = stage1.correlation_id;
broker.publish_with_correlation("control.collector.task.process", enum_task, stage1).await?;

// Wait for stage 1 results from local processes
// (wait_for_correlation_results takes a timeout; 30s is a placeholder value)
let enum_results = broker
    .wait_for_correlation_results(stage1_id, Duration::from_secs(30))
    .await?;

// Stage 2: Integrity checks on enumerated processes (local procmond)
let stage2 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    parent_correlation_id: Some(stage1_id),
    root_correlation_id: root_id,
    workflow_stage: "integrity".to_string(),
    correlation_tags: hashmap! {
        "parent_stage".to_string() => "enumeration".to_string(),
    },
    ..Default::default()
};
// Copy the ID first: `stage2` is moved into the publish call
let stage2_id = stage2.correlation_id;
broker.publish_with_correlation("control.collector.task.process", integrity_task, stage2).await?;

// Stage 3: Network connections from suspicious processes (local netmond)
let stage3 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    parent_correlation_id: Some(stage2_id),
    root_correlation_id: root_id,
    workflow_stage: "network_analysis".to_string(),
    ..Default::default()
};
broker.publish_with_correlation("control.collector.task.network", network_task, stage3).await?;
3. Forensic Correlation (Local Process Investigation)
// Tag events for forensic investigation of local processes
let forensic_metadata = CorrelationMetadata {
    correlation_id: incident_id,
    root_correlation_id: incident_id,
    workflow_stage: "forensic_collection".to_string(),
    correlation_tags: hashmap! {
        "investigation_id".to_string() => "INC-12345".to_string(),
        "suspicious_pid".to_string() => "4892".to_string(),
        "analyst".to_string() => "alice".to_string(),
    },
    ..Default::default()
};

// Collect all events related to PID 4892 from local collectors
broker.publish_with_correlation("control.collector.task.process", forensic_task, forensic_metadata).await?;

// Later: Query all events for this investigation from local event history
let related_events = correlation_tracker
    .find_events_by_tag("investigation_id", "INC-12345")
    .await?;
Implementation Steps
1. Correlation Metadata Propagation
impl EventBusBroker {
    /// Publish event with correlation metadata to local subscribers
    pub async fn publish_with_correlation(
        &self,
        topic: &str,
        payload: Vec<u8>,
        correlation: CorrelationMetadata,
    ) -> Result<()> {
        let envelope = EventEnvelope {
            topic: topic.to_string(),
            payload,
            timestamp: SystemTime::now(),
            publisher_process_id: self.process_id(),
            // `correlation` is owned by this function, so no clone is needed
            correlation_metadata: Some(correlation),
        };

        // Track workflow
        self.correlation_tracker.track_event(&envelope).await?;

        // Route to local subscribers
        self.route_envelope(envelope).await?;

        Ok(())
    }
}
2. Correlation Tracking
impl CorrelationTracker {
    /// Track event in correlation workflow
    pub async fn track_event(&self, envelope: &EventEnvelope) -> Result<()> {
        if let Some(correlation) = &envelope.correlation_metadata {
            // Update workflow state in its own scope so the DashMap guard is
            // dropped before the `.await` on the history lock below
            {
                let mut workflow = self.active_workflows
                    .entry(correlation.correlation_id)
                    .or_insert_with(|| WorkflowState::new(correlation.correlation_id));

                workflow.total_events += 1;
                workflow.participating_processes.insert(envelope.publisher_process_id);

                // Track stage
                let stage = workflow.stages
                    .entry(correlation.workflow_stage.clone())
                    .or_insert_with(|| StageInfo::new(&correlation.workflow_stage));
                stage.events_count += 1;
            }

            // Add to event history
            let mut history = self.event_history.write().await;
            history.push_back(CorrelatedEvent {
                correlation_id: correlation.correlation_id,
                event: envelope.clone(),
                received_at: Instant::now(),
            });

            // Trim history if needed
            while history.len() > self.max_history_size {
                history.pop_front();
            }
        }

        Ok(())
    }

    /// Wait for all results from a correlation workflow
    pub async fn wait_for_correlation_results(
        &self,
        correlation_id: Uuid,
        timeout: Duration,
    ) -> Result<Vec<EventEnvelope>> {
        let deadline = Instant::now() + timeout;

        loop {
            // Check for completion; the DashMap guard is consumed inside `map`,
            // so it is not held across the `.await` points below
            let complete = self.active_workflows
                .get(&correlation_id)
                .map(|workflow| workflow.is_complete())
                .unwrap_or(false);

            if complete {
                // Collect all events for this correlation from local history
                let history = self.event_history.read().await;
                let results = history
                    .iter()
                    .filter(|event| event.correlation_id == correlation_id)
                    .map(|event| event.event.clone())
                    .collect();
                return Ok(results);
            }

            // Check timeout
            if Instant::now() >= deadline {
                return Err(Error::CorrelationTimeout(correlation_id));
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
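The history trimming in `track_event` is a bounded FIFO: append the newest event, then evict from the front until the size limit holds. The sketch below isolates that behavior in a small generic type so it can be unit-tested without the broker. `BoundedHistory` is a hypothetical name, not a type from daemoneye-eventbus, and returning the eviction count from `push` is an illustrative choice.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded FIFO mirroring `event_history` + `max_history_size`.
pub struct BoundedHistory<T> {
    events: VecDeque<T>,
    max_size: usize,
}

impl<T> BoundedHistory<T> {
    pub fn new(max_size: usize) -> Self {
        Self {
            events: VecDeque::with_capacity(max_size),
            max_size,
        }
    }

    /// Append an event and evict the oldest entries beyond `max_size`.
    /// Returns how many entries were evicted by this call.
    pub fn push(&mut self, event: T) -> usize {
        self.events.push_back(event);
        let mut evicted = 0;
        while self.events.len() > self.max_size {
            self.events.pop_front();
            evicted += 1;
        }
        evicted
    }

    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Iterate from oldest to newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.events.iter()
    }
}
```

One consequence worth noting for forensic queries: once an event is evicted, tag and timeline lookups can no longer see it, so `max_history_size` bounds how far back an investigation can reach.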
3. Forensic Query API
impl CorrelationTracker {
    /// Find events by correlation tag (for forensic investigations)
    pub async fn find_events_by_tag(
        &self,
        tag_key: &str,
        tag_value: &str,
    ) -> Result<Vec<EventEnvelope>> {
        let history = self.event_history.read().await;

        Ok(history
            .iter()
            .filter(|event| {
                event.event.correlation_metadata
                    .as_ref()
                    .and_then(|meta| meta.correlation_tags.get(tag_key))
                    .map(|v| v == tag_value)
                    .unwrap_or(false)
            })
            .map(|event| event.event.clone())
            .collect())
    }

    /// Get workflow timeline (for forensic analysis)
    pub async fn get_workflow_timeline(&self, root_correlation_id: Uuid) -> Result<WorkflowTimeline> {
        let history = self.event_history.read().await;

        let events: Vec<_> = history
            .iter()
            .filter_map(|event| {
                // Skip events without metadata or from another workflow tree;
                // `filter_map` avoids the `unwrap()` on the metadata
                let meta = event.event.correlation_metadata.as_ref()?;
                if meta.root_correlation_id != root_correlation_id {
                    return None;
                }
                Some(TimelineEvent {
                    timestamp: event.received_at,
                    stage: meta.workflow_stage.clone(),
                    source_process: event.event.publisher_process_id,
                    event: event.event.clone(),
                })
            })
            .collect();

        Ok(WorkflowTimeline {
            root_correlation_id,
            events,
        })
    }
}
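`WorkflowTimeline` and `TimelineEvent` are returned by `get_workflow_timeline` but not defined in this issue. The following sketch shows one possible shape, plus two helpers an analyst-facing view might need. The field set is inferred from the code above (the embedded `event` field is omitted for brevity), the `u128` and `u32` aliases are stand-ins for `Uuid` and `ProcessId`, and `stage_order` and `span` are hypothetical helper names.

```rust
use std::time::{Duration, Instant};

pub type CorrelationId = u128; // stand-in for `Uuid` (assumption)
pub type ProcessId = u32; // stand-in for the real process ID type (assumption)

#[derive(Debug, Clone)]
pub struct TimelineEvent {
    pub timestamp: Instant,
    pub stage: String,
    pub source_process: ProcessId,
}

#[derive(Debug, Clone)]
pub struct WorkflowTimeline {
    pub root_correlation_id: CorrelationId,
    pub events: Vec<TimelineEvent>,
}

impl WorkflowTimeline {
    /// Stage names ordered by the time each stage was first observed.
    pub fn stage_order(&self) -> Vec<String> {
        let mut sorted: Vec<&TimelineEvent> = self.events.iter().collect();
        sorted.sort_by_key(|e| e.timestamp);

        let mut order: Vec<String> = Vec::new();
        for e in sorted {
            if !order.contains(&e.stage) {
                order.push(e.stage.clone());
            }
        }
        order
    }

    /// Time between the first and last event, or `None` for an empty timeline.
    pub fn span(&self) -> Option<Duration> {
        let first = self.events.iter().map(|e| e.timestamp).min()?;
        let last = self.events.iter().map(|e| e.timestamp).max()?;
        Some(last.duration_since(first))
    }
}
```

Because the tracker records `received_at` as an `Instant`, ordering is based on when the broker received each event, not on the publisher's `SystemTime` timestamp.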
Acceptance Criteria
Testing Strategy
Unit Tests
- Correlation ID generation and propagation
- Workflow state tracking
- Event history management
- Tag-based querying
Integration Tests
- Simple single-stage workflow across local processes
- Multi-stage workflow with dependencies
- Forensic tagging and retrieval
- Timeline reconstruction
Performance Requirements (from Parent #114)
- Correlation Overhead: <5% of base local IPC latency (<50μs additional per message)
- Event History Query: <10ms for tag-based search in 10K events
- Timeline Reconstruction: <100ms for workflows with 1K events
- Memory Overhead: <10MB for 10K correlated events in history
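The targets above imply some per-message and per-event budgets that are useful to have in hand when writing benchmarks. The arithmetic is sketched below; the function names are hypothetical, and reading "10MB" as 10 × 1024 × 1024 bytes is an assumption (with decimal megabytes the per-event figure would be 1000 bytes).

```rust
/// Base latency implied when an overhead cap is given both as a percentage
/// and as an absolute value: cap_us = base_us * percent / 100.
pub fn implied_base_latency_us(overhead_cap_us: u64, overhead_percent: u64) -> u64 {
    overhead_cap_us * 100 / overhead_percent
}

/// Average memory available per stored event (panics if `events` is 0).
pub fn per_event_budget_bytes(total_bytes: u64, events: u64) -> u64 {
    total_bytes / events
}

/// Average time available per event for a full scan (panics if `events` is 0).
pub fn per_event_scan_budget_ns(query_budget_ms: u64, events: u64) -> u64 {
    query_budget_ms * 1_000_000 / events
}
```

Under these readings, the 5% / 50μs pair corresponds to a base IPC latency of about 1ms, the history has roughly 1KB per correlated event (envelope, payload, and metadata together), a tag search has about 1μs per event in a linear scan, and timeline reconstruction has about 100μs per event.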
Code Safety
- Zero unsafe code in correlation tracking logic
- Built entirely on: interprocess, tokio, prost, dashmap
- All dependencies are well-audited
Use Cases
1. Threat Hunting (Local System)
- Correlate process execution, network activity, and file access from local collectors
- Tag suspicious patterns for investigation
- Reconstruct attack timeline from local event history
2. Compliance Auditing (Local System)
- Track multi-stage data access workflows across local processes
- Correlate user actions with system events
- Generate audit trails from local event history
3. Performance Monitoring (Local System)
- Correlate resource usage across local collector domains
- Track performance impact of specific workflows
- Identify bottlenecks in multi-stage processing on local system
Status: Not Started
Priority: Medium
Estimated Effort: 1 week
Depends On: #115, #116, #117