From 5ab353283369f34c601228d99c03da44b309ef5a Mon Sep 17 00:00:00 2001 From: Draco-Lunaris-Echo Date: Fri, 5 Jun 2026 16:42:39 -0500 Subject: [PATCH] feat: add CRL health aggregation logic and audit events (PR 5 of 6) * feat: add CRL health aggregation logic and audit events (PR 5 of 6) * style: fix cargo fmt in health_poller.rs --------- Co-authored-by: Draco Lunaris <331325+Draco-Lunaris@users.noreply.github.com> --- crates/pm-core/src/audit.rs | 8 + crates/pm-worker/src/health_poller.rs | 378 +++++++++++++++++++------- docs/REST_API.md | 27 ++ migrations/022_crl_audit_actions.sql | 8 + 4 files changed, 316 insertions(+), 105 deletions(-) create mode 100644 migrations/022_crl_audit_actions.sql diff --git a/crates/pm-core/src/audit.rs b/crates/pm-core/src/audit.rs index 4c9a1dd..e5f606e 100644 --- a/crates/pm-core/src/audit.rs +++ b/crates/pm-core/src/audit.rs @@ -57,6 +57,10 @@ pub enum AuditAction { IpWhitelistUpdated, OidcTestPerformed, OidcDiscoverPerformed, + // CRL health aggregation events (system-initiated) + CrlStatusChanged, + CrlStaleDetected, + CrlInvalid, } impl AuditAction { @@ -100,6 +104,10 @@ impl AuditAction { Self::IpWhitelistUpdated => "ip_whitelist_updated", Self::OidcTestPerformed => "oidc_test_performed", Self::OidcDiscoverPerformed => "oidc_discover_performed", + // CRL health aggregation events + Self::CrlStatusChanged => "crl_status_changed", + Self::CrlStaleDetected => "crl_stale_detected", + Self::CrlInvalid => "crl_invalid", } } } diff --git a/crates/pm-worker/src/health_poller.rs b/crates/pm-worker/src/health_poller.rs index dca04b9..e9c9616 100644 --- a/crates/pm-worker/src/health_poller.rs +++ b/crates/pm-worker/src/health_poller.rs @@ -4,11 +4,24 @@ //! `health_poll_interval_secs`, with bounded concurrency controlled by a //! [`tokio::sync::Semaphore`]. Also calls `/system/info` to refresh //! `os_family`, `os_name`, `arch`, and `agent_version` in the hosts table. +//! +//! CRL health aggregation rules (PR 5): +//! - `crl_status = "invalid"` → host health_status overridden to `unreachable` +//! - `crl_status = "expired"` → host health_status overridden to `degraded` (if currently healthy) +//! - `crl_status = "missing"` AND registered > 24h ago → host health_status overridden to `degraded` (if currently healthy) +//! - `crl_status = "valid"` or NULL → no override +//! +//! Audit events are logged for CRL state transitions. use std::sync::Arc; +use chrono::{DateTime, Duration, Utc}; use pm_agent_client::{AgentClient, AgentClientError}; -use pm_core::{config::AppConfig, models::HostHealthStatus}; +use pm_core::{ + audit::{log_event, AuditAction}, + config::AppConfig, + models::HostHealthStatus, +}; use sqlx::{FromRow, PgPool}; use tokio::{sync::Semaphore, time}; use uuid::Uuid; @@ -21,6 +34,10 @@ struct HostRow { id: Uuid, ip_address: String, agent_port: i32, + /// Current CRL status from the hosts table (for transition detection). + crl_status: Option, + /// When the host was first registered (for enrollment age checks). + registered_at: DateTime, } /// Run the health poller loop indefinitely. @@ -50,9 +67,9 @@ pub async fn run_health_poller(pool: PgPool, config: Arc) { let client_key = Arc::new(certs.client_key); let ca_cert = Arc::new(certs.ca_cert); - // Fetch all hosts. + // Fetch all hosts with CRL status and registration time. let hosts: Vec = match sqlx::query_as( - "SELECT id, host(ip_address)::text AS ip_address, agent_port FROM hosts ORDER BY id", + "SELECT id, host(ip_address)::text AS ip_address, agent_port, crl_status, registered_at FROM hosts ORDER BY id", ) .fetch_all(&pool) .await @@ -118,7 +135,8 @@ pub async fn run_health_poller(pool: PgPool, config: Arc) { /// /// Also updates `agent_version` from the health response, /// `os_family`/`os_name`/`arch` from the `/system/info` endpoint when available, -/// and CRL status fields from the health response when reported by the agent. +/// CRL status fields from the health response when reported by the agent, +/// and applies CRL health aggregation rules. async fn poll_host_health( pool: PgPool, host: HostRow, @@ -127,108 +145,119 @@ async fn poll_host_health( ca_cert: &[u8], ) -> HostHealthStatus { // Determine status, payload, agent version, optional system info, and CRL fields. - let (status, payload, agent_version, sys_info, crl_status, crl_age_seconds, crl_next_update) = - match AgentClient::new( - &host.ip_address, - host.agent_port as u16, - client_cert, - client_key, - ca_cert, - ) { - Err(e) => { - tracing::warn!( - host_id = %host.id, - error = %e, - "Health poller: failed to build AgentClient" - ); - ( - HostHealthStatus::Unreachable, - serde_json::Value::Object(Default::default()), - None, - None, - None, - None, - None, - ) - }, - Ok(client) => { - let (status, payload, version, crl_status, crl_age, crl_next) = match client - .health() - .await - { - Ok(data) => { - let payload = serde_json::to_value(&data).unwrap_or_default(); - let crl_status = data.crl_status.clone(); - let crl_age = data.crl_age_seconds; - let crl_next = data.crl_next_update.clone(); - ( - HostHealthStatus::Healthy, - payload, - Some(data.version), - crl_status, - crl_age, - crl_next, - ) - }, - Err(AgentClientError::Timeout) => { - tracing::warn!(host_id = %host.id, "Health poller: agent timed out"); - ( - HostHealthStatus::Unreachable, - serde_json::Value::Object(Default::default()), - None, - None, - None, - None, - ) - }, - Err(AgentClientError::Connect(_)) => { - tracing::warn!(host_id = %host.id, "Health poller: agent connection refused"); - ( - HostHealthStatus::Unreachable, - serde_json::Value::Object(Default::default()), - None, - None, - None, - None, - ) - }, + let ( + natural_status, + payload, + agent_version, + sys_info, + crl_status, + crl_age_seconds, + crl_next_update, + ) = match AgentClient::new( + &host.ip_address, + host.agent_port as u16, + client_cert, + client_key, + ca_cert, + ) { + Err(e) => { + tracing::warn!( + host_id = %host.id, + error = %e, + "Health poller: failed to build AgentClient" + ); + ( + HostHealthStatus::Unreachable, + serde_json::Value::Object(Default::default()), + None, + None, + None, + None, + None, + ) + }, + Ok(client) => { + let (status, payload, version, crl_status, crl_age, crl_next) = match client + .health() + .await + { + Ok(data) => { + let payload = serde_json::to_value(&data).unwrap_or_default(); + let crl_status = data.crl_status.clone(); + let crl_age = data.crl_age_seconds; + let crl_next = data.crl_next_update.clone(); + ( + HostHealthStatus::Healthy, + payload, + Some(data.version), + crl_status, + crl_age, + crl_next, + ) + }, + Err(AgentClientError::Timeout) => { + tracing::warn!(host_id = %host.id, "Health poller: agent timed out"); + ( + HostHealthStatus::Unreachable, + serde_json::Value::Object(Default::default()), + None, + None, + None, + None, + ) + }, + Err(AgentClientError::Connect(_)) => { + tracing::warn!(host_id = %host.id, "Health poller: agent connection refused"); + ( + HostHealthStatus::Unreachable, + serde_json::Value::Object(Default::default()), + None, + None, + None, + None, + ) + }, + Err(e) => { + tracing::warn!(host_id = %host.id, error = %e, "Health poller: agent error"); + ( + HostHealthStatus::Degraded, + serde_json::Value::Object(Default::default()), + None, + None, + None, + None, + ) + }, + }; + + // Try to fetch system info for OS/arch details (best-effort). + let sys_info = if status != HostHealthStatus::Unreachable { + match client.system_info().await { + Ok(info) => Some(info), Err(e) => { - tracing::warn!(host_id = %host.id, error = %e, "Health poller: agent error"); - ( - HostHealthStatus::Degraded, - serde_json::Value::Object(Default::default()), - None, - None, - None, - None, - ) + tracing::debug!( + host_id = %host.id, + error = %e, + "Health poller: failed to get system info (non-fatal)" + ); + None }, - }; + } + } else { + None + }; - // Try to fetch system info for OS/arch details (best-effort). - let sys_info = if status != HostHealthStatus::Unreachable { - match client.system_info().await { - Ok(info) => Some(info), - Err(e) => { - tracing::debug!( - host_id = %host.id, - error = %e, - "Health poller: failed to get system info (non-fatal)" - ); - None - }, - } - } else { - None - }; + ( + status, payload, version, sys_info, crl_status, crl_age, crl_next, + ) + }, + }; - ( - status, payload, version, sys_info, crl_status, crl_age, crl_next, - ) - }, - }; + // Apply CRL health aggregation rules to determine the effective status. + // Only apply when the agent reported a CRL status (non-NULL). + let effective_status = apply_crl_health_rules(&natural_status, &crl_status, host.registered_at); - // Insert into host_health_data. + // Insert into host_health_data with the natural (pre-aggregation) status. if let Err(e) = sqlx::query( r#" INSERT INTO host_health_data (host_id, status, payload) @@ -236,7 +265,7 @@ async fn poll_host_health( "#, ) .bind(host.id) - .bind(&status) + .bind(&natural_status) .bind(&payload) .execute(&pool) .await @@ -255,7 +284,8 @@ async fn poll_host_health( .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.to_utc()); - // Update hosts table with health status, agent version, OS details, and CRL fields. + // Update hosts table with the effective (post-aggregation) health status, + // agent version, OS details, and CRL fields. // COALESCE preserves existing values when new data is unavailable. if let Err(e) = sqlx::query( r#" @@ -272,7 +302,7 @@ async fn poll_host_health( "#, ) .bind(host.id) - .bind(&status) + .bind(&effective_status) .bind(&agent_version) .bind(sys_info.as_ref().map(|i| i.os.as_str())) .bind(os_name_from_sysinfo) @@ -284,7 +314,145 @@ async fn poll_host_health( .await { tracing::error!(host_id = %host.id, error = %e, "Health poller: failed to update host status"); + // Don't log audit events if the DB update failed. + return effective_status; } - status + // Log CRL audit events after successful database update. + if let Some(ref new_crl) = crl_status { + log_crl_audit_events( + &pool, + host.id, + host.crl_status.as_deref(), + new_crl, + crl_age_seconds, + ) + .await; + } + + effective_status +} + +/// Apply CRL health aggregation rules to determine the effective health status. +/// +/// Rules: +/// - `crl_status = "invalid"` → `Unreachable` (security event, always overrides) +/// - `crl_status = "expired"` → `Degraded` (only if natural status is `Healthy`) +/// - `crl_status = "missing"` AND registered > 24h ago → `Degraded` (only if natural status is `Healthy`) +/// - `crl_status = "valid"` or NULL → no override +fn apply_crl_health_rules( + natural_status: &HostHealthStatus, + crl_status: &Option, + registered_at: DateTime, +) -> HostHealthStatus { + let Some(crl) = crl_status else { + // Older agent not reporting CRL — don't modify health status. + return natural_status.clone(); + }; + + match crl.as_str() { + "invalid" => HostHealthStatus::Unreachable, + "expired" => { + if *natural_status == HostHealthStatus::Healthy { + HostHealthStatus::Degraded + } else { + natural_status.clone() + } + }, + "missing" => { + let age = Utc::now() - registered_at; + if age > Duration::hours(24) && *natural_status == HostHealthStatus::Healthy { + HostHealthStatus::Degraded + } else { + natural_status.clone() + } + }, + // "valid" or any other value — no override + _ => natural_status.clone(), + } +} + +/// Log audit events for CRL state transitions. +/// +/// Called after the hosts table has been successfully updated. +/// Logs: +/// - `CrlStatusChanged` when the CRL status transitions to a different value +/// - `CrlStaleDetected` when CRL status becomes "expired" +/// - `CrlInvalid` when CRL status becomes "invalid" +async fn log_crl_audit_events( + pool: &PgPool, + host_id: Uuid, + old_crl_status: Option<&str>, + new_crl_status: &str, + crl_age_seconds: Option, +) { + let host_id_str = host_id.to_string(); + let old_str = old_crl_status.unwrap_or("null"); + + // Log a transition event if the status changed. + if old_crl_status != Some(new_crl_status) { + let details = serde_json::json!({ + "host_id": host_id_str, + "old_crl_status": old_str, + "new_crl_status": new_crl_status, + "crl_age_seconds": crl_age_seconds, + }); + log_event( + pool, + AuditAction::CrlStatusChanged, + None, // actor_user_id — system-initiated + None, // actor_username + Some("host"), // target_type + Some(&host_id_str), // target_id + details, + None, // ip_address + None, // request_id + ) + .await; + } + + // Log specific events for problematic CRL states. + match new_crl_status { + "expired" => { + let details = serde_json::json!({ + "host_id": host_id_str, + "old_crl_status": old_str, + "new_crl_status": new_crl_status, + "crl_age_seconds": crl_age_seconds, + }); + log_event( + pool, + AuditAction::CrlStaleDetected, + None, + None, + Some("host"), + Some(&host_id_str), + details, + None, + None, + ) + .await; + }, + "invalid" => { + let details = serde_json::json!({ + "host_id": host_id_str, + "old_crl_status": old_str, + "new_crl_status": new_crl_status, + "crl_age_seconds": crl_age_seconds, + }); + log_event( + pool, + AuditAction::CrlInvalid, + None, + None, + Some("host"), + Some(&host_id_str), + details, + None, + None, + ) + .await; + }, + _ => {}, + } } diff --git a/docs/REST_API.md b/docs/REST_API.md index 2c99489..b99af9f 100644 --- a/docs/REST_API.md +++ b/docs/REST_API.md @@ -161,6 +161,33 @@ Fleet status response includes CRL counts: | `crl_invalid` | `integer` | Hosts with CRL status `invalid` (security event) | | `crl_not_reporting` | `integer` | Hosts not reporting CRL status (older agents) | +### CRL Audit Events + +The health poller logs the following system-initiated audit events when a host's CRL status changes: + +| Audit Action | Trigger | Details Fields | +|---|---|---| +| `crl_status_changed` | Any CRL status transition | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` | +| `crl_stale_detected` | CRL status becomes `expired` | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` | +| `crl_invalid` | CRL status becomes `invalid` | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` | + +All CRL audit events use `target_type = "host"` and `target_id = `. Actor fields (`actor_user_id`, `actor_username`) are `null` because these are system-initiated events. + +### CRL Health Aggregation Rules + +The health poller applies the following rules to determine a host's effective health status based on CRL state: + +| CRL Status | Condition | Effective Health Status | +|---|---|---| +| `invalid` | Always | `unreachable` (security event) | +| `expired` | If natural status is `healthy` | `degraded` | +| `missing` | Registered > 24h ago AND natural status is `healthy` | `degraded` | +| `missing` | Registered ≤ 24h ago | Natural status (new agent enrollment) | +| `valid` | Any | Natural status (no override) | +| `null` | Any | Natural status (older agent, not reporting CRL) | + +When CRL status transitions from `invalid`/`expired`/`missing` back to `valid`, the next health poll cycle restores the host to its natural health status based on the agent's health response. + ## 14. Real-Time Updates (WebSocket) | Method | Endpoint | Description | |--------|----------|-------------| diff --git a/migrations/022_crl_audit_actions.sql b/migrations/022_crl_audit_actions.sql new file mode 100644 index 0000000..7cdfb2b --- /dev/null +++ b/migrations/022_crl_audit_actions.sql @@ -0,0 +1,8 @@ +-- Migration: 022_crl_audit_actions +-- Description: Add audit_action enum values for CRL health aggregation events. +-- These are system-initiated events logged by the health poller +-- when a host's CRL status transitions or indicates a problem. + +ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_status_changed'; +ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_stale_detected'; +ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_invalid';