Add result aggregation and load balancing for collector processes
Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115 (Coordination Workflows), #116 (Task Distribution)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.4
Design Goals (from Parent #114)
- Scope: Single-system, local multi-process coordination
- Performance: <1ms p99 latency for result aggregation from local processes
- Cross-Platform: Windows, Linux, macOS, FreeBSD
- Security: Local IPC only (no network exposure)
- Code Safety: Zero unsafe code in aggregation implementation
- Dependencies: Only use interprocess, tokio, prost, dashmap
Critical Architectural Context
Cross-process result aggregation:
- daemoneye-agent (separate process): hosts broker, aggregates results from local collectors
- Collector processes (procmond-1, procmond-2, netmond-1, etc.): separate OS processes on same machine
- All communication via local cross-process IPC using interprocess crate
- Result aggregation and load balancing happen across process boundaries
Overview
This task implements result aggregation and advanced load balancing capabilities within daemoneye-eventbus, enabling the agent to efficiently collect and correlate results from multiple collector processes on the local system.
Architecture
Result Aggregation Flow (Cross-Process)
```text
1. Collector Processes Execute Tasks
   ├─▶ procmond-1: Process enumeration
   ├─▶ procmond-2: Process integrity checks
   └─▶ netmond-1: Network connections
2. Results Published via Local IPC
   ├─▶ procmond-1 → events.process.lifecycle
   ├─▶ procmond-2 → events.process.integrity
   └─▶ netmond-1 → events.network.connections
3. Broker Receives Results (In Agent Process)
   ├─▶ Route by topic pattern
   ├─▶ Group by correlation_id
   └─▶ Buffer until complete
4. Aggregated Results (In Agent Process)
   ├─▶ Merge related events from multiple local processes
   ├─▶ Deduplicate if needed
   └─▶ Forward to detection engine
```
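The buffering in steps 3 and 4 can be sketched with std types only. This is an illustrative model, not the daemoneye-eventbus API: `ProcessId` is simplified to `u32`, the correlation ID to `u64`, and result payloads to strings.

```rust
use std::collections::{HashMap, HashSet};

/// Results buffered for one correlation ID until every expected collector reports.
struct PendingSet {
    expected: HashSet<u32>,       // simplified process IDs
    received: Vec<(u32, String)>, // (process id, result payload)
}

/// Buffers results per correlation ID; releases the merged set once complete.
struct Aggregator {
    pending: HashMap<u64, PendingSet>,
}

impl Aggregator {
    fn new() -> Self {
        Self { pending: HashMap::new() }
    }

    /// Register which collectors must report for a correlation ID.
    fn expect(&mut self, correlation_id: u64, collectors: &[u32]) {
        self.pending.insert(correlation_id, PendingSet {
            expected: collectors.iter().copied().collect(),
            received: Vec::new(),
        });
    }

    /// Add one result; returns Some(all results) once the set completes.
    fn add_result(&mut self, correlation_id: u64, pid: u32, payload: &str) -> Option<Vec<(u32, String)>> {
        let set = self.pending.get_mut(&correlation_id)?;
        set.received.push((pid, payload.to_string()));
        let reported: HashSet<u32> = set.received.iter().map(|(p, _)| *p).collect();
        if reported == set.expected {
            self.pending.remove(&correlation_id).map(|s| s.received)
        } else {
            None
        }
    }
}

fn main() {
    let mut agg = Aggregator::new();
    agg.expect(42, &[1, 2]);
    // First collector reports: set is still incomplete.
    assert!(agg.add_result(42, 1, "procmond-1 results").is_none());
    // Second collector reports: the full set is released.
    let complete = agg.add_result(42, 2, "netmond-1 results").unwrap();
    assert_eq!(complete.len(), 2);
    println!("correlation 42 complete with {} results", complete.len());
}
```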
Result Aggregator (In Broker Process)
```rust
/// Aggregates results from multiple local collector processes.
pub struct ResultAggregator {
    /// Pending result sets indexed by correlation ID.
    pending_results: Arc<DashMap<Uuid, ResultSet>>,
    /// Completion callback.
    on_complete: Arc<dyn Fn(AggregatedResult) + Send + Sync>,
    /// Timeout for incomplete result sets.
    timeout_ms: u64,
}

pub struct ResultSet {
    correlation_id: Uuid,
    expected_collectors: HashSet<ProcessId>,
    received_results: Vec<(ProcessId, DetectionResult)>,
    created_at: Instant,
}
```
```rust
impl ResultAggregator {
    /// Add a result from a local collector process.
    pub async fn add_result(
        &self,
        process_id: ProcessId,
        correlation_id: Uuid,
        result: DetectionResult,
    ) -> Result<()> {
        // Scope the entry guard: holding a DashMap guard while calling
        // `remove` on the same map would deadlock on the shard lock.
        let complete = {
            let mut result_set = self
                .pending_results
                .entry(correlation_id)
                .or_insert_with(|| ResultSet::new(correlation_id));
            result_set.received_results.push((process_id, result));
            // is_complete() compares received results against
            // expected_collectors (definition elided here).
            result_set.is_complete()
        };
        if complete {
            if let Some((_, result_set)) = self.pending_results.remove(&correlation_id) {
                let aggregated = self.aggregate(result_set)?;
                (self.on_complete)(aggregated);
            }
        }
        Ok(())
    }

    /// Aggregate results received from multiple local processes.
    fn aggregate(&self, result_set: ResultSet) -> Result<AggregatedResult> {
        // Merge, deduplicate, and correlate the per-process results.
        let merged = self.merge_results(result_set.received_results)?;
        Ok(AggregatedResult {
            correlation_id: result_set.correlation_id,
            results: merged,
            collector_count: result_set.expected_collectors.len(),
            complete: true,
        })
    }
}
```
Advanced Load Balancer (Across Local Processes)
```rust
pub struct AdvancedLoadBalancer {
    /// Real-time load metrics per local process.
    metrics: Arc<DashMap<ProcessId, LoadMetrics>>,
    /// Historical performance data.
    history: Arc<DashMap<ProcessId, PerformanceHistory>>,
}

pub struct LoadMetrics {
    active_tasks: AtomicUsize,
    queue_depth: AtomicUsize,
    cpu_usage: AtomicU64,      // 0-100%, stored as f64 bits
    memory_bytes: AtomicU64,
    last_heartbeat: AtomicU64, // Unix timestamp (seconds)
}

pub struct PerformanceHistory {
    avg_task_duration_ms: f64,
    success_rate: f64,
    total_tasks_completed: u64,
}
```
```rust
impl AdvancedLoadBalancer {
    /// Select the best local process using weighted scoring.
    pub fn select_weighted(&self, candidates: Vec<ProcessId>) -> Result<ProcessId> {
        candidates
            .into_iter()
            .max_by_key(|&pid| self.calculate_score(pid))
            .ok_or(Error::NoCollectorAvailable)
    }

    /// Calculate a fitness score for a local process.
    /// Processes with no recorded metrics or history score 0.
    fn calculate_score(&self, process_id: ProcessId) -> u64 {
        let (Some(metrics), Some(history)) =
            (self.metrics.get(&process_id), self.history.get(&process_id))
        else {
            return 0;
        };
        // Lower is better for load indicators; saturate to avoid underflow
        // when a process is heavily loaded.
        let load_score =
            1000u64.saturating_sub(metrics.active_tasks.load(Ordering::Relaxed) as u64 * 10);
        let queue_score =
            1000u64.saturating_sub(metrics.queue_depth.load(Ordering::Relaxed) as u64 * 5);
        // Higher is better for historical performance.
        let perf_score = (history.success_rate * 100.0) as u64;
        load_score + queue_score + perf_score
    }

    /// Update metrics from a local collector heartbeat.
    pub fn update_metrics(&self, process_id: ProcessId, heartbeat: CollectorHeartbeat) {
        let metrics = self
            .metrics
            .entry(process_id)
            .or_insert_with(LoadMetrics::default);
        metrics.active_tasks.store(heartbeat.active_tasks as usize, Ordering::Relaxed);
        metrics.queue_depth.store(heartbeat.queue_depth as usize, Ordering::Relaxed);
        metrics.cpu_usage.store(heartbeat.cpu_usage.to_bits(), Ordering::Relaxed);
        metrics.memory_bytes.store(heartbeat.memory_bytes, Ordering::Relaxed);
    }
}
```
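The weighted scoring reduces to a pure function over the sampled metrics. A std-only sketch using the same 1000-point baselines and weights as the snippet above (the specific constants are illustrative tuning values, not mandated by the spec):

```rust
/// Fitness score: higher is better. Saturating subtraction keeps heavily
/// loaded processes at a floor of 0 instead of underflowing.
fn score(active_tasks: u64, queue_depth: u64, success_rate: f64) -> u64 {
    let load_score = 1000u64.saturating_sub(active_tasks * 10);
    let queue_score = 1000u64.saturating_sub(queue_depth * 5);
    let perf_score = (success_rate * 100.0) as u64;
    load_score + queue_score + perf_score
}

fn main() {
    // An idle process with a perfect history scores the maximum.
    assert_eq!(score(0, 0, 1.0), 2100);
    // A moderately busy process: 500 + 500 + 90.
    assert_eq!(score(50, 100, 0.9), 1090);
    // An overloaded process saturates to its performance score only.
    assert_eq!(score(200, 400, 0.5), 50);
    println!("idle={} busy={} overloaded={}", score(0, 0, 1.0), score(50, 100, 0.9), score(200, 400, 0.5));
}
```

Saturating arithmetic matters here: with plain subtraction, a collector with more than 100 active tasks would panic (debug) or wrap to a huge score (release) and win every selection.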
Failover Manager (For Local Processes)
```rust
pub struct FailoverManager {
    health_tracker: Arc<HealthTracker>,
    task_router: Arc<TaskRouter>,
}

impl FailoverManager {
    /// Handle a local collector process failure.
    pub async fn handle_failure(&self, failed_process_id: ProcessId) -> Result<()> {
        // 1. Mark the process as unhealthy.
        self.health_tracker.mark_unhealthy(failed_process_id).await?;
        // 2. Get pending tasks from the failed local process.
        let pending_tasks = self.get_pending_tasks(failed_process_id).await?;
        // 3. Redistribute them to healthy local processes.
        for task in pending_tasks {
            self.task_router.route_task(task).await?;
        }
        // 4. Clean up the failed process's subscriptions.
        self.cleanup_subscriptions(failed_process_id).await?;
        Ok(())
    }

    /// Detect failed local processes via heartbeat monitoring.
    /// This loop runs forever, so errors are logged rather than propagated.
    pub async fn monitor_heartbeats(&self) {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            let stale_processes = match self
                .health_tracker
                .find_stale_processes(Duration::from_secs(30))
                .await
            {
                Ok(stale) => stale,
                Err(e) => {
                    warn!("Failed to query stale processes: {}", e);
                    continue;
                }
            };
            for process_id in stale_processes {
                warn!("Local collector process {} missed heartbeat, initiating failover", process_id);
                if let Err(e) = self.handle_failure(process_id).await {
                    warn!("Failover failed for process {}: {}", process_id, e);
                }
            }
        }
    }
}
```
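The staleness check inside `find_stale_processes` (not shown) amounts to comparing each last-heartbeat timestamp against a cutoff. A std-only sketch with `ProcessId` simplified to `u32` and timestamps as seconds since an arbitrary epoch:

```rust
use std::collections::HashMap;

/// Return the IDs whose last heartbeat is more than `timeout_secs`
/// older than `now_secs`. Saturating subtraction tolerates clock skew
/// where a heartbeat timestamp is slightly ahead of `now_secs`.
fn find_stale(last_heartbeats: &HashMap<u32, u64>, now_secs: u64, timeout_secs: u64) -> Vec<u32> {
    let mut stale: Vec<u32> = last_heartbeats
        .iter()
        .filter(|(_, &last)| now_secs.saturating_sub(last) > timeout_secs)
        .map(|(&pid, _)| pid)
        .collect();
    stale.sort_unstable(); // deterministic order for logging and tests
    stale
}

fn main() {
    let mut hb = HashMap::new();
    hb.insert(1, 100); // fresh: 5s since last beat
    hb.insert(2, 60);  // stale: 45s since last beat
    hb.insert(3, 70);  // stale: 35s since last beat
    assert_eq!(find_stale(&hb, 105, 30), vec![2, 3]);
    println!("stale: {:?}", find_stale(&hb, 105, 30));
}
```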
Implementation Steps
1. Result Aggregator
```rust
impl ResultAggregator {
    pub fn new(
        timeout_ms: u64,
        on_complete: impl Fn(AggregatedResult) + Send + Sync + 'static,
    ) -> Self {
        Self {
            pending_results: Arc::new(DashMap::new()),
            on_complete: Arc::new(on_complete),
            timeout_ms,
        }
    }

    /// Start the timeout monitor for incomplete result sets.
    pub fn start_timeout_monitor(self: Arc<Self>) {
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(self.timeout_ms / 2)).await;
                let now = Instant::now();
                // Collect expired correlation IDs first so no shard lock is
                // held while removing entries below.
                let mut timed_out = Vec::new();
                for entry in self.pending_results.iter() {
                    let result_set = entry.value();
                    if now.duration_since(result_set.created_at).as_millis() as u64
                        > self.timeout_ms
                    {
                        timed_out.push(*entry.key());
                    }
                }
                for correlation_id in timed_out {
                    if let Some((_, result_set)) = self.pending_results.remove(&correlation_id) {
                        warn!("Result set {} timed out, emitting partial results", correlation_id);
                        // The spawned task cannot use `?`; log aggregation
                        // errors instead of propagating them.
                        match self.aggregate(result_set) {
                            Ok(partial) => (self.on_complete)(partial),
                            Err(e) => warn!("Failed to aggregate partial results for {}: {}", correlation_id, e),
                        }
                    }
                }
            }
        });
    }
}
```
2. Advanced Load Balancer
```rust
impl AdvancedLoadBalancer {
    /// Update performance history from a completed task.
    pub fn record_task_completion(
        &self,
        process_id: ProcessId,
        duration_ms: u64,
        success: bool,
    ) {
        let mut history = self
            .history
            .entry(process_id)
            .or_insert_with(PerformanceHistory::default);
        // Exponentially weighted moving average of task duration.
        let alpha = 0.3; // smoothing factor
        history.avg_task_duration_ms =
            alpha * duration_ms as f64 + (1.0 - alpha) * history.avg_task_duration_ms;
        // Incremental success-rate update over all completed tasks.
        let total = history.total_tasks_completed as f64;
        history.success_rate =
            (history.success_rate * total + if success { 1.0 } else { 0.0 }) / (total + 1.0);
        history.total_tasks_completed += 1;
    }
}
```
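Both updates above can be checked numerically. A std-only sketch of the same formulas, with the same alpha = 0.3:

```rust
/// Exponentially weighted moving average with smoothing factor `alpha`.
fn ewma(avg: f64, sample: f64, alpha: f64) -> f64 {
    alpha * sample + (1.0 - alpha) * avg
}

/// Incremental success-rate update over `total` previously completed tasks.
fn update_success_rate(rate: f64, total: u64, success: bool) -> f64 {
    let total = total as f64;
    (rate * total + if success { 1.0 } else { 0.0 }) / (total + 1.0)
}

fn main() {
    // First 100ms task pulls a zero-initialized average up to 30ms.
    let avg = ewma(0.0, 100.0, 0.3);
    assert!((avg - 30.0).abs() < 1e-9);
    // A second 100ms task: 0.3 * 100 + 0.7 * 30 = 51ms.
    assert!((ewma(avg, 100.0, 0.3) - 51.0).abs() < 1e-9);
    // 9 successes in 10 tasks, then one failure: rate becomes 9/11.
    let rate = update_success_rate(0.9, 10, false);
    assert!((rate - 9.0 / 11.0).abs() < 1e-9);
    println!("avg={} rate={}", ewma(avg, 100.0, 0.3), rate);
}
```

The EWMA weights recent samples more heavily than a plain mean, so a collector that suddenly slows down is penalized within a few tasks rather than after its entire history averages out.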
3. Failover Manager
```rust
impl FailoverManager {
    /// Attempt to restart a failed local collector process.
    pub async fn attempt_restart(&self, failed_process_id: ProcessId) -> Result<()> {
        // Get the collector's metadata.
        let collector_info = self.get_collector_info(failed_process_id)?;
        // Spawn a replacement collector process.
        let new_pid = self.spawn_collector_process(&collector_info).await?;
        // Wait for the new process to register with the broker.
        tokio::time::timeout(
            Duration::from_secs(30),
            self.wait_for_registration(new_pid),
        )
        .await??;
        info!("Successfully restarted local collector process {}", new_pid);
        Ok(())
    }
}
```
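`attempt_restart` gives a failed collector a single chance; a supervisor would typically retry with capped exponential backoff so a crash-looping collector does not consume the agent. A std-only sketch of such a schedule (the 2s base and 60s cap are assumptions, not values from the spec):

```rust
use std::time::Duration;

/// Capped exponential backoff: base * 2^attempt, never exceeding `cap`.
fn restart_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt)).min(cap)
}

fn main() {
    let base = Duration::from_secs(2);
    let cap = Duration::from_secs(60);
    let delays: Vec<u64> = (0..6)
        .map(|attempt| restart_delay(attempt, base, cap).as_secs())
        .collect();
    // 2, 4, 8, 16, 32, then capped at 60 seconds.
    assert_eq!(delays, vec![2, 4, 8, 16, 32, 60]);
    println!("restart schedule (s): {:?}", delays);
}
```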
Acceptance Criteria
Testing Strategy
Unit Tests
- Result merging and deduplication
- Weighted scoring algorithm
- Timeout handling
- Heartbeat staleness detection
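As a concrete instance of the first bullet, deduplication can be tested as a pure function over (process id, event id) pairs; keying on that pair is an assumption about what identifies a duplicate, since the spec does not pin it down:

```rust
use std::collections::HashSet;

/// Drop duplicate (process id, event id) pairs, preserving first-seen order.
fn dedup(results: Vec<(u32, u64)>) -> Vec<(u32, u64)> {
    let mut seen = HashSet::new();
    // `insert` returns false for already-seen pairs, filtering them out.
    results.into_iter().filter(|r| seen.insert(*r)).collect()
}

fn main() {
    let merged = dedup(vec![(1, 100), (2, 100), (1, 100), (2, 101)]);
    // The repeated (1, 100) is dropped; the same event id from a
    // different process is kept.
    assert_eq!(merged, vec![(1, 100), (2, 100), (2, 101)]);
    println!("{:?}", merged);
}
```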
Integration Tests
- Aggregate results from multiple local processes
- Load balancing across healthy local processes
- Failover when local process crashes
- Task redistribution on failure
Performance Tests
- Aggregation latency with varying local process counts
- Load balancing decision time
- Failover response time (<3 heartbeat intervals = 30s)
- Target: <1ms p99 latency for aggregation
Performance Requirements
- Result Aggregation Latency: <1ms p99 for merging results from local processes
- Load Balancing Decision Time: <1ms for weighted scoring
- Failover Detection: Within 3 heartbeat intervals (default 30s)
- Task Redistribution: <10s for all pending tasks from failed local process
Code Safety
- Zero unsafe code in aggregation and balancing logic
- Built entirely on: interprocess, tokio, prost, dashmap
- All dependencies are well-audited
Status: Not Started
Priority: High
Estimated Effort: 1 week
Depends On: #115 (Coordination Workflows), #116 (Task Distribution)