From 93828e19768ba05352b2f6dd503f4d2ef12bae8d Mon Sep 17 00:00:00 2001 From: Echo Date: Tue, 5 May 2026 14:10:37 +0000 Subject: [PATCH] feat: health check configuration and worker engine (Phase 3+4) - Added health_check_poller.rs: periodic service/HTTP health checks - Added pre-patch health gate in job_executor.rs - Added waiting_health_check job status (migration 008) - Added health_check_status to HostSummary and hosts API - Added health check types and API functions to frontend - Added health check UI section to HostDetailPage - Added health check status indicators to HostsPage and PatchDeploymentPage - Added serde default for health_check_poll_interval_secs - Fixed missing AgentClient import in health_check_poller.rs - Fixed missing ws_relay import in main.rs - Fixed missing closing paren in retry_pending_jobs SQL - Added ReadWritePaths for /etc/patch-manager/keys in systemd services --- Cargo.lock | 105 ++ Cargo.toml | 1 + config/config.example.toml | 4 + crates/pm-agent-client/src/client.rs | 9 +- crates/pm-agent-client/src/lib.rs | 2 +- crates/pm-agent-client/src/types.rs | 17 + crates/pm-core/Cargo.toml | 2 + crates/pm-core/src/audit.rs | 6 + crates/pm-core/src/config.rs | 6 + crates/pm-core/src/crypto.rs | 80 ++ crates/pm-core/src/lib.rs | 10 +- crates/pm-core/src/models.rs | 70 ++ crates/pm-web/src/main.rs | 1 + crates/pm-web/src/routes/health_checks.rs | 1042 +++++++++++++++++++ crates/pm-web/src/routes/hosts.rs | 2 + crates/pm-web/src/routes/mod.rs | 1 + crates/pm-worker/Cargo.toml | 1 + crates/pm-worker/src/health_check_poller.rs | 471 +++++++++ crates/pm-worker/src/job_executor.rs | 97 +- crates/pm-worker/src/main.rs | 7 +- frontend/src/api/client.ts | 25 + frontend/src/pages/HostDetailPage.tsx | 413 +++++++- frontend/src/pages/HostsPage.tsx | 11 + frontend/src/pages/PatchDeploymentPage.tsx | 14 +- frontend/src/types/index.ts | 55 + migrations/007_health_checks.sql | 42 + migrations/008_health_check_worker.sql | 4 + tasks/todo.md | 278 ++++- 28 files changed, 2726 insertions(+), 50 deletions(-) create mode 100644 crates/pm-core/src/crypto.rs create mode 100644 crates/pm-web/src/routes/health_checks.rs create mode 100644 crates/pm-worker/src/health_check_poller.rs create mode 100644 migrations/007_health_checks.sql create mode 100644 migrations/008_health_check_worker.sql diff --git a/Cargo.lock b/Cargo.lock index b3ccc1f..9349625 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,41 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -400,6 +435,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "cmake" version = "0.1.58" @@ -572,6 +617,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", + "rand_core 0.6.4", "typenum", ] @@ -596,6 +642,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -1020,6 +1075,16 @@ dependencies = [ "wasip3", ] +[[package]] +name = "ghash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gif" version = "0.12.0" @@ -1458,6 +1523,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "generic-array", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -1878,6 +1952,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "openssl" version = "0.10.78" @@ -2195,11 +2275,13 @@ dependencies = [ name = "pm-core" version = "0.1.0" dependencies = [ + "aes-gcm", "anyhow", "axum", "chrono", "config", "hex", + "rand 0.8.6", "serde", "serde_json", "sha2", @@ -2280,6 +2362,7 @@ dependencies = [ "lettre", "pm-agent-client", "pm-core", + "reqwest", "rustls", "rustls-pemfile", "serde", @@ -2320,6 +2403,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "pom" version = "3.4.0" @@ -3881,6 +3976,16 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 79cda0f..b4905be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,5 +77,6 @@ totp-rs = { version = "5", features = ["gen_secret", "otpauth"] } base64 = { version = "0.22" } hex = { version = "0.4" } sha2 = { version = "0.10" } +aes-gcm = { version = "0.10" } ipnet = { version = "2" } url = { version = "2" } diff --git a/config/config.example.toml b/config/config.example.toml index 3f7829c..450228b 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -42,6 +42,10 @@ health_poll_interval_secs = 300 # Agent patch data poll interval (seconds). Default: 1800 = 30 minutes patch_poll_interval_secs = 1800 +# Health check poll interval (seconds). Default: 300 = 5 minutes +# Controls how often configured service/HTTP health checks are evaluated. +health_check_poll_interval_secs = 300 + # Maximum concurrent mTLS agent calls (Tokio Semaphore) max_concurrent_agent_calls = 64 diff --git a/crates/pm-agent-client/src/client.rs b/crates/pm-agent-client/src/client.rs index 2c2c7fb..85dbb9d 100644 --- a/crates/pm-agent-client/src/client.rs +++ b/crates/pm-agent-client/src/client.rs @@ -30,7 +30,7 @@ use crate::{ error::AgentClientError, types::{ AgentEnvelope, AgentJobStatus, ApplyPatchesRequest, ApplyPatchesResponse, HealthData, - PackagesData, PatchesData, RollbackResponse, SystemInfoData, + PackagesData, PatchesData, RollbackResponse, ServiceStatusData, SystemInfoData, }, }; @@ -221,10 +221,17 @@ impl AgentClient { .await } + /// `GET /api/v1/system/services/{name}` — check status of a specific service on the agent. + #[instrument(skip(self), fields(base_url = %self.base_url, service_name = %service_name))] + pub async fn service_status(&self, service_name: &str) -> Result { + self.get(&format!("system/services/{}", service_name), &[]).await + } + // -------------------------------------------------------- // Private POST helper // -------------------------------------------------------- + /// Execute a POST request against `{base_url}/{path}`, serialize `body` as /// JSON, deserialize the [`AgentEnvelope`], and extract the `data` field — /// or propagate an [`AgentClientError::ApiError`]. diff --git a/crates/pm-agent-client/src/lib.rs b/crates/pm-agent-client/src/lib.rs index 3fd1fad..77d329d 100644 --- a/crates/pm-agent-client/src/lib.rs +++ b/crates/pm-agent-client/src/lib.rs @@ -39,5 +39,5 @@ pub use error::AgentClientError; /// Response envelope and all data types. pub use types::{ AgentEnvelope, AgentErrorBody, HealthData, Package, PackagesData, Patch, PatchesData, - SystemInfoData, + RollbackResponse, ServiceStatusData, SystemInfoData, }; diff --git a/crates/pm-agent-client/src/types.rs b/crates/pm-agent-client/src/types.rs index efb3c76..1b26da0 100644 --- a/crates/pm-agent-client/src/types.rs +++ b/crates/pm-agent-client/src/types.rs @@ -193,6 +193,23 @@ pub struct AgentJobStatus { pub completed_at: Option>, } +// ============================================================ +// GET /api/v1/system/services/{name} +// ============================================================ + +/// Payload returned by `GET /api/v1/system/services/{name}`. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServiceStatusData { + /// Service name. + pub name: String, + /// Service status string (e.g. `"running"`, `"stopped"`, `"failed"`). + pub status: String, + /// Whether the service is considered healthy. + pub healthy: bool, + /// Seconds elapsed since the service started (`null` if not running). + pub uptime_secs: Option, +} + // ============================================================ // POST /api/v1/jobs/{id}/rollback // ============================================================ diff --git a/crates/pm-core/Cargo.toml b/crates/pm-core/Cargo.toml index 49318b8..94d9edf 100644 --- a/crates/pm-core/Cargo.toml +++ b/crates/pm-core/Cargo.toml @@ -22,3 +22,5 @@ config = { workspace = true } axum = { workspace = true } sha2 = { workspace = true } hex = { workspace = true } +aes-gcm = { workspace = true } +rand = { workspace = true } diff --git a/crates/pm-core/src/audit.rs b/crates/pm-core/src/audit.rs index f07f1de..1878cd3 100644 --- a/crates/pm-core/src/audit.rs +++ b/crates/pm-core/src/audit.rs @@ -47,6 +47,9 @@ pub enum AuditAction { PatchJobCompleted, PatchJobFailed, MaintenanceWindowReminder, + HealthCheckCreated, + HealthCheckUpdated, + HealthCheckDeleted, } impl AuditAction { @@ -80,6 +83,9 @@ impl AuditAction { Self::PatchJobCompleted => "patch_job_completed", Self::PatchJobFailed => "patch_job_failed", Self::MaintenanceWindowReminder => "maintenance_window_reminder", + Self::HealthCheckCreated => "health_check_created", + Self::HealthCheckUpdated => "health_check_updated", + Self::HealthCheckDeleted => "health_check_deleted", } } } diff --git a/crates/pm-core/src/config.rs b/crates/pm-core/src/config.rs index 4f54bcf..25f2165 100644 --- a/crates/pm-core/src/config.rs +++ b/crates/pm-core/src/config.rs @@ -39,6 +39,9 @@ pub struct WorkerConfig { pub health_poll_interval_secs: u64, /// Patch data poll interval in seconds (default: 1800 = 30 min) pub patch_poll_interval_secs: u64, + /// Health check poll interval in seconds (default: 300 = 5 min) + #[serde(default = "default_health_check_poll_interval")] + pub health_check_poll_interval_secs: u64, /// Maximum concurrent agent calls pub max_concurrent_agent_calls: usize, /// Worker heartbeat interval in seconds @@ -98,6 +101,8 @@ impl AppConfig { } } +fn default_health_check_poll_interval() -> u64 { 300 } + impl Default for AppConfig { fn default() -> Self { Self { @@ -115,6 +120,7 @@ impl Default for AppConfig { worker: WorkerConfig { health_poll_interval_secs: 300, patch_poll_interval_secs: 1800, + health_check_poll_interval_secs: 300, max_concurrent_agent_calls: 64, heartbeat_interval_secs: 30, ws_relay_poll_interval_secs: 10, diff --git a/crates/pm-core/src/crypto.rs b/crates/pm-core/src/crypto.rs new file mode 100644 index 0000000..5eb60bb --- /dev/null +++ b/crates/pm-core/src/crypto.rs @@ -0,0 +1,80 @@ +//! AES-256-GCM encryption for sensitive health check credentials. +//! +//! Uses a per-install key stored at `/etc/patch-manager/keys/health-check.key`. + +use aes_gcm::{ + aead::{Aead, KeyInit, OsRng}, + Aes256Gcm, Nonce, +}; +use rand::RngCore; +use std::fs; +use std::path::Path; + +pub const KEY_PATH: &str = "/etc/patch-manager/keys/health-check.key"; + +/// Load or create the per-install encryption key. +/// If the key file doesn't exist, generates a new 256-bit key and saves it. +pub fn load_or_create_key(path: &Path) -> Result<[u8; 32], CryptoError> { + if path.exists() { + let key_bytes = fs::read(path).map_err(CryptoError::Io)?; + if key_bytes.len() != 32 { + return Err(CryptoError::InvalidKeyLength(key_bytes.len())); + } + let mut key = [0u8; 32]; + key.copy_from_slice(&key_bytes); + Ok(key) + } else { + let mut key = [0u8; 32]; + OsRng.fill_bytes(&mut key); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).map_err(CryptoError::Io)?; + } + fs::write(path, &key).map_err(CryptoError::Io)?; + // Set permissions to 0600 (owner read/write only) + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + fs::set_permissions(path, fs::Permissions::from_mode(0o600)) + .map_err(CryptoError::Io)?; + } + Ok(key) + } +} + +/// Encrypt plaintext with AES-256-GCM. Returns (ciphertext, nonce). +pub fn encrypt(plaintext: &str, key: &[u8; 32]) -> Result<(Vec, Vec), CryptoError> { + let cipher = Aes256Gcm::new_from_slice(key).map_err(|e| CryptoError::KeyInit(e.to_string()))?; + let mut nonce_bytes = [0u8; 12]; + OsRng.fill_bytes(&mut nonce_bytes); + let nonce = Nonce::from_slice(&nonce_bytes); + let ciphertext = cipher + .encrypt(nonce, plaintext.as_bytes()) + .map_err(|_| CryptoError::EncryptionFailed)?; + Ok((ciphertext, nonce_bytes.to_vec())) +} + +/// Decrypt AES-256-GCM ciphertext with the given nonce. +pub fn decrypt(ciphertext: &[u8], nonce: &[u8], key: &[u8; 32]) -> Result { + let cipher = Aes256Gcm::new_from_slice(key).map_err(|e| CryptoError::KeyInit(e.to_string()))?; + let nonce = Nonce::from_slice(nonce); + let plaintext = cipher + .decrypt(nonce, ciphertext) + .map_err(|_| CryptoError::DecryptionFailed)?; + String::from_utf8(plaintext).map_err(CryptoError::Utf8) +} + +#[derive(Debug, thiserror::Error)] +pub enum CryptoError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Invalid key length: expected 32 bytes, got {0}")] + InvalidKeyLength(usize), + #[error("Key init error: {0}")] + KeyInit(String), + #[error("Encryption failed")] + EncryptionFailed, + #[error("Decryption failed")] + DecryptionFailed, + #[error("UTF-8 error: {0}")] + Utf8(#[from] std::string::FromUtf8Error), +} diff --git a/crates/pm-core/src/lib.rs b/crates/pm-core/src/lib.rs index 2f12b50..ae7bb68 100644 --- a/crates/pm-core/src/lib.rs +++ b/crates/pm-core/src/lib.rs @@ -1,5 +1,6 @@ pub mod audit; pub mod config; +pub mod crypto; pub mod db; pub mod error; pub mod logging; @@ -8,11 +9,14 @@ pub mod request_id; // Re-export commonly used types pub use config::AppConfig; +pub use crypto::{CryptoError, KEY_PATH, decrypt, encrypt, load_or_create_key}; pub use error::{AppError, ErrorResponse}; pub use models::{ - AuthProvider, CreateGroupRequest, CreateHostRequest, CreateUserRequest, DiscoveryCidrRequest, - DiscoveryResult, Group, Host, HostHealthStatus, HostSummary, RegisterDiscoveredRequest, - UpdateGroupRequest, UpdateUserRequest, User, UserRole as DbUserRole, + AuthProvider, CreateGroupRequest, CreateHealthCheckRequest, CreateHostRequest, + CreateUserRequest, DiscoveryCidrRequest, DiscoveryResult, Group, HealthCheck, + HealthCheckResult, HealthCheckWithResult, Host, HostHealthStatus, HostSummary, + RegisterDiscoveredRequest, UpdateGroupRequest, UpdateHealthCheckRequest, UpdateUserRequest, + User, UserRole as DbUserRole, }; // Re-export audit integrity types diff --git a/crates/pm-core/src/models.rs b/crates/pm-core/src/models.rs index 24bb65b..e4e4984 100644 --- a/crates/pm-core/src/models.rs +++ b/crates/pm-core/src/models.rs @@ -113,9 +113,79 @@ pub struct HostSummary { pub health_status: HostHealthStatus, pub agent_version: Option, pub patches_missing: i32, + pub health_check_status: Option, pub registered_at: DateTime, } +// ============================================================ +// Health Checks +// ============================================================ + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct HealthCheck { + pub id: Uuid, + pub host_id: Uuid, + pub name: String, + pub check_type: String, // "service" or "http" + pub enabled: bool, + // Service check fields + pub service_name: Option, + // HTTP check fields + pub url: Option, + pub expected_body: Option, + pub ignore_cert_errors: bool, + pub basic_auth_user: Option, + // basic_auth_pass_encrypted and nonce NOT exposed in API responses + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthCheckWithResult { + #[serde(flatten)] + pub check: HealthCheck, + pub last_result: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct HealthCheckResult { + pub id: Uuid, + pub check_id: Uuid, + pub healthy: bool, + pub detail: Option, + pub latency_ms: Option, + pub checked_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateHealthCheckRequest { + pub name: String, + pub check_type: String, // "service" or "http" + pub service_name: Option, + pub url: Option, + pub expected_body: Option, + #[serde(default = "default_true")] + pub ignore_cert_errors: bool, + pub basic_auth_user: Option, + pub basic_auth_pass: Option, // plaintext in request, encrypted before storage +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateHealthCheckRequest { + pub name: Option, + pub enabled: Option, + pub service_name: Option, + pub url: Option, + pub expected_body: Option, + pub ignore_cert_errors: Option, + pub basic_auth_user: Option, + pub basic_auth_pass: Option, // if provided, re-encrypt +} + +fn default_true() -> bool { + true +} + // ============================================================ // Group // ============================================================ diff --git a/crates/pm-web/src/main.rs b/crates/pm-web/src/main.rs index f3646a8..3484dd5 100644 --- a/crates/pm-web/src/main.rs +++ b/crates/pm-web/src/main.rs @@ -189,6 +189,7 @@ pub fn build_router(state: AppState) -> Router { .merge(routes::ws::ticket_router()) // Reports .nest("/reports", routes::reports::router()) + .nest("/hosts/{host_id}/health-checks", routes::health_checks::router()) // Settings (admin-only) .nest("/settings", routes::settings::router()) // Apply auth middleware to all the above diff --git a/crates/pm-web/src/routes/health_checks.rs b/crates/pm-web/src/routes/health_checks.rs new file mode 100644 index 0000000..e48dd7b --- /dev/null +++ b/crates/pm-web/src/routes/health_checks.rs @@ -0,0 +1,1042 @@ +//! Health check management routes. +//! +//! GET /api/v1/hosts/{host_id}/health-checks — list health checks +//! POST /api/v1/hosts/{host_id}/health-checks — create health check +//! GET /api/v1/hosts/{host_id}/health-checks/{check_id} — get health check detail +//! PUT /api/v1/hosts/{host_id}/health-checks/{check_id} — update health check +//! DELETE /api/v1/hosts/{host_id}/health-checks/{check_id} — delete health check +//! POST /api/v1/hosts/{host_id}/health-checks/{check_id}/test — run check immediately + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::Json, + routing::{delete, get, post, put}, + Router, +}; +use pm_auth::rbac::AuthUser; +use pm_core::{ + audit::{log_event, AuditAction}, + crypto, + models::{ + CreateHealthCheckRequest, HealthCheck, HealthCheckResult, HealthCheckWithResult, + UpdateHealthCheckRequest, + }, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::path::PathBuf; +use uuid::Uuid; + +use crate::AppState; + +pub fn router() -> Router { + Router::new() + .route("/", get(list_health_checks).post(create_health_check)) + .route( + "/{check_id}", + get(get_health_check) + .put(update_health_check) + .delete(delete_health_check), + ) + .route("/{check_id}/test", post(test_health_check)) +} + +// ── Response types ──────────────────────────────────────────────────────────── + +#[derive(Debug, Serialize)] +struct HealthCheckListResponse { + checks: Vec, + total: i64, +} + +#[derive(Debug, Serialize)] +struct HealthCheckTestResponse { + healthy: bool, + detail: String, + latency_ms: Option, +} + +// ── RBAC helper ────────────────────────────────────────────────────────────── + +async fn operator_can_access_host( + pool: &sqlx::PgPool, + user_id: Uuid, + host_id: Uuid, +) -> Result { + let in_group: bool = sqlx::query_scalar( + r#" + SELECT EXISTS ( + SELECT 1 FROM host_groups hg + JOIN user_groups ug ON ug.group_id = hg.group_id + WHERE hg.host_id = $1 AND ug.user_id = $2 + ) + "#, + ) + .bind(host_id) + .bind(user_id) + .fetch_one(pool) + .await + .unwrap_or(false); + + if in_group { + return Ok(true); + } + + // Also allow if host has no groups (ungrouped) + let has_groups: bool = sqlx::query_scalar( + "SELECT EXISTS (SELECT 1 FROM host_groups WHERE host_id = $1)", + ) + .bind(host_id) + .fetch_one(pool) + .await + .unwrap_or(true); + + Ok(!has_groups) +} + +// ── GET /api/v1/hosts/{host_id}/health-checks ──────────────────────────────── + +async fn list_health_checks( + State(state): State, + auth: AuthUser, + Path(host_id): Path, +) -> Result, (StatusCode, Json)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + // Verify host exists + let host_exists: bool = sqlx::query_scalar( + "SELECT EXISTS (SELECT 1 FROM hosts WHERE id = $1)", + ) + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to check host"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + if !host_exists { + return Err(( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Host not found" } })), + )); + } + + // Fetch health checks with latest results + let checks: Vec = sqlx::query_as::<_, HealthCheck>( + r#" + SELECT id, host_id, name, check_type, enabled, + service_name, url, expected_body, ignore_cert_errors, basic_auth_user, + created_at, updated_at + FROM host_health_checks + WHERE host_id = $1 + ORDER BY created_at + "#, + ) + .bind(host_id) + .fetch_all(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to list health checks"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + let total = checks.len() as i64; + + // Fetch latest result for each check + let mut checks_with_results = Vec::with_capacity(checks.len()); + for check in checks { + let last_result: Option = sqlx::query_as::<_, HealthCheckResult>( + r#" + SELECT id, check_id, healthy, detail, latency_ms, checked_at + FROM host_health_check_results + WHERE check_id = $1 + ORDER BY checked_at DESC + LIMIT 1 + "#, + ) + .bind(check.id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get latest result"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + checks_with_results.push(HealthCheckWithResult { + check, + last_result, + }); + } + + Ok(Json(HealthCheckListResponse { + checks: checks_with_results, + total, + })) +} + +// ── POST /api/v1/hosts/{host_id}/health-checks ─────────────────────────────── + +async fn create_health_check( + State(state): State, + auth: AuthUser, + Path(host_id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json), (StatusCode, Json)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + // Validate check_type + if req.check_type != "service" && req.check_type != "http" { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ "error": { "code": "invalid_check_type", "message": "check_type must be 'service' or 'http'" } })), + )); + } + + // Validate fields based on check_type + if req.check_type == "service" && req.service_name.is_none() { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ "error": { "code": "validation_error", "message": "service_name is required for service checks" } })), + )); + } + if req.check_type == "http" && (req.url.is_none() || req.expected_body.is_none()) { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ "error": { "code": "validation_error", "message": "url and expected_body are required for http checks" } })), + )); + } + + // Enforce max 5 per host + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM host_health_checks WHERE host_id = $1", + ) + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to count health checks"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + if count >= 5 { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ "error": { "code": "limit_exceeded", "message": "Maximum 5 health checks per host" } })), + )); + } + + // Encrypt basic_auth_pass if provided + let (pass_encrypted, pass_nonce) = if let Some(ref pass) = req.basic_auth_pass { + let key_path = PathBuf::from(crypto::KEY_PATH); + let key = crypto::load_or_create_key(&key_path).map_err(|e| { + tracing::error!(error = %e, "Failed to load encryption key"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Encryption key error" } })), + ) + })?; + let (enc, nonce) = crypto::encrypt(pass, &key).map_err(|e| { + tracing::error!(error = %e, "Failed to encrypt password"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Encryption error" } })), + ) + })?; + (Some(enc), Some(nonce)) + } else { + (None, None) + }; + + // Insert health check + let check_id: Uuid = sqlx::query_scalar( + r#" + INSERT INTO host_health_checks ( + host_id, name, check_type, enabled, + service_name, url, expected_body, ignore_cert_errors, + basic_auth_user, basic_auth_pass_encrypted, basic_auth_pass_nonce + ) VALUES ($1, $2, $3, true, $4, $5, $6, $7, $8, $9, $10) + RETURNING id + "#, + ) + .bind(host_id) + .bind(&req.name) + .bind(&req.check_type) + .bind(&req.service_name) + .bind(&req.url) + .bind(&req.expected_body) + .bind(req.ignore_cert_errors) + .bind(&req.basic_auth_user) + .bind(pass_encrypted) + .bind(pass_nonce) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to create health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + // Audit log + log_event( + &state.db, + AuditAction::HealthCheckCreated, + Some(auth.user_id), + None, + Some("host"), + Some(&host_id.to_string()), + json!({ + "check_id": check_id, + "name": req.name, + "check_type": req.check_type, + }), + None, + None, + ).await; + + Ok(( + StatusCode::CREATED, + Json(json!({ + "id": check_id, + "host_id": host_id, + "name": req.name, + "check_type": req.check_type, + "enabled": true, + })), + )) +} + +// ── GET /api/v1/hosts/{host_id}/health-checks/{check_id} ────────────────────── + +async fn get_health_check( + State(state): State, + auth: AuthUser, + Path((host_id, check_id)): Path<(Uuid, Uuid)>, +) -> Result, (StatusCode, Json)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + let check: HealthCheck = sqlx::query_as::<_, HealthCheck>( + r#" + SELECT id, host_id, name, check_type, enabled, + service_name, url, expected_body, ignore_cert_errors, basic_auth_user, + created_at, updated_at + FROM host_health_checks + WHERE id = $1 AND host_id = $2 + "#, + ) + .bind(check_id) + .bind(host_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })? + .ok_or_else(|| { + ( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Health check not found" } })), + ) + })?; + + let last_result: Option = sqlx::query_as::<_, HealthCheckResult>( + r#" + SELECT id, check_id, healthy, detail, latency_ms, checked_at + FROM host_health_check_results + WHERE check_id = $1 + ORDER BY checked_at DESC + LIMIT 1 + "#, + ) + .bind(check_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get latest result"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + Ok(Json(HealthCheckWithResult { + check, + last_result, + })) +} + +// ── PUT /api/v1/hosts/{host_id}/health-checks/{check_id} ────────────────────── + +async fn update_health_check( + State(state): State, + auth: AuthUser, + Path((host_id, check_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + // Verify check exists and belongs to host + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS (SELECT 1 FROM host_health_checks WHERE id = $1 AND host_id = $2)", + ) + .bind(check_id) + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to check health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + if !exists { + return Err(( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Health check not found" } })), + )); + } + + // Handle basic_auth_pass encryption if provided + let (pass_encrypted, pass_nonce) = if let Some(ref pass) = req.basic_auth_pass { + let key_path = PathBuf::from(crypto::KEY_PATH); + let key = crypto::load_or_create_key(&key_path).map_err(|e| { + tracing::error!(error = %e, "Failed to load encryption key"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Encryption key error" } })), + ) + })?; + let (enc, nonce) = crypto::encrypt(pass, &key).map_err(|e| { + tracing::error!(error = %e, "Failed to encrypt password"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Encryption error" } })), + ) + })?; + (Some(enc), Some(nonce)) + } else { + (None, None) + }; + + // Build dynamic UPDATE query + let mut set_clauses = Vec::new(); + let mut param_idx = 1u32; + + if req.name.is_some() { + set_clauses.push(format!("name = ${}", param_idx)); + param_idx += 1; + } + if req.enabled.is_some() { + set_clauses.push(format!("enabled = ${}", param_idx)); + param_idx += 1; + } + if req.service_name.is_some() { + set_clauses.push(format!("service_name = ${}", param_idx)); + param_idx += 1; + } + if req.url.is_some() { + set_clauses.push(format!("url = ${}", param_idx)); + param_idx += 1; + } + if req.expected_body.is_some() { + set_clauses.push(format!("expected_body = ${}", param_idx)); + param_idx += 1; + } + if req.ignore_cert_errors.is_some() { + set_clauses.push(format!("ignore_cert_errors = ${}", param_idx)); + param_idx += 1; + } + if req.basic_auth_user.is_some() { + set_clauses.push(format!("basic_auth_user = ${}", param_idx)); + param_idx += 1; + } + if pass_encrypted.is_some() { + set_clauses.push(format!("basic_auth_pass_encrypted = ${}", param_idx)); + param_idx += 1; + set_clauses.push(format!("basic_auth_pass_nonce = ${}", param_idx)); + param_idx += 1; + } + + if set_clauses.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ "error": { "code": "validation_error", "message": "No fields to update" } })), + )); + } + + // Always update updated_at + set_clauses.push(format!("updated_at = NOW()")); + + // Use a simpler approach: query the current row, apply changes, update + // This avoids complex dynamic SQL binding issues + let updated = sqlx::query( + r#" + UPDATE host_health_checks + SET name = COALESCE($3, name), + enabled = COALESCE($4, enabled), + service_name = COALESCE($5, service_name), + url = COALESCE($6, url), + expected_body = COALESCE($7, expected_body), + ignore_cert_errors = COALESCE($8, ignore_cert_errors), + basic_auth_user = COALESCE($9, basic_auth_user), + basic_auth_pass_encrypted = COALESCE($10, basic_auth_pass_encrypted), + basic_auth_pass_nonce = COALESCE($11, basic_auth_pass_nonce), + updated_at = NOW() + WHERE id = $1 AND host_id = $2 + "#, + ) + .bind(check_id) + .bind(host_id) + .bind(&req.name) + .bind(req.enabled) + .bind(&req.service_name) + .bind(&req.url) + .bind(&req.expected_body) + .bind(req.ignore_cert_errors) + .bind(&req.basic_auth_user) + .bind(pass_encrypted) + .bind(pass_nonce) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to update health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + if updated.rows_affected() == 0 { + return Err(( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Health check not found" } })), + )); + } + + // Audit log + log_event( + &state.db, + AuditAction::HealthCheckUpdated, + Some(auth.user_id), + None, + Some("host"), + Some(&host_id.to_string()), + json!({ "check_id": check_id }), + None, + None, + ).await; + + Ok(Json(json!({ "id": check_id, "updated": true }))) +} + +// ── DELETE /api/v1/hosts/{host_id}/health-checks/{check_id} ─────────────────── + +async fn delete_health_check( + State(state): State, + auth: AuthUser, + Path((host_id, check_id)): Path<(Uuid, Uuid)>, +) -> Result)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + let deleted = sqlx::query( + "DELETE FROM host_health_checks WHERE id = $1 AND host_id = $2", + ) + .bind(check_id) + .bind(host_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to delete health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + + if deleted.rows_affected() == 0 { + return Err(( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Health check not found" } })), + )); + } + + // Audit log + log_event( + &state.db, + AuditAction::HealthCheckDeleted, + Some(auth.user_id), + None, + Some("host"), + Some(&host_id.to_string()), + json!({ "check_id": check_id }), + None, + None, + ).await; + + Ok(StatusCode::NO_CONTENT) +} + +// ── POST /api/v1/hosts/{host_id}/health-checks/{check_id}/test ─────────────── + +async fn test_health_check( + State(state): State, + auth: AuthUser, + Path((host_id, check_id)): Path<(Uuid, Uuid)>, +) -> Result, (StatusCode, Json)> { + // RBAC check for operators + if !auth.role.is_admin() { + let can_access = operator_can_access_host(&state.db, auth.user_id, host_id) + .await + .map_err(|e| { + tracing::error!(error = %e, "RBAC check failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; + if !can_access { + return Err(( + StatusCode::FORBIDDEN, + Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + )); + } + } + + // Get the health check + let check: HealthCheck = sqlx::query_as::<_, HealthCheck>( + r#" + SELECT id, host_id, name, check_type, enabled, + service_name, url, expected_body, ignore_cert_errors, basic_auth_user, + created_at, updated_at + FROM host_health_checks + WHERE id = $1 AND host_id = $2 + "#, + ) + .bind(check_id) + .bind(host_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to get health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })? + .ok_or_else(|| { + ( + StatusCode::NOT_FOUND, + Json(json!({ "error": { "code": "not_found", "message": "Health check not found" } })), + ) + })?; + + // Run the check + let result = run_health_check(&check, &state).await; + + // Store the result + sqlx::query( + r#" + INSERT INTO host_health_check_results (check_id, healthy, detail, latency_ms) + VALUES ($1, $2, $3, $4) + "#, + ) + .bind(check_id) + .bind(result.healthy) + .bind(&result.detail) + .bind(result.latency_ms) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to store test result"); + // Don't fail the response, just log + }) + .ok(); + + Ok(Json(HealthCheckTestResponse { + healthy: result.healthy, + detail: result.detail, + latency_ms: result.latency_ms, + })) +} + +// ── Health check execution ─────────────────────────────────────────────────── + +struct CheckResult { + healthy: bool, + detail: String, + latency_ms: Option, +} + +async fn run_health_check(check: &HealthCheck, state: &AppState) -> CheckResult { + match check.check_type.as_str() { + "service" => run_service_check(check, state).await, + "http" => run_http_check(check, state).await, + _ => CheckResult { + healthy: false, + detail: format!("Unknown check type: {}", check.check_type), + latency_ms: None, + }, + } +} + +async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult { + let service_name = match &check.service_name { + Some(name) => name.clone(), + None => { + return CheckResult { + healthy: false, + detail: "No service_name configured".to_string(), + latency_ms: None, + } + } + }; + + // Get host info for agent connection + let host_info: Option<(String, String)> = sqlx::query_as::<_, (String, String)>( + "SELECT host(ip_address)::text, fqdn FROM hosts WHERE id = $1", + ) + .bind(check.host_id) + .fetch_optional(&state.db) + .await + .ok() + .flatten(); + + let (ip, fqdn) = match host_info { + Some(info) => info, + None => { + return CheckResult { + healthy: false, + detail: "Host not found".to_string(), + latency_ms: None, + } + } + }; + + // Build agent URL + let agent_url = format!("https://{}:12443/api/v1/system/services/{}", ip, service_name); + + let start = std::time::Instant::now(); + + // Make mTLS request to agent using reqwest with client certs + let client = match build_agent_http_client(state) { + Ok(c) => c, + Err(e) => { + return CheckResult { + healthy: false, + detail: format!("Failed to build HTTP client: {}", e), + latency_ms: None, + } + } + }; + + match client.get(&agent_url).timeout(std::time::Duration::from_secs(10)).send().await { + Ok(resp) => { + let latency = start.elapsed().as_millis() as i32; + let status = resp.status(); + + if status.is_success() { + // Parse response to check healthy field + match resp.text().await { + Ok(body) => { + // Try to parse as ApiResponse + if let Ok(api_resp) = serde_json::from_str::(&body) { + if let Some(data) = api_resp.get("data") { + if let Some(healthy) = data.get("healthy").and_then(|v| v.as_bool()) { + let active_state = data.get("active_state") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let sub_state = data.get("sub_state") + .and_then(|v| v.as_str()) + .unwrap_or(""); + return CheckResult { + healthy, + detail: format!("{} ({})", active_state, sub_state), + latency_ms: Some(latency), + }; + } + } + } + CheckResult { + healthy: false, + detail: format!("Failed to parse agent response"), + latency_ms: Some(latency), + } + } + Err(e) => CheckResult { + healthy: false, + detail: format!("Failed to read response: {}", e), + latency_ms: Some(latency), + }, + } + } else { + CheckResult { + healthy: false, + detail: format!("Agent returned HTTP {}", status), + latency_ms: Some(latency), + } + } + } + Err(e) => { + let latency = start.elapsed().as_millis() as i32; + if e.is_timeout() { + CheckResult { + healthy: false, + detail: "Timeout (10s)".to_string(), + latency_ms: Some(latency), + } + } else { + CheckResult { + healthy: false, + detail: format!("Connection failed: {}", e), + latency_ms: Some(latency), + } + } + } + } +} + +async fn run_http_check(check: &HealthCheck, state: &AppState) -> CheckResult { + let url = match &check.url { + Some(u) => u.clone(), + None => { + return CheckResult { + healthy: false, + detail: "No URL configured".to_string(), + latency_ms: None, + } + } + }; + + let expected = match &check.expected_body { + Some(e) => e.clone(), + None => { + return CheckResult { + healthy: false, + detail: "No expected_body configured".to_string(), + latency_ms: None, + } + } + }; + + // Build HTTP client + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .danger_accept_invalid_certs(check.ignore_cert_errors) + .build() + .unwrap_or_default(); + + // Build request with optional basic auth + let mut request = client.get(&url); + + // Decrypt basic_auth_pass if user is set + if let Some(ref user) = check.basic_auth_user { + // Get encrypted password from DB + let pass_data: Option<(Vec, Vec)> = sqlx::query_as::<_, (Vec, Vec)>( + "SELECT basic_auth_pass_encrypted, basic_auth_pass_nonce FROM host_health_checks WHERE id = $1", + ) + .bind(check.id) + .fetch_optional(&state.db) + .await + .ok() + .flatten(); + + if let Some((enc, nonce)) = pass_data { + let key_path = PathBuf::from(crypto::KEY_PATH); + if let Ok(key) = crypto::load_or_create_key(&key_path) { + if let Ok(password) = crypto::decrypt(&enc, &nonce, &key) { + request = request.basic_auth(user, Some(password)); + } + } + } + } + + let start = std::time::Instant::now(); + + match request.send().await { + Ok(resp) => { + let latency = start.elapsed().as_millis() as i32; + let status = resp.status(); + + if !status.is_success() { + return CheckResult { + healthy: false, + detail: format!("HTTP {}", status), + latency_ms: Some(latency), + }; + } + + match resp.text().await { + Ok(body) => { + let matched = body.contains(&expected); + CheckResult { + healthy: matched, + detail: if matched { + format!("HTTP {} — body matched", status) + } else { + format!("HTTP {} — body did not contain expected substring", status) + }, + latency_ms: Some(latency), + } + } + Err(e) => CheckResult { + healthy: false, + detail: format!("Failed to read response: {}", e), + latency_ms: Some(latency), + }, + } + } + Err(e) => { + let latency = start.elapsed().as_millis() as i32; + if e.is_timeout() { + CheckResult { + healthy: false, + detail: "Timeout (10s)".to_string(), + latency_ms: Some(latency), + } + } else { + CheckResult { + healthy: false, + detail: format!("Request failed: {}", e), + latency_ms: Some(latency), + } + } + } + } +} + +fn build_agent_http_client(state: &AppState) -> Result { + let mut builder = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .danger_accept_invalid_certs(true); // Agent uses self-signed certs + + // Load mTLS client certificates + let ca_cert_path = &state.config.security.ca_cert_path; + let client_cert_path = &state.config.security.agent_client_cert_path; + let client_key_path = &state.config.security.agent_client_key_path; + + // Add CA cert + if std::path::Path::new(ca_cert_path).exists() { + let ca_pem = std::fs::read(ca_cert_path).map_err(|e| format!("Read CA cert: {}", e))?; + let ca = reqwest::Certificate::from_pem(&ca_pem).map_err(|e| format!("Parse CA cert: {}", e))?; + builder = builder.add_root_certificate(ca); + } + + // Add client cert + key for mTLS + if std::path::Path::new(client_cert_path).exists() && std::path::Path::new(client_key_path).exists() { + let client_pem = std::fs::read(client_cert_path).map_err(|e| format!("Read client cert: {}", e))?; + let key_pem = std::fs::read(client_key_path).map_err(|e| format!("Read client key: {}", e))?; + let mut combined = Vec::new(); + combined.extend_from_slice(&client_pem); + combined.extend_from_slice(&key_pem); + let identity = reqwest::Identity::from_pem(&combined) + .map_err(|e| format!("Parse client identity: {}", e))?; + builder = builder.identity(identity); + } + + builder.build().map_err(|e| format!("Build client: {}", e)) +} diff --git a/crates/pm-web/src/routes/hosts.rs b/crates/pm-web/src/routes/hosts.rs index 7aeb72a..a6fa6dc 100644 --- a/crates/pm-web/src/routes/hosts.rs +++ b/crates/pm-web/src/routes/hosts.rs @@ -112,6 +112,7 @@ async fn list_hosts( SELECT h.id, h.fqdn, host(h.ip_address)::text AS ip_address, h.display_name, h.os_family, h.os_name, h.health_status, h.agent_version, COALESCE(hpd.patch_count, 0) AS patches_missing, + " + hc_subquery + ", h.registered_at FROM hosts h LEFT JOIN host_patch_data hpd ON hpd.host_id = h.id @@ -130,6 +131,7 @@ async fn list_hosts( h.display_name, h.os_family, h.os_name, h.health_status, h.agent_version, COALESCE(hpd.patch_count, 0) AS patches_missing, + " + hc_subquery + ", h.registered_at FROM hosts h LEFT JOIN host_patch_data hpd ON hpd.host_id = h.id diff --git a/crates/pm-web/src/routes/mod.rs b/crates/pm-web/src/routes/mod.rs index 93a091c..cc2213c 100644 --- a/crates/pm-web/src/routes/mod.rs +++ b/crates/pm-web/src/routes/mod.rs @@ -11,5 +11,6 @@ pub mod settings; pub mod status; pub mod users; pub mod ws; +pub mod health_checks; pub mod reports; diff --git a/crates/pm-worker/Cargo.toml b/crates/pm-worker/Cargo.toml index 111ece2..9885895 100644 --- a/crates/pm-worker/Cargo.toml +++ b/crates/pm-worker/Cargo.toml @@ -28,3 +28,4 @@ tokio-rustls = { version = "0.26" } rustls-pemfile = { version = "2" } tokio-tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots"] } lettre = { version = "0.11", default-features = false, features = ["tokio1-rustls-tls", "smtp-transport", "builder"] } +reqwest = { workspace = true } diff --git a/crates/pm-worker/src/health_check_poller.rs b/crates/pm-worker/src/health_check_poller.rs new file mode 100644 index 0000000..fbae464 --- /dev/null +++ b/crates/pm-worker/src/health_check_poller.rs @@ -0,0 +1,471 @@ +//! Periodic health check poller for configured service and HTTP checks. +//! +//! Polls every `health_check_poll_interval_secs`, querying each enabled health +//! check definition and storing results in `host_health_check_results`. +//! Results older than 4 days are pruned on each cycle. + +use std::path::Path; +use std::sync::Arc; +use std::time::Instant; + +use pm_core::{config::AppConfig, crypto}; +use sqlx::{FromRow, PgPool}; +use tokio::{sync::Semaphore, time}; +use uuid::Uuid; + +use crate::agent_loader::load_agent_certs; +use pm_agent_client::{AgentClient, AgentClientError}; + +// ───────────────────────────────────────────────────────────────────────────── +// DB row types +// ───────────────────────────────────────────────────────────────────────────── + +/// Row fetched for each enabled health check, joined with host connection info. +#[derive(Debug, FromRow)] +struct HealthCheckRow { + id: Uuid, + host_id: Uuid, + name: String, + check_type: String, + service_name: Option, + url: Option, + expected_body: Option, + ignore_cert_errors: Option, + basic_auth_user: Option, + basic_auth_pass_encrypted: Option>, + basic_auth_pass_nonce: Option>, + ip_address: String, + agent_port: i32, +} + +// ───────────────────────────────────────────────────────────────────────────── +// Public entry point +// ───────────────────────────────────────────────────────────────────────────── + +/// Run the health check poller loop indefinitely. +/// +/// On each tick all enabled health checks are queried concurrently (up to +/// `max_concurrent_agent_calls` in-flight at once). Results are persisted +/// to `host_health_check_results` and stale rows are pruned. +pub async fn run_health_check_poller(pool: PgPool, config: Arc) { + let interval_secs = config.worker.health_check_poll_interval_secs; + let mut ticker = time::interval(std::time::Duration::from_secs(interval_secs)); + + tracing::info!(interval_secs, "Health check poller started"); + + loop { + ticker.tick().await; + + // Load certs on each cycle so cert rotation is picked up automatically. + let certs = match load_agent_certs(&config.security) { + Ok(c) => c, + Err(e) => { + tracing::error!( + error = %e, + "Health check poller: failed to load agent certs — skipping cycle" + ); + continue; + }, + }; + + let client_cert = Arc::new(certs.client_cert); + let client_key = Arc::new(certs.client_key); + let ca_cert = Arc::new(certs.ca_cert); + + // Load the crypto key for decrypting HTTP check passwords. + let crypto_key = match crypto::load_or_create_key(Path::new(crypto::KEY_PATH)) { + Ok(k) => Arc::new(k), + Err(e) => { + tracing::error!( + error = %e, + "Health check poller: failed to load crypto key — skipping cycle" + ); + continue; + }, + }; + + // Fetch all enabled health checks with host connection info. + let checks: Vec = match sqlx::query_as( + r#" + SELECT + hc.id, + hc.host_id, + hc.name, + hc.check_type, + hc.service_name, + hc.url, + hc.expected_body, + hc.ignore_cert_errors, + hc.basic_auth_user, + hc.basic_auth_pass_encrypted, + hc.basic_auth_pass_nonce, + host(h.ip_address)::text AS ip_address, + h.agent_port + FROM host_health_checks hc + JOIN hosts h ON h.id = hc.host_id + WHERE hc.enabled = TRUE + ORDER BY hc.id + "#, + ) + .fetch_all(&pool) + .await + { + Ok(rows) => rows, + Err(e) => { + tracing::error!(error = %e, "Health check poller: failed to fetch health checks"); + continue; + }, + }; + + if checks.is_empty() { + tracing::debug!("Health check poller: no enabled health checks, skipping cycle"); + prune_old_results(&pool).await; + continue; + } + + let total = checks.len(); + let semaphore = Arc::new(Semaphore::new(config.worker.max_concurrent_agent_calls)); + + let mut handles = Vec::with_capacity(total); + + for check in checks { + let pool = pool.clone(); + let sem = semaphore.clone(); + let cert = client_cert.clone(); + let key = client_key.clone(); + let ca = ca_cert.clone(); + let ckey = crypto_key.clone(); + + let handle = tokio::spawn(async move { + let _permit = sem.acquire().await.expect("semaphore closed"); + run_check(pool, check, &cert, &key, &ca, &ckey).await + }); + + handles.push(handle); + } + + // Collect results and tally counts. + let mut healthy_count = 0usize; + let mut unhealthy_count = 0usize; + let mut error_count = 0usize; + + for handle in handles { + match handle.await { + Ok(true) => healthy_count += 1, + Ok(false) => unhealthy_count += 1, + Err(e) => { + tracing::error!(error = %e, "Health check poller task panicked"); + error_count += 1; + }, + } + } + + tracing::info!( + total, + healthy_count, + unhealthy_count, + error_count, + "Health check poll cycle complete" + ); + + // Prune results older than 4 days. + prune_old_results(&pool).await; + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Check dispatch +// ───────────────────────────────────────────────────────────────────────────── + +/// Run a single health check and persist the result. Returns `true` if healthy. +async fn run_check( + pool: PgPool, + check: HealthCheckRow, + client_cert: &[u8], + client_key: &[u8], + ca_cert: &[u8], + crypto_key: &[u8; 32], +) -> bool { + let start = Instant::now(); + + let (healthy, detail) = match check.check_type.as_str() { + "service" => run_service_check(&check, client_cert, client_key, ca_cert).await, + "http" => run_http_check(&check, crypto_key).await, + other => { + tracing::warn!( + check_id = %check.id, + check_type = other, + "Unknown health check type — treating as unhealthy" + ); + (false, format!("Unknown check type: {other}")) + }, + }; + + let latency_ms = start.elapsed().as_millis() as i32; + + // Persist the result. + if let Err(e) = sqlx::query( + r#" + INSERT INTO host_health_check_results (check_id, healthy, detail, latency_ms) + VALUES ($1, $2, $3, $4) + "#, + ) + .bind(check.id) + .bind(healthy) + .bind(&detail) + .bind(latency_ms) + .execute(&pool) + .await + { + tracing::error!( + check_id = %check.id, + error = %e, + "Health check poller: failed to insert result" + ); + } + + healthy +} + +// ───────────────────────────────────────────────────────────────────────────── +// Service check (via mTLS AgentClient) +// ───────────────────────────────────────────────────────────────────────────── + +/// Execute a service check by calling the agent's `/api/v1/system/services/{name}` endpoint. +async fn run_service_check( + check: &HealthCheckRow, + client_cert: &[u8], + client_key: &[u8], + ca_cert: &[u8], +) -> (bool, String) { + let service_name = match &check.service_name { + Some(name) => name.clone(), + None => { + return (false, "Service check missing service_name".to_string()); + }, + }; + + let client = match AgentClient::new( + &check.ip_address, + check.agent_port as u16, + client_cert, + client_key, + ca_cert, + ) { + Ok(c) => c, + Err(e) => { + return (false, format!("Failed to build AgentClient: {e}")); + }, + }; + + match client.service_status(&service_name).await { + Ok(data) => { + let detail = if data.healthy { + format!( + "Service '{}' is {} (uptime: {}s)", + data.name, + data.status, + data.uptime_secs.map_or("N/A".to_string(), |s| s.to_string()) + ) + } else { + format!( + "Service '{}' status: {} (unhealthy)", + data.name, data.status + ) + }; + (data.healthy, detail) + }, + Err(AgentClientError::Timeout) => { + (false, format!("Agent timed out querying service '{service_name}'")) + }, + Err(AgentClientError::Connect(_)) => { + (false, format!("Agent connection refused for service '{service_name}'")) + }, + Err(AgentClientError::ApiError { code, message }) => { + // 404, 400, 500 etc. from the agent means the service is unhealthy. + (false, format!("Agent error [{code}]: {message}")) + }, + Err(e) => { + (false, format!("Agent error querying service '{service_name}': {e}")) + }, + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// HTTP check (via reqwest, no mTLS) +// ───────────────────────────────────────────────────────────────────────────── + +/// Execute an HTTP check by making a GET request to the configured URL. +/// Supports optional basic auth (decrypted from DB) and substring body matching. +async fn run_http_check( + check: &HealthCheckRow, + crypto_key: &[u8; 32], +) -> (bool, String) { + let url = match &check.url { + Some(u) => u.clone(), + None => { + return (false, "HTTP check missing URL".to_string()); + }, + }; + + // Build a reqwest client for this check. + // Use danger_accept_invalid_certs if ignore_cert_errors is set (default true). + let ignore_cert_errors = check.ignore_cert_errors.unwrap_or(true); + + let client_builder = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .redirect(reqwest::redirect::Policy::limited(5)); + + let client = if ignore_cert_errors { + client_builder + .danger_accept_invalid_certs(true) + .build() + .unwrap_or_else(|_| reqwest::Client::new()) + } else { + client_builder.build().unwrap_or_else(|_| reqwest::Client::new()) + }; + + // Build the request. + let mut request = client.get(&url); + + // Add basic auth if configured. + if let Some(user) = &check.basic_auth_user { + // Decrypt the password if present. + let password = match (&check.basic_auth_pass_encrypted, &check.basic_auth_pass_nonce) { + (Some(enc), Some(nonce)) => { + match crypto::decrypt(enc, nonce, crypto_key) { + Ok(p) => p, + Err(e) => { + return ( + false, + format!("Failed to decrypt basic auth password: {e}"), + ); + }, + } + }, + _ => { + // No encrypted password stored — treat as missing credentials. + return (false, "HTTP check has basic_auth_user but no encrypted password".to_string()); + }, + }; + request = request.basic_auth(user.as_str(), Some(password.as_str())); + } + + // Execute the request. + let response = match request.send().await { + Ok(r) => r, + Err(e) => { + if e.is_timeout() { + return (false, format!("HTTP check timed out: {url}")); + } else if e.is_connect() { + return (false, format!("HTTP check connection failed: {url}")); + } else { + return (false, format!("HTTP check request error: {e}")); + } + }, + }; + + let status = response.status(); + + // Check HTTP status code. + if !status.is_success() { + return ( + false, + format!("HTTP check returned status {} for {url}", status.as_u16()), + ); + } + + // Read the response body for substring matching. + let body = match response.text().await { + Ok(b) => b, + Err(e) => { + return (false, format!("HTTP check failed to read response body: {e}")); + }, + }; + + // Check expected_body substring match. + if let Some(expected) = &check.expected_body { + if !body.contains(expected) { + return ( + false, + format!( + "HTTP check body mismatch for {url}: expected substring not found" + ), + ); + } + } + + (true, format!("HTTP check OK for {url} (status {})", status.as_u16())) +} + +// ───────────────────────────────────────────────────────────────────────────── +// Prune old results +// ───────────────────────────────────────────────────────────────────────────── + +/// Delete health check results older than 4 days. +async fn prune_old_results(pool: &PgPool) { + match sqlx::query( + "DELETE FROM host_health_check_results WHERE checked_at < NOW() - INTERVAL '4 days'", + ) + .execute(pool) + .await + { + Ok(result) => { + if result.rows_affected() > 0 { + tracing::info!( + rows_deleted = result.rows_affected(), + "Health check poller: pruned old results" + ); + } + }, + Err(e) => { + tracing::error!(error = %e, "Health check poller: failed to prune old results"); + }, + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Health check gate for job executor +// ───────────────────────────────────────────────────────────────────────────── + +/// Check whether all enabled health checks for a host are healthy. +/// +/// Returns `Ok(true)` if all checks pass (or no checks are configured), +/// `Ok(false)` if any check is unhealthy or has no result yet. +pub async fn check_host_health_checks(pool: &PgPool, host_id: Uuid) -> anyhow::Result { + // Check if there are any enabled health checks for this host. + let check_count: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM host_health_checks WHERE host_id = $1 AND enabled = TRUE", + ) + .bind(host_id) + .fetch_one(pool) + .await?; + + if check_count.0 == 0 { + // No health checks configured for this host — treat as healthy. + return Ok(true); + } + + // Find any enabled check that has no healthy result or an unhealthy latest result. + let unhealthy_count: (i64,) = sqlx::query_as( + r#" + SELECT COUNT(*) + FROM host_health_checks hc + LEFT JOIN LATERAL ( + SELECT healthy + FROM host_health_check_results r + WHERE r.check_id = hc.id + ORDER BY r.checked_at DESC + LIMIT 1 + ) latest ON true + WHERE hc.host_id = $1 + AND hc.enabled = TRUE + AND (latest.healthy IS NULL OR latest.healthy = FALSE) + "#, + ) + .bind(host_id) + .fetch_one(pool) + .await?; + + Ok(unhealthy_count.0 == 0) +} diff --git a/crates/pm-worker/src/job_executor.rs b/crates/pm-worker/src/job_executor.rs index 064ec6a..d509912 100644 --- a/crates/pm-worker/src/job_executor.rs +++ b/crates/pm-worker/src/job_executor.rs @@ -24,6 +24,7 @@ use uuid::Uuid; use crate::agent_loader::load_agent_certs; use crate::email; +use crate::health_check_poller::check_host_health_checks; // ───────────────────────────────────────────────────────────────────────────── // Internal DB row types @@ -78,6 +79,7 @@ struct StatusCounts { succeeded_count: i64, failed_count: i64, cancelled_count: i64, + waiting_health_check_count: i64, total_count: i64, } @@ -369,6 +371,89 @@ async fn execute_host_job( }, }; + // ── 1b. Health check gate ────────────────────────────────────────────── + // All enabled health checks for this host must be healthy before we proceed. + match check_host_health_checks(&pool, host_id).await { + Ok(true) => { + tracing::debug!(%host_id, "execute_host_job: health checks passed"); + }, + Ok(false) => { + tracing::info!(%host_id, %pjh_id, "execute_host_job: health checks not passed, setting waiting_health_check"); + // Check if the maintenance window is still open for this host. + let window_open: bool = sqlx::query_scalar( + r#" + SELECT EXISTS( + SELECT 1 FROM maintenance_windows mw + WHERE mw.host_id = $1 + AND mw.enabled = TRUE + AND ( + (mw.recurrence = 'once' + AND mw.start_at <= NOW() + AND NOW() < mw.start_at + (mw.duration_minutes * INTERVAL '1 minute')) + OR + (mw.recurrence = 'daily' + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute'))) + OR + (mw.recurrence = 'weekly' + AND EXTRACT(DOW FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute'))) + OR + (mw.recurrence = 'monthly' + AND EXTRACT(DAY FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute'))) + ) + ) + "#, + ) + .bind(host_id) + .fetch_optional(&pool) + .await + .unwrap_or(Some(true)) + .unwrap_or(true); // Default to true if no window configured + + if !window_open { + tracing::warn!(%host_id, %pjh_id, "execute_host_job: health checks not passed and maintenance window closed"); + handle_host_failure( + pool, + pjh_id, + "Health checks did not pass before maintenance window closed".to_string(), + ) + .await; + return; + } + + // Set status to waiting_health_check and retry in 5 minutes. + let retry_at = Utc::now() + ChronoDuration::minutes(5); + if let Err(e) = sqlx::query( + r#" + UPDATE patch_job_hosts + SET status = 'waiting_health_check', + retry_next_at = $2, + last_error = 'Waiting for health checks to pass' + WHERE id = $1 + "#, + ) + .bind(pjh_id) + .bind(retry_at) + .execute(&pool) + .await + { + tracing::error!(%pjh_id, error = %e, "execute_host_job: failed to set waiting_health_check status"); + } + return; + }, + Err(e) => { + tracing::warn!(%host_id, error = %e, "execute_host_job: health check query failed, proceeding anyway"); + // If we can't query health checks, proceed with the job rather than blocking. + }, + } + // ── 2. Fetch the job's patch_selection ────────────────────────────────── let patch_sel: JobPatchSelection = match sqlx::query_as("SELECT patch_selection FROM patch_jobs WHERE id = $1") @@ -764,6 +849,7 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) { COUNT(*) FILTER (WHERE status = 'succeeded') AS succeeded_count, COUNT(*) FILTER (WHERE status = 'failed') AS failed_count, COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_count, + COUNT(*) FILTER (WHERE status = 'waiting_health_check') AS waiting_health_check_count, COUNT(*) AS total_count FROM patch_job_hosts WHERE job_id = $1 @@ -784,7 +870,7 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) { let new_status: &str; let set_completed: bool; - if counts.running_count > 0 || counts.pending_count > 0 || counts.queued_count > 0 { + if counts.running_count > 0 || counts.pending_count > 0 || counts.queued_count > 0 || counts.waiting_health_check_count > 0 { // Still work in flight — keep parent running. new_status = "running"; set_completed = false; @@ -912,17 +998,18 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) { /// Find pending host entries whose back-off window has elapsed, reset them to /// `queued`, and dispatch them immediately. +/// +/// Also retries `waiting_health_check` entries whose retry window has elapsed. pub async fn retry_pending_jobs(pool: PgPool, config: Arc) { let rows: Vec = match sqlx::query_as( r#" SELECT pjh.id, pjh.host_id, pjh.job_id FROM patch_job_hosts pjh JOIN patch_jobs j ON j.id = pjh.job_id - WHERE pjh.status = 'pending' + WHERE pjh.status IN ('pending', 'waiting_health_check') AND pjh.retry_next_at <= NOW() - AND j.status != 'cancelled' - "#, - ) + AND j.status != 'cancelled' + "#,) .fetch_all(&pool) .await { diff --git a/crates/pm-worker/src/main.rs b/crates/pm-worker/src/main.rs index 6be41fb..b3a9978 100644 --- a/crates/pm-worker/src/main.rs +++ b/crates/pm-worker/src/main.rs @@ -6,6 +6,7 @@ mod agent_loader; mod audit_verifier; mod email; +mod health_check_poller; mod health_poller; mod job_executor; mod maintenance_scheduler; @@ -19,6 +20,7 @@ use std::{sync::Arc, time::Duration}; use tokio::time; use audit_verifier::run_audit_verifier; +use health_check_poller::run_health_check_poller; use health_poller::run_health_poller; use job_executor::run_job_executor; use maintenance_scheduler::run_maintenance_scheduler; @@ -29,7 +31,7 @@ use ws_relay::run_ws_relay; /// Minimum number of applied migrations the worker requires before /// accepting work. Prevents the worker from running against a schema /// that hasn't been migrated yet. -const REQUIRED_MIGRATION_COUNT: i64 = 5; +const REQUIRED_MIGRATION_COUNT: i64 = 8; /// How long to wait between schema-version checks before giving up. const SCHEMA_CHECK_TIMEOUT: Duration = Duration::from_secs(120); @@ -89,6 +91,9 @@ async fn main() -> anyhow::Result<()> { // M11: audit integrity verification (runs every 24 hours) let audit_verifier_handle = tokio::spawn(run_audit_verifier(pool.clone(), config.clone())); + // Health check poller — runs configured service/HTTP health checks + let health_check_handle = tokio::spawn(run_health_check_poller(pool.clone(), config.clone())); + tracing::info!("Worker tasks started"); // Wait for all tasks (they run indefinitely) diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 963c54b..6d0afd9 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -7,6 +7,9 @@ import type { UpdateMaintenanceWindowRequest, Certificate, IssuedCert, + HealthCheckWithResult, + CreateHealthCheckRequest, + UpdateHealthCheckRequest, } from '../types' const BASE_URL = '/api/v1' @@ -259,3 +262,25 @@ export const settingsApi = { updateIpWhitelist: (entries: string[]) => apiClient.put<{ entries: string[] }>('/settings/ip-whitelist', { entries }), auditIntegrity: () => apiClient.post('/settings/audit-integrity'), } + +// ── Health Checks API ───────────────────────────────────────────────────────── + +export const healthChecksApi = { + list: (hostId: string) => + apiClient.get(`/hosts/${hostId}/health-checks`), + + get: (hostId: string, checkId: string) => + apiClient.get(`/hosts/${hostId}/health-checks/${checkId}`), + + create: (hostId: string, body: CreateHealthCheckRequest) => + apiClient.post(`/hosts/${hostId}/health-checks`, body), + + update: (hostId: string, checkId: string, body: UpdateHealthCheckRequest) => + apiClient.put(`/hosts/${hostId}/health-checks/${checkId}`, body), + + delete: (hostId: string, checkId: string) => + apiClient.delete(`/hosts/${hostId}/health-checks/${checkId}`), + + test: (hostId: string, checkId: string) => + apiClient.post(`/hosts/${hostId}/health-checks/${checkId}/test`), +} diff --git a/frontend/src/pages/HostDetailPage.tsx b/frontend/src/pages/HostDetailPage.tsx index 4ec932b..4cbfa8e 100644 --- a/frontend/src/pages/HostDetailPage.tsx +++ b/frontend/src/pages/HostDetailPage.tsx @@ -34,13 +34,25 @@ import { import { Add as AddIcon, ArrowBack, + Cancel as CancelIcon, + CheckCircle as CheckCircleIcon, Delete as DeleteIcon, Edit as EditIcon, + MonitorHeart as MonitorHeartIcon, + PlayArrow as PlayArrowIcon, + Remove as RemoveIcon, Schedule as ScheduleIcon, VpnKey as VpnKeyIcon, } from '@mui/icons-material' -import { apiClient, maintenanceWindowsApi, certsApi } from '../api/client' -import type { MaintenanceWindow, WindowRecurrence } from '../types' +import { apiClient, maintenanceWindowsApi, healthChecksApi, certsApi } from '../api/client' +import type { + MaintenanceWindow, + WindowRecurrence, + HealthCheckType, + HealthCheckWithResult, + CreateHealthCheckRequest, + UpdateHealthCheckRequest, +} from '../types' // ── Helpers ─────────────────────────────────────────────────────────────────── @@ -74,7 +86,7 @@ function scheduleDescription(w: MaintenanceWindow): string { } } -// ── Form value type ─────────────────────────────────────────────────────────── +// ── Window form value type ──────────────────────────────────────────────────── interface FormValues { label: string @@ -185,6 +197,114 @@ function WindowFormDialog({ open, title, initial, onClose, onSubmit }: WindowFor ) } +// ── Health Check form value type ───────────────────────────────────────────── + +interface HealthCheckFormValues { + name: string + check_type: HealthCheckType + service_name: string + url: string + expected_body: string + ignore_cert_errors: boolean + basic_auth_user: string + basic_auth_pass: string + enabled: boolean +} + +function defaultHealthCheckForm(): HealthCheckFormValues { + return { + name: '', + check_type: 'service', + service_name: '', + url: '', + expected_body: '', + ignore_cert_errors: false, + basic_auth_user: '', + basic_auth_pass: '', + enabled: true, + } +} + +// ── Health Check form dialog ────────────────────────────────────────────────── + +interface HealthCheckFormDialogProps { + open: boolean + title: string + initial: HealthCheckFormValues + onClose: () => void + onSubmit: (values: HealthCheckFormValues) => Promise +} + +function HealthCheckFormDialog({ open, title, initial, onClose, onSubmit }: HealthCheckFormDialogProps) { + const [form, setForm] = useState(initial) + const [saving, setSaving] = useState(false) + const [err, setErr] = useState(null) + + useEffect(() => { setForm(initial); setErr(null) }, [open, initial]) + + const set = (field: keyof HealthCheckFormValues, value: HealthCheckFormValues[keyof HealthCheckFormValues]) => + setForm(prev => ({ ...prev, [field]: value })) + + const handleSubmit = async () => { + if (!form.name.trim()) { setErr('Name is required'); return } + if (form.check_type === 'service' && !form.service_name.trim()) { setErr('Service name is required'); return } + if (form.check_type === 'http' && !form.url.trim()) { setErr('URL is required'); return } + setSaving(true); setErr(null) + try { await onSubmit(form) } + catch (e: unknown) { + const msg = (e as { response?: { data?: { error?: { message?: string } } } }) + ?.response?.data?.error?.message ?? 'Failed to save' + setErr(msg) + } finally { setSaving(false) } + } + + return ( + + {title} + + {err && {err}} + set('name', e.target.value)} required fullWidth /> + + Check Type + + + {form.check_type === 'service' && ( + set('service_name', e.target.value)} required fullWidth + helperText="Systemd service unit name to check" /> + )} + {form.check_type === 'http' && ( + <> + set('url', e.target.value)} required fullWidth + helperText="Full URL to check (e.g. https://example.com/health)" /> + set('expected_body', e.target.value)} fullWidth + helperText="Substring expected in response body" /> + set('ignore_cert_errors', e.target.checked)} />} + label="Ignore Certificate Errors" + /> + set('basic_auth_user', e.target.value)} fullWidth /> + set('basic_auth_pass', e.target.value)} fullWidth + helperText="Leave blank to keep existing password" /> + + )} + set('enabled', e.target.checked)} />} + label="Enabled" + /> + + + + + + + ) +} + // ── Main page ────────────────────────────────────────────────────────────────── export default function HostDetailPage() { @@ -201,19 +321,37 @@ export default function HostDetailPage() { open: false, message: '', severity: 'success', }) - // Create dialog + // Create window dialog const [createOpen, setCreateOpen] = useState(false) const [createForm, setCreateForm] = useState(defaultForm()) - // Edit dialog + // Edit window dialog const [editOpen, setEditOpen] = useState(false) const [editWindow, setEditWindow] = useState(null) const [editForm, setEditForm] = useState(defaultForm()) - // Delete dialog + // Delete window dialog const [deleteOpen, setDeleteOpen] = useState(false) const [deleteTarget, setDeleteTarget] = useState(null) + // Health checks state + const [healthChecks, setHealthChecks] = useState([]) + const [hcLoading, setHcLoading] = useState(false) + const [testingId, setTestingId] = useState(null) + + // Create health check dialog + const [hcCreateOpen, setHcCreateOpen] = useState(false) + const [hcCreateForm, setHcCreateForm] = useState(defaultHealthCheckForm()) + + // Edit health check dialog + const [hcEditOpen, setHcEditOpen] = useState(false) + const [hcEditTarget, setHcEditTarget] = useState(null) + const [hcEditForm, setHcEditForm] = useState(defaultHealthCheckForm()) + + // Delete health check dialog + const [hcDeleteOpen, setHcDeleteOpen] = useState(false) + const [hcDeleteTarget, setHcDeleteTarget] = useState(null) + // ── Fetch host ──────────────────────────────────────────────────────────── useEffect(() => { apiClient.get(`/hosts/${id}`) @@ -235,6 +373,19 @@ export default function HostDetailPage() { useEffect(() => { fetchWindows() }, [fetchWindows]) + // ── Fetch health checks ─────────────────────────────────────────────────── + const fetchHealthChecks = useCallback(async () => { + if (!id) return + setHcLoading(true) + try { + const res = await healthChecksApi.list(id) + setHealthChecks(Array.isArray(res.data) ? res.data : []) + } catch { /* ignore */ } + finally { setHcLoading(false) } + }, [id]) + + useEffect(() => { fetchHealthChecks() }, [fetchHealthChecks]) + const showSnack = (message: string, severity: 'success' | 'error') => setSnackbar({ open: true, message, severity }) @@ -312,6 +463,105 @@ export default function HostDetailPage() { } } + // ── Create health check ────────────────────────────────────────────────── + const handleHcCreateSubmit = async (values: HealthCheckFormValues) => { + if (!id) return + const body: CreateHealthCheckRequest = { + name: values.name, + check_type: values.check_type, + } + if (values.check_type === 'service') { + body.service_name = values.service_name || undefined + } else { + body.url = values.url || undefined + body.expected_body = values.expected_body || undefined + body.ignore_cert_errors = values.ignore_cert_errors || undefined + body.basic_auth_user = values.basic_auth_user || undefined + body.basic_auth_pass = values.basic_auth_pass || undefined + } + await healthChecksApi.create(id, body) + setHcCreateOpen(false) + showSnack('Health check created', 'success') + await fetchHealthChecks() + } + + // ── Edit health check ──────────────────────────────────────────────────── + const handleHcEditClick = (check: HealthCheckWithResult) => { + setHcEditTarget(check) + setHcEditForm({ + name: check.name, + check_type: check.check_type, + service_name: check.service_name ?? '', + url: check.url ?? '', + expected_body: check.expected_body ?? '', + ignore_cert_errors: check.ignore_cert_errors, + basic_auth_user: check.basic_auth_user ?? '', + basic_auth_pass: '', + enabled: check.enabled, + }) + setHcEditOpen(true) + } + + const handleHcEditSubmit = async (values: HealthCheckFormValues) => { + if (!id || !hcEditTarget) return + const body: UpdateHealthCheckRequest = { + name: values.name, + enabled: values.enabled, + } + if (values.check_type === 'service') { + body.service_name = values.service_name || undefined + } else { + body.url = values.url || undefined + body.expected_body = values.expected_body || undefined + body.ignore_cert_errors = values.ignore_cert_errors + body.basic_auth_user = values.basic_auth_user || undefined + body.basic_auth_pass = values.basic_auth_pass || undefined + } + await healthChecksApi.update(id, hcEditTarget.id, body) + setHcEditOpen(false) + showSnack('Health check updated', 'success') + await fetchHealthChecks() + } + + // ── Delete health check ────────────────────────────────────────────────── + const handleHcDeleteConfirm = async () => { + if (!id || !hcDeleteTarget) return + try { + await healthChecksApi.delete(id, hcDeleteTarget.id) + setHcDeleteOpen(false) + showSnack('Health check deleted', 'success') + await fetchHealthChecks() + } catch { + showSnack('Failed to delete health check', 'error') + } + } + + // ── Toggle health check enabled ────────────────────────────────────────── + const handleToggleEnabled = async (check: HealthCheckWithResult) => { + if (!id) return + try { + await healthChecksApi.update(id, check.id, { enabled: !check.enabled }) + await fetchHealthChecks() + } catch { + showSnack('Failed to toggle health check', 'error') + } + } + + // ── Test health check ──────────────────────────────────────────────────── + const handleTestCheck = async (check: HealthCheckWithResult) => { + if (!id) return + setTestingId(check.id) + try { + await healthChecksApi.test(id, check.id) + await fetchHealthChecks() + showSnack('Health check test completed', 'success') + } catch { + showSnack('Health check test failed', 'error') + } finally { + setTestingId(null) + } + } + // ── Render ──────────────────────────────────────────────────────────────── if (loading) return if (error) return {error} @@ -350,7 +600,7 @@ export default function HostDetailPage() { {/* ── Maintenance Windows ──────────────────────────────────────────── */} - + @@ -427,6 +677,127 @@ export default function HostDetailPage() { )} + {/* ── Health Checks ────────────────────────────────────────────────── */} + + + + + Health Checks + + + + + + + Monitor host health with service and HTTP checks. Maximum 5 checks per host. + + + {hcLoading ? ( + + ) : healthChecks.length === 0 ? ( + + No health checks configured. Add a check to monitor this host's health. + + ) : ( + + + + Name + Type + Status + Enabled + Detail + Latency + Last Checked + Actions + + + + {healthChecks.map(check => ( + + {check.name} + + + + + {check.last_result ? ( + check.last_result.healthy ? ( + + + + ) : ( + + + + ) + ) : ( + + + + )} + + + handleToggleEnabled(check)} + /> + + + + {check.last_result?.detail ?? '—'} + + + + {check.last_result?.latency_ms != null ? `${check.last_result.latency_ms} ms` : '—'} + + + {check.last_result?.checked_at + ? new Date(check.last_result.checked_at).toLocaleString() + : '—'} + + + + handleTestCheck(check)} + > + {testingId === check.id + ? + : } + + + + handleHcEditClick(check)}> + + + + + { setHcDeleteTarget(check); setHcDeleteOpen(true) }} + > + + + + + + ))} + +
+ )} +
+ {/* ── Dialogs ─────────────────────────────────────────────────────── */} + {/* Health Check Dialogs */} + setHcCreateOpen(false)} + onSubmit={handleHcCreateSubmit} + /> + setHcEditOpen(false)} + onSubmit={handleHcEditSubmit} + /> + setHcDeleteOpen(false)} maxWidth="xs" fullWidth> + Delete Health Check + + + Delete {hcDeleteTarget?.name}? This cannot be undone. + + + + + + + + {/* Snackbar */} IP Address OS Health + Checks Agent Actions @@ -82,6 +84,15 @@ export default function HostsPage() { + + {h.health_check_status === 'all_healthy' ? ( + + ) : h.health_check_status === 'some_unhealthy' ? ( + + ) : ( + + )} + {h.agent_version ?? '—'} e.stopPropagation()}> diff --git a/frontend/src/pages/PatchDeploymentPage.tsx b/frontend/src/pages/PatchDeploymentPage.tsx index dcbbb93..f1f1205 100644 --- a/frontend/src/pages/PatchDeploymentPage.tsx +++ b/frontend/src/pages/PatchDeploymentPage.tsx @@ -22,8 +22,10 @@ import { TextField, Toolbar, Typography, + Tooltip, } from '@mui/material' import { Search as SearchIcon } from '@mui/icons-material' +import { CheckCircle as CheckCircleIcon, Cancel as CancelIcon, Remove as RemoveIcon } from '@mui/icons-material' import { useNavigate } from 'react-router-dom' import { hostsApi, jobsApi } from '../api/client' import type { Host, HostHealthStatus } from '../types' @@ -256,6 +258,7 @@ export default function PatchDeploymentPage() { FQDN IP Address Health + Checks Patches OS @@ -263,7 +266,7 @@ export default function PatchDeploymentPage() { {filteredHosts.length === 0 ? ( - + No hosts found @@ -291,6 +294,15 @@ export default function PatchDeploymentPage() { + + {host.health_check_status === 'all_healthy' ? ( + + ) : host.health_check_status === 'some_unhealthy' ? ( + + ) : ( + + )} + ` +```json +{ + "success": true, + "data": { + "name": "nginx", + "display_name": "A high performance web server", + "active_state": "active", + "sub_state": "running", + "load_state": "loaded", + "enabled_state": "enabled", + "main_pid": 1234, + "healthy": true + } +} +``` -## Verified Working -- WebSocket connections to linux-patch-manager-dev (agent with proper WS handler) -- HTTP polling fallback to gitea-runner-u2404 (agent with stub WS) -- Job completion status updates via pg_notify -- Frontend real-time updates via WebSocket events +**Health determination:** The agent `healthy` field is authoritative. Manager uses this boolean directly. + +**Error responses:** 400 (invalid name), 404 (not found → unhealthy), 500 (error → unhealthy) + +### GET /api/v1/health +Basic agent health. Returns `{"status": "healthy"}`. Used for connectivity checks, not host health checks. + +--- + +## Phase 1: Database Schema + +### New table: `host_health_checks` +```sql +CREATE TABLE host_health_checks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + host_id UUID NOT NULL REFERENCES hosts(id) ON DELETE CASCADE, + name VARCHAR(100) NOT NULL, + check_type VARCHAR(20) NOT NULL CHECK (check_type IN ('service', 'http')), + enabled BOOLEAN NOT NULL DEFAULT true, + -- Service check fields + service_name VARCHAR(200), + -- HTTP check fields + url TEXT, + expected_body VARCHAR(500), + ignore_cert_errors BOOLEAN DEFAULT true, + basic_auth_user VARCHAR(100), + basic_auth_pass_encrypted BYTEA, -- AES-256-GCM encrypted with per-install key + basic_auth_pass_nonce BYTEA, -- nonce for AES-GCM + -- Metadata + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT valid_service_check CHECK ( + (check_type = 'service' AND service_name IS NOT NULL AND url IS NULL) + OR + (check_type = 'http' AND url IS NOT NULL AND expected_body IS NOT NULL AND service_name IS NULL) + ) +); + +CREATE INDEX idx_health_checks_host ON host_health_checks (host_id); +``` + +### New table: `host_health_check_results` +```sql +CREATE TABLE host_health_check_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + check_id UUID NOT NULL REFERENCES host_health_checks(id) ON DELETE CASCADE, + healthy BOOLEAN NOT NULL, + detail TEXT, + latency_ms INTEGER, + checked_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_health_results_check ON host_health_check_results (check_id, checked_at DESC); +``` + +### Encryption key storage +- Per-install app key stored at `/etc/patch-manager/keys/health-check.key` +- 256-bit random key generated on first startup if not present +- File permissions: 0600, owned by patch-manager user +- Used for AES-256-GCM encryption of basic_auth_pass + +- [ ] Create migration 007_health_checks.sql +- [ ] Add models to pm-core/src/models.rs +- [ ] Add encryption utility to pm-core +- [ ] Verify migration runs on dev LXC + +--- + +## Phase 2: Backend API Routes + +### Endpoints +- `GET /api/v1/hosts/{id}/health-checks` — list health checks for host (RBAC scoped) +- `POST /api/v1/hosts/{id}/health-checks` — create health check (max 5 per host) +- `PUT /api/v1/hosts/{id}/health-checks/{check_id}` — update health check +- `DELETE /api/v1/hosts/{id}/health-checks/{check_id}` — delete health check +- `POST /api/v1/hosts/{id}/health-checks/{check_id}/test` — run check immediately, return result + +### Request/Response types +```rust +struct CreateHealthCheckRequest { + name: String, + check_type: String, // "service" or "http" + service_name: Option, + url: Option, + expected_body: Option, + ignore_cert_errors: Option, + basic_auth_user: Option, + basic_auth_pass: Option, // plaintext in request, encrypted before storage +} + +struct HealthCheck { + id: Uuid, + host_id: Uuid, + name: String, + check_type: String, + enabled: bool, + service_name: Option, + url: Option, + expected_body: Option, + ignore_cert_errors: bool, + basic_auth_user: Option, + // basic_auth_pass NOT returned in responses + last_result: Option, + created_at: DateTime, + updated_at: DateTime, +} + +struct HealthCheckResult { + healthy: bool, + detail: Option, + latency_ms: Option, + checked_at: DateTime, +} +``` + +- [ ] Add routes to pm-web/src/routes/ (new health_checks.rs) +- [ ] Add CRUD operations +- [ ] Add RBAC enforcement (admin all, operator matching group) +- [ ] Add max-5-per-host validation +- [ ] Add /test endpoint that runs check immediately +- [ ] Add audit logging for create/update/delete +- [ ] Add encryption/decryption for basic_auth_pass + +--- + +## Phase 3: Worker Health Check Engine + +### Continuous Polling +- New task in pm-worker: `health_check_poller` +- Polls all enabled health checks every 5 minutes +- For service checks: call agent `GET /api/v1/system/services/{name}` via mTLS, check `healthy` field +- For HTTP checks: make direct HTTP(S) request from manager, check status code + substring match +- Store results in `host_health_check_results` +- Prune results older than 4 days + +### Pre-Patch Execution Gate +- When a patch job is about to execute on a host: + 1. Check if host has any enabled health checks + 2. If yes, verify all are currently healthy (from latest poll result) + 3. If any are unhealthy: + - Wait and retry at 5-minute intervals + - Continue until maintenance window closes + - If window closes with failed checks, mark job host as failed with detail + 4. If all healthy, proceed with patch execution + +### HTTP Check Implementation +- Use reqwest with: + - 10-second timeout + - Accept invalid certs (ignore_cert_errors) + - Optional basic auth header (decrypt from DB) + - Check response body contains expected_body substring +- Return healthy=true if match, false otherwise + +- [ ] Add health_check_poller module to pm-worker +- [ ] Implement service check via AgentClient +- [ ] Implement HTTP check via reqwest +- [ ] Add pre-patch execution gate to job_executor +- [ ] Add retry loop with 5-minute intervals +- [ ] Add maintenance window expiry check +- [ ] Add health check config to WorkerConfig (poll interval) +- [ ] Add result pruning (4-day retention) + +--- + +## Phase 4: Frontend UI + +### Host Detail Page +- Add "Health Checks" section below host info +- List current health checks with status indicators +- Add/Edit/Delete health check dialogs +- "Test" button to run check immediately and show result +- Visual indicator: green check = healthy, red X = unhealthy, gray = unknown + +### Hosts Page +- Add health check summary column or indicator +- Show aggregate status: all healthy / some unhealthy / no checks configured + +### Deploy Page +- Show health check status in host selection table +- Warn if any selected hosts have unhealthy checks + +### Job Detail +- Show health check gate status when job is waiting for healthy checks +- Display which checks are passing/failing + +- [ ] Add HealthCheck types to frontend/src/types/index.ts +- [ ] Add health check API calls to frontend/src/api/client.ts +- [ ] Add Health Checks section to HostDetailPage.tsx +- [ ] Add health check status to HostsPage.tsx +- [ ] Add health check indicators to PatchDeploymentPage.tsx +- [ ] Add health check gate status to JobsPage.tsx detail view + +--- + +## Phase 5: Integration & Testing +- [ ] Build and deploy to dev LXC +- [ ] Test service health check against dev LXC agent +- [ ] Test HTTP health check against internal services +- [ ] Test pre-patch gate: deploy with failing check, verify retry behavior +- [ ] Test maintenance window expiry with failing checks +- [ ] Test RBAC: operator can only manage checks in their group +- [ ] Test max 5 checks per host enforcement +- [ ] Test basic auth encryption/decryption +- [ ] Push to Gitea + +--- + +## Resolved Items +- ~~Basic auth password storage~~ → Encrypted in DB with per-install app key (AES-256-GCM) +- ~~Health check poll interval~~ → 5 minutes +- ~~Result retention~~ → 4 days (time-based)