Private
Public Access
1
0

feat: add CRL health aggregation logic and audit events (PR 5 of 6)
All checks were successful
CI Pipeline / Rust Format Check (push) Successful in 5s
CI Pipeline / Clippy Lints (push) Successful in 52s
CI Pipeline / Rust Unit Tests (push) Successful in 1m11s
CI Pipeline / Security Audit (push) Successful in 5s
CI Pipeline / Frontend Lint & Type Check (push) Successful in 16s
CI Pipeline / Build .deb & Release (push) Has been skipped

* feat: add CRL health aggregation logic and audit events (PR 5 of 6)

* style: fix cargo fmt in health_poller.rs

---------

Co-authored-by: Draco Lunaris <331325+Draco-Lunaris@users.noreply.github.com>
This commit is contained in:
Draco-Lunaris-Echo
2026-06-05 16:42:39 -05:00
committed by GitHub
parent ea8337b944
commit 5ab3532833
4 changed files with 316 additions and 105 deletions

View File

@ -57,6 +57,10 @@ pub enum AuditAction {
IpWhitelistUpdated, IpWhitelistUpdated,
OidcTestPerformed, OidcTestPerformed,
OidcDiscoverPerformed, OidcDiscoverPerformed,
// CRL health aggregation events (system-initiated)
CrlStatusChanged,
CrlStaleDetected,
CrlInvalid,
} }
impl AuditAction { impl AuditAction {
@ -100,6 +104,10 @@ impl AuditAction {
Self::IpWhitelistUpdated => "ip_whitelist_updated", Self::IpWhitelistUpdated => "ip_whitelist_updated",
Self::OidcTestPerformed => "oidc_test_performed", Self::OidcTestPerformed => "oidc_test_performed",
Self::OidcDiscoverPerformed => "oidc_discover_performed", Self::OidcDiscoverPerformed => "oidc_discover_performed",
// CRL health aggregation events
Self::CrlStatusChanged => "crl_status_changed",
Self::CrlStaleDetected => "crl_stale_detected",
Self::CrlInvalid => "crl_invalid",
} }
} }
} }

View File

@ -4,11 +4,24 @@
//! `health_poll_interval_secs`, with bounded concurrency controlled by a //! `health_poll_interval_secs`, with bounded concurrency controlled by a
//! [`tokio::sync::Semaphore`]. Also calls `/system/info` to refresh //! [`tokio::sync::Semaphore`]. Also calls `/system/info` to refresh
//! `os_family`, `os_name`, `arch`, and `agent_version` in the hosts table. //! `os_family`, `os_name`, `arch`, and `agent_version` in the hosts table.
//!
//! CRL health aggregation rules (PR 5):
//! - `crl_status = "invalid"` → host health_status overridden to `unreachable`
//! - `crl_status = "expired"` → host health_status overridden to `degraded` (if currently healthy)
//! - `crl_status = "missing"` AND registered > 24h ago → host health_status overridden to `degraded` (if currently healthy)
//! - `crl_status = "valid"` or NULL → no override
//!
//! Audit events are logged for CRL state transitions.
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, Duration, Utc};
use pm_agent_client::{AgentClient, AgentClientError}; use pm_agent_client::{AgentClient, AgentClientError};
use pm_core::{config::AppConfig, models::HostHealthStatus}; use pm_core::{
audit::{log_event, AuditAction},
config::AppConfig,
models::HostHealthStatus,
};
use sqlx::{FromRow, PgPool}; use sqlx::{FromRow, PgPool};
use tokio::{sync::Semaphore, time}; use tokio::{sync::Semaphore, time};
use uuid::Uuid; use uuid::Uuid;
@ -21,6 +34,10 @@ struct HostRow {
id: Uuid, id: Uuid,
ip_address: String, ip_address: String,
agent_port: i32, agent_port: i32,
/// Current CRL status from the hosts table (for transition detection).
crl_status: Option<String>,
/// When the host was first registered (for enrollment age checks).
registered_at: DateTime<Utc>,
} }
/// Run the health poller loop indefinitely. /// Run the health poller loop indefinitely.
@ -50,9 +67,9 @@ pub async fn run_health_poller(pool: PgPool, config: Arc<AppConfig>) {
let client_key = Arc::new(certs.client_key); let client_key = Arc::new(certs.client_key);
let ca_cert = Arc::new(certs.ca_cert); let ca_cert = Arc::new(certs.ca_cert);
// Fetch all hosts. // Fetch all hosts with CRL status and registration time.
let hosts: Vec<HostRow> = match sqlx::query_as( let hosts: Vec<HostRow> = match sqlx::query_as(
"SELECT id, host(ip_address)::text AS ip_address, agent_port FROM hosts ORDER BY id", "SELECT id, host(ip_address)::text AS ip_address, agent_port, crl_status, registered_at FROM hosts ORDER BY id",
) )
.fetch_all(&pool) .fetch_all(&pool)
.await .await
@ -118,7 +135,8 @@ pub async fn run_health_poller(pool: PgPool, config: Arc<AppConfig>) {
/// ///
/// Also updates `agent_version` from the health response, /// Also updates `agent_version` from the health response,
/// `os_family`/`os_name`/`arch` from the `/system/info` endpoint when available, /// `os_family`/`os_name`/`arch` from the `/system/info` endpoint when available,
/// and CRL status fields from the health response when reported by the agent. /// CRL status fields from the health response when reported by the agent,
/// and applies CRL health aggregation rules.
async fn poll_host_health( async fn poll_host_health(
pool: PgPool, pool: PgPool,
host: HostRow, host: HostRow,
@ -127,108 +145,119 @@ async fn poll_host_health(
ca_cert: &[u8], ca_cert: &[u8],
) -> HostHealthStatus { ) -> HostHealthStatus {
// Determine status, payload, agent version, optional system info, and CRL fields. // Determine status, payload, agent version, optional system info, and CRL fields.
let (status, payload, agent_version, sys_info, crl_status, crl_age_seconds, crl_next_update) = let (
match AgentClient::new( natural_status,
&host.ip_address, payload,
host.agent_port as u16, agent_version,
client_cert, sys_info,
client_key, crl_status,
ca_cert, crl_age_seconds,
) { crl_next_update,
Err(e) => { ) = match AgentClient::new(
tracing::warn!( &host.ip_address,
host_id = %host.id, host.agent_port as u16,
error = %e, client_cert,
"Health poller: failed to build AgentClient" client_key,
); ca_cert,
( ) {
HostHealthStatus::Unreachable, Err(e) => {
serde_json::Value::Object(Default::default()), tracing::warn!(
None, host_id = %host.id,
None, error = %e,
None, "Health poller: failed to build AgentClient"
None, );
None, (
) HostHealthStatus::Unreachable,
}, serde_json::Value::Object(Default::default()),
Ok(client) => { None,
let (status, payload, version, crl_status, crl_age, crl_next) = match client None,
.health() None,
.await None,
{ None,
Ok(data) => { )
let payload = serde_json::to_value(&data).unwrap_or_default(); },
let crl_status = data.crl_status.clone(); Ok(client) => {
let crl_age = data.crl_age_seconds; let (status, payload, version, crl_status, crl_age, crl_next) = match client
let crl_next = data.crl_next_update.clone(); .health()
( .await
HostHealthStatus::Healthy, {
payload, Ok(data) => {
Some(data.version), let payload = serde_json::to_value(&data).unwrap_or_default();
crl_status, let crl_status = data.crl_status.clone();
crl_age, let crl_age = data.crl_age_seconds;
crl_next, let crl_next = data.crl_next_update.clone();
) (
}, HostHealthStatus::Healthy,
Err(AgentClientError::Timeout) => { payload,
tracing::warn!(host_id = %host.id, "Health poller: agent timed out"); Some(data.version),
( crl_status,
HostHealthStatus::Unreachable, crl_age,
serde_json::Value::Object(Default::default()), crl_next,
None, )
None, },
None, Err(AgentClientError::Timeout) => {
None, tracing::warn!(host_id = %host.id, "Health poller: agent timed out");
) (
}, HostHealthStatus::Unreachable,
Err(AgentClientError::Connect(_)) => { serde_json::Value::Object(Default::default()),
tracing::warn!(host_id = %host.id, "Health poller: agent connection refused"); None,
( None,
HostHealthStatus::Unreachable, None,
serde_json::Value::Object(Default::default()), None,
None, )
None, },
None, Err(AgentClientError::Connect(_)) => {
None, tracing::warn!(host_id = %host.id, "Health poller: agent connection refused");
) (
}, HostHealthStatus::Unreachable,
serde_json::Value::Object(Default::default()),
None,
None,
None,
None,
)
},
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Health poller: agent error");
(
HostHealthStatus::Degraded,
serde_json::Value::Object(Default::default()),
None,
None,
None,
None,
)
},
};
// Try to fetch system info for OS/arch details (best-effort).
let sys_info = if status != HostHealthStatus::Unreachable {
match client.system_info().await {
Ok(info) => Some(info),
Err(e) => { Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Health poller: agent error"); tracing::debug!(
( host_id = %host.id,
HostHealthStatus::Degraded, error = %e,
serde_json::Value::Object(Default::default()), "Health poller: failed to get system info (non-fatal)"
None, );
None, None
None,
None,
)
}, },
}; }
} else {
None
};
// Try to fetch system info for OS/arch details (best-effort). (
let sys_info = if status != HostHealthStatus::Unreachable { status, payload, version, sys_info, crl_status, crl_age, crl_next,
match client.system_info().await { )
Ok(info) => Some(info), },
Err(e) => { };
tracing::debug!(
host_id = %host.id,
error = %e,
"Health poller: failed to get system info (non-fatal)"
);
None
},
}
} else {
None
};
( // Apply CRL health aggregation rules to determine the effective status.
status, payload, version, sys_info, crl_status, crl_age, crl_next, // Only apply when the agent reported a CRL status (non-NULL).
) let effective_status = apply_crl_health_rules(&natural_status, &crl_status, host.registered_at);
},
};
// Insert into host_health_data. // Insert into host_health_data with the natural (pre-aggregation) status.
if let Err(e) = sqlx::query( if let Err(e) = sqlx::query(
r#" r#"
INSERT INTO host_health_data (host_id, status, payload) INSERT INTO host_health_data (host_id, status, payload)
@ -236,7 +265,7 @@ async fn poll_host_health(
"#, "#,
) )
.bind(host.id) .bind(host.id)
.bind(&status) .bind(&natural_status)
.bind(&payload) .bind(&payload)
.execute(&pool) .execute(&pool)
.await .await
@ -255,7 +284,8 @@ async fn poll_host_health(
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.to_utc()); .map(|dt| dt.to_utc());
// Update hosts table with health status, agent version, OS details, and CRL fields. // Update hosts table with the effective (post-aggregation) health status,
// agent version, OS details, and CRL fields.
// COALESCE preserves existing values when new data is unavailable. // COALESCE preserves existing values when new data is unavailable.
if let Err(e) = sqlx::query( if let Err(e) = sqlx::query(
r#" r#"
@ -272,7 +302,7 @@ async fn poll_host_health(
"#, "#,
) )
.bind(host.id) .bind(host.id)
.bind(&status) .bind(&effective_status)
.bind(&agent_version) .bind(&agent_version)
.bind(sys_info.as_ref().map(|i| i.os.as_str())) .bind(sys_info.as_ref().map(|i| i.os.as_str()))
.bind(os_name_from_sysinfo) .bind(os_name_from_sysinfo)
@ -284,7 +314,145 @@ async fn poll_host_health(
.await .await
{ {
tracing::error!(host_id = %host.id, error = %e, "Health poller: failed to update host status"); tracing::error!(host_id = %host.id, error = %e, "Health poller: failed to update host status");
// Don't log audit events if the DB update failed.
return effective_status;
} }
status // Log CRL audit events after successful database update.
if let Some(ref new_crl) = crl_status {
log_crl_audit_events(
&pool,
host.id,
host.crl_status.as_deref(),
new_crl,
crl_age_seconds,
)
.await;
}
effective_status
}
/// Derive the effective host health status from the agent-reported CRL state.
///
/// `natural_status` is the status determined from the health probe alone;
/// `crl_status` is the CRL state string reported by the agent (if any), and
/// `registered_at` is the host's registration time.
///
/// Override table:
/// - `"invalid"` → always `Unreachable`, regardless of the natural status
/// - `"expired"` → `Degraded`, but only when the natural status is `Healthy`
/// - `"missing"` → `Degraded`, but only when the natural status is `Healthy`
///   and the host was registered more than 24 hours before now
/// - any other string (including `"valid"`) or `None` → natural status unchanged
fn apply_crl_health_rules(
    natural_status: &HostHealthStatus,
    crl_status: &Option<String>,
    registered_at: DateTime<Utc>,
) -> HostHealthStatus {
    let naturally_healthy = *natural_status == HostHealthStatus::Healthy;

    // Decide whether a healthy host must be downgraded; the "invalid" case
    // short-circuits because it overrides every natural status.
    let downgrade = match crl_status.as_deref() {
        Some("invalid") => return HostHealthStatus::Unreachable,
        Some("expired") => naturally_healthy,
        Some("missing") => {
            // Hosts registered within the last 24 hours are exempt.
            let enrolled_for = Utc::now() - registered_at;
            enrolled_for > Duration::hours(24) && naturally_healthy
        },
        // No CRL report, "valid", or an unrecognised value: never override.
        _ => false,
    };

    if downgrade {
        HostHealthStatus::Degraded
    } else {
        natural_status.clone()
    }
}
/// Log audit events for CRL state transitions.
///
/// Called after the hosts table has been successfully updated.
/// Logs:
/// - `CrlStatusChanged` when the CRL status transitions to a different value
/// - `CrlStaleDetected` when CRL status becomes "expired"
/// - `CrlInvalid` when CRL status becomes "invalid"
async fn log_crl_audit_events(
pool: &PgPool,
host_id: Uuid,
old_crl_status: Option<&str>,
new_crl_status: &str,
crl_age_seconds: Option<i64>,
) {
let host_id_str = host_id.to_string();
let old_str = old_crl_status.unwrap_or("null");
// Log a transition event if the status changed.
if old_crl_status != Some(new_crl_status) {
let details = serde_json::json!({
"host_id": host_id_str,
"old_crl_status": old_str,
"new_crl_status": new_crl_status,
"crl_age_seconds": crl_age_seconds,
});
log_event(
pool,
AuditAction::CrlStatusChanged,
None, // actor_user_id — system-initiated
None, // actor_username
Some("host"), // target_type
Some(&host_id_str), // target_id
details,
None, // ip_address
None, // request_id
)
.await;
}
// Log specific events for problematic CRL states.
match new_crl_status {
"expired" => {
let details = serde_json::json!({
"host_id": host_id_str,
"old_crl_status": old_str,
"new_crl_status": new_crl_status,
"crl_age_seconds": crl_age_seconds,
});
log_event(
pool,
AuditAction::CrlStaleDetected,
None,
None,
Some("host"),
Some(&host_id_str),
details,
None,
None,
)
.await;
},
"invalid" => {
let details = serde_json::json!({
"host_id": host_id_str,
"old_crl_status": old_str,
"new_crl_status": new_crl_status,
"crl_age_seconds": crl_age_seconds,
});
log_event(
pool,
AuditAction::CrlInvalid,
None,
None,
Some("host"),
Some(&host_id_str),
details,
None,
None,
)
.await;
},
_ => {},
}
} }

View File

@ -161,6 +161,33 @@ Fleet status response includes CRL counts:
| `crl_invalid` | `integer` | Hosts with CRL status `invalid` (security event) | | `crl_invalid` | `integer` | Hosts with CRL status `invalid` (security event) |
| `crl_not_reporting` | `integer` | Hosts not reporting CRL status (older agents) | | `crl_not_reporting` | `integer` | Hosts not reporting CRL status (older agents) |
### CRL Audit Events
The health poller logs the following system-initiated audit events when a host's CRL status changes:
| Audit Action | Trigger | Details Fields |
|---|---|---|
| `crl_status_changed` | Any CRL status transition | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` |
| `crl_stale_detected` | CRL status becomes `expired` | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` |
| `crl_invalid` | CRL status becomes `invalid` | `host_id`, `old_crl_status`, `new_crl_status`, `crl_age_seconds` |
All CRL audit events use `target_type = "host"` and `target_id = <host_id>`. Actor fields (`actor_user_id`, `actor_username`) are `null` because these are system-initiated events.
### CRL Health Aggregation Rules
The health poller applies the following rules to determine a host's effective health status based on CRL state:
| CRL Status | Condition | Effective Health Status |
|---|---|---|
| `invalid` | Always | `unreachable` (security event) |
| `expired` | If natural status is `healthy` | `degraded` |
| `missing` | Registered > 24h ago AND natural status is `healthy` | `degraded` |
| `missing` | Registered ≤ 24h ago | Natural status (new agent enrollment) |
| `valid` | Any | Natural status (no override) |
| `null` | Any | Natural status (older agent, not reporting CRL) |
When CRL status transitions from `invalid`/`expired`/`missing` back to `valid`, the next health poll cycle restores the host to its natural health status based on the agent's health response.
## 14. Real-Time Updates (WebSocket) ## 14. Real-Time Updates (WebSocket)
| Method | Endpoint | Description | | Method | Endpoint | Description |
|--------|----------|-------------| |--------|----------|-------------|

View File

@ -0,0 +1,8 @@
-- Migration: 022_crl_audit_actions
-- Description: Add audit_action enum values for CRL health aggregation events.
-- These are system-initiated events logged by the health poller
-- when a host's CRL status transitions or indicates a problem.
--
-- The three labels below are the same strings the Rust AuditAction variants
-- CrlStatusChanged, CrlStaleDetected and CrlInvalid map to; presumably they
-- must be kept in sync with that mapping.
-- IF NOT EXISTS lets the migration be re-applied without failing on labels
-- that are already present.
-- NOTE(review): on PostgreSQL versions before 12, ALTER TYPE ... ADD VALUE
-- cannot run inside a transaction block — confirm against the migration
-- runner and the minimum supported server version.
ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_status_changed';
ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_stale_detected';
ALTER TYPE audit_action ADD VALUE IF NOT EXISTS 'crl_invalid';