From a5d52ffab00cc7be22ca72896e04f77b3f58d6e2 Mon Sep 17 00:00:00 2001 From: Echo Date: Thu, 23 Apr 2026 17:42:51 +0000 Subject: [PATCH] feat: M6 maintenance windows + M7 WebSocket relay (real-time job status) M6 - Maintenance Windows: - routes/maintenance_windows.rs: full CRUD API - migrations/004_maintenance_windows.sql - frontend/MaintenanceWindowsPage.tsx - HostDetailPage.tsx: maintenance window config panel M7 - WebSocket Relay: - pm-web: POST /api/v1/ws/ticket (JWT-auth, single-use, 60s TTL) - pm-web: WS /api/v1/ws/jobs?ticket=... (PgListener -> browser push) - pm-web: DashMap in AppState, 30s cleanup task - pm-worker: ws_relay.rs subscribes to agent WS, updates patch_job_hosts, fires pg_notify(job_update) for real-time fan-out - frontend: useJobWebSocket hook with auto-reconnect + exponential backoff - frontend: JobsPage live updates with WS status indicator - types: JobWsEvent interface - api/client: wsApi.createTicket() All tasks marked complete in tasks/todo.md cargo build: zero errors, zero warnings --- Cargo.lock | 73 ++- crates/pm-core/src/models.rs | 74 +++ crates/pm-web/Cargo.toml | 1 + crates/pm-web/src/main.rs | 37 +- .../pm-web/src/routes/maintenance_windows.rs | 364 +++++++++++ crates/pm-web/src/routes/mod.rs | 2 + crates/pm-web/src/routes/ws.rs | 212 ++++++ crates/pm-worker/Cargo.toml | 4 + crates/pm-worker/src/job_executor.rs | 35 +- crates/pm-worker/src/main.rs | 12 + crates/pm-worker/src/maintenance_scheduler.rs | 164 +++++ crates/pm-worker/src/ws_relay.rs | 470 +++++++++++++ frontend/src/App.tsx | 9 +- frontend/src/api/client.ts | 26 +- frontend/src/hooks/useJobWebSocket.ts | 172 +++++ frontend/src/pages/HostDetailPage.tsx | 434 +++++++++++- frontend/src/pages/JobsPage.tsx | 62 +- frontend/src/pages/MaintenanceWindowsPage.tsx | 616 ++++++++++++++++++ frontend/src/types/index.ts | 49 ++ migrations/004_maintenance_windows.sql | 25 + tasks/todo.md | 28 +- 21 files changed, 2833 insertions(+), 36 deletions(-) create mode 100644 
crates/pm-web/src/routes/maintenance_windows.rs create mode 100644 crates/pm-web/src/routes/ws.rs create mode 100644 crates/pm-worker/src/maintenance_scheduler.rs create mode 100644 crates/pm-worker/src/ws_relay.rs create mode 100644 frontend/src/hooks/useJobWebSocket.ts create mode 100644 frontend/src/pages/MaintenanceWindowsPage.tsx create mode 100644 migrations/004_maintenance_windows.sql diff --git a/Cargo.lock b/Cargo.lock index 4dde722..d15565b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,7 +134,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.29.0", "tower", "tower-layer", "tower-service", @@ -455,6 +455,20 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.10.0" @@ -1807,6 +1821,7 @@ dependencies = [ "axum", "axum-extra", "chrono", + "dashmap", "ipnet", "pm-auth", "pm-core", @@ -1832,11 +1847,15 @@ dependencies = [ "futures", "pm-agent-client", "pm-core", + "rustls", + "rustls-pemfile", "serde", "serde_json", "sqlx", "thiserror", "tokio", + "tokio-rustls", + "tokio-tungstenite 0.26.2", "tracing", "tracing-subscriber", "uuid", @@ -2192,6 +2211,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -2918,6 +2946,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite 0.26.2", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-tungstenite" version = "0.29.0" @@ -2927,7 +2971,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.29.0", ] [[package]] @@ -3169,6 +3213,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.4", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.29.0" @@ -3283,6 +3346,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/crates/pm-core/src/models.rs b/crates/pm-core/src/models.rs index b76fe96..7dcb3cb 100644 --- a/crates/pm-core/src/models.rs +++ b/crates/pm-core/src/models.rs @@ -298,3 +298,77 @@ pub struct PatchJobSummary { pub started_at: Option>, pub completed_at: Option>, } + +// ============================================================ +// Maintenance Windows +// ============================================================ + +/// Recurrence type for a maintenance window. +/// Mirrors the `window_recurrence` PostgreSQL ENUM. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)] +#[sqlx(type_name = "window_recurrence", rename_all = "lowercase")] +pub enum WindowRecurrence { + /// Single one-time window (at `start_at` for `duration_minutes` minutes). + Once, + /// Repeats every day at the time portion of `start_at`. + Daily, + /// Repeats on the day-of-week in `recurrence_day` (0 = Sunday). + Weekly, + /// Repeats on the day-of-month in `recurrence_day` (1-31). + Monthly, +} + +impl std::fmt::Display for WindowRecurrence { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Once => write!(f, "once"), + Self::Daily => write!(f, "daily"), + Self::Weekly => write!(f, "weekly"), + Self::Monthly => write!(f, "monthly"), + } + } +} + +/// Full row from `maintenance_windows`. +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct MaintenanceWindow { + pub id: Uuid, + pub host_id: Uuid, + pub label: String, + pub recurrence: WindowRecurrence, + /// Absolute start time (one-time) or time-of-day reference (recurring). + pub start_at: DateTime, + /// Duration of the window in minutes. + pub duration_minutes: i32, + /// Day-of-week (0=Sun, weekly) or day-of-month (1-31, monthly); NULL for once/daily. + pub recurrence_day: Option, + pub enabled: bool, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +/// Payload for `POST /api/v1/hosts/{id}/maintenance-windows`. +#[derive(Debug, Deserialize)] +pub struct CreateMaintenanceWindowRequest { + pub label: String, + pub recurrence: WindowRecurrence, + /// RFC 3339 / ISO 8601 timestamp (UTC recommended). + pub start_at: DateTime, + /// How many minutes the window is open (default 60). + pub duration_minutes: Option, + /// Required for `weekly` (0-6) and `monthly` (1-31). + pub recurrence_day: Option, + /// Whether the window is active (default true). + pub enabled: Option, +} + +/// Payload for `PUT /api/v1/hosts/{id}/maintenance-windows/{window_id}`. 
+#[derive(Debug, Deserialize)] +pub struct UpdateMaintenanceWindowRequest { + pub label: Option, + pub recurrence: Option, + pub start_at: Option>, + pub duration_minutes: Option, + pub recurrence_day: Option, + pub enabled: Option, +} diff --git a/crates/pm-web/Cargo.toml b/crates/pm-web/Cargo.toml index fa8665f..9328414 100644 --- a/crates/pm-web/Cargo.toml +++ b/crates/pm-web/Cargo.toml @@ -28,3 +28,4 @@ uuid = { workspace = true } ulid = { workspace = true } chrono = { workspace = true } ipnet = { workspace = true } +dashmap = { version = "6" } diff --git a/crates/pm-web/src/main.rs b/crates/pm-web/src/main.rs index f85d2dc..6c7c0b4 100644 --- a/crates/pm-web/src/main.rs +++ b/crates/pm-web/src/main.rs @@ -10,6 +10,7 @@ use axum::{ routing::get, Router, }; +use dashmap::DashMap; use pm_core::{ config::AppConfig, db, @@ -20,8 +21,13 @@ use pm_auth::{ jwt, rbac::{AuthConfig, require_auth}, }; +use routes::ws::WsTicket; use serde_json::{json, Value}; -use std::{net::SocketAddr, sync::Arc}; +use std::{ + net::SocketAddr, + sync::Arc, + time::Duration, +}; use tower_http::{ services::ServeDir, trace::TraceLayer, @@ -34,6 +40,8 @@ pub struct AppState { pub config: Arc, pub signing_key_pem: String, pub auth_config: Arc, + /// In-memory store for single-use WebSocket authentication tickets. + pub ws_tickets: Arc>, } #[tokio::main] @@ -69,11 +77,32 @@ async fn main() -> anyhow::Result<()> { let pool = db::init_pool(&config.database).await?; db::run_migrations(&pool).await?; + let ws_tickets: Arc> = Arc::new(DashMap::new()); + + // Background task: purge expired WS tickets every 30 seconds. 
+ { + let tickets = ws_tickets.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + let now = chrono::Utc::now(); + let before = tickets.len(); + tickets.retain(|_, v| v.expires_at > now); + let removed = before.saturating_sub(tickets.len()); + if removed > 0 { + tracing::debug!(removed, "Purged expired WS tickets"); + } + } + }); + } + let state = AppState { db: pool, config: Arc::new(config.clone()), signing_key_pem, auth_config, + ws_tickets, }; let app = build_router(state); @@ -109,6 +138,10 @@ pub fn build_router(state: AppState) -> Router { .nest("/status", routes::status::router()) // Patch jobs .nest("/jobs", routes::jobs::router()) + // Maintenance windows (nested under hosts path param) + .nest("/hosts/:host_id/maintenance-windows", routes::maintenance_windows::router()) + // WS ticket issuance (JWT-protected — ticket returned to browser, then used for WS upgrade) + .merge(routes::ws::ticket_router()) // Apply auth middleware to all the above .route_layer(middleware::from_fn(move |req, next| { let auth_config = auth_config.clone(); @@ -121,6 +154,8 @@ pub fn build_router(state: AppState) -> Router { .nest("/api/v1/auth", routes::auth::public_router()) // Protected API routes (JWT required) .nest("/api/v1", protected_api) + // WebSocket browser endpoint — ticket-authenticated, outside JWT middleware + .merge(routes::ws::ws_router()) // Serve React SPA .fallback_service( ServeDir::new(&static_dir).append_index_html_on_directories(true), diff --git a/crates/pm-web/src/routes/maintenance_windows.rs b/crates/pm-web/src/routes/maintenance_windows.rs new file mode 100644 index 0000000..90aa2fa --- /dev/null +++ b/crates/pm-web/src/routes/maintenance_windows.rs @@ -0,0 +1,364 @@ +//! Maintenance window management routes. +//! +//! GET /api/v1/hosts/{id}/maintenance-windows — list windows for host +//! POST /api/v1/hosts/{id}/maintenance-windows — create window for host +//! 
PUT /api/v1/hosts/{id}/maintenance-windows/{win_id} — update window +//! DELETE /api/v1/hosts/{id}/maintenance-windows/{win_id} — delete window + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::Json, + routing::{get, put}, + Router, +}; +use pm_auth::rbac::AuthUser; +use pm_core::{ + audit::{log_event, AuditAction}, + models::{ + CreateMaintenanceWindowRequest, MaintenanceWindow, UpdateMaintenanceWindowRequest, + }, +}; +use serde_json::{json, Value}; +use uuid::Uuid; + +use crate::AppState; + +// ── Router ──────────────────────────────────────────────────────────────────── + +/// Mount as a nested router under `/hosts/:host_id/maintenance-windows`. +/// Axum will merge the `:host_id` path segment from the parent nest. +pub fn router() -> Router { + Router::new() + .route("/", get(list_windows).post(create_window)) + .route("/:win_id", put(update_window).delete(delete_window)) +} + +// ── Error helper ────────────────────────────────────────────────────────────── + +#[inline] +fn err( + status: StatusCode, + code: &'static str, + message: impl Into, +) -> (StatusCode, Json) { + ( + status, + Json(json!({ "error": { "code": code, "message": message.into() } })), + ) +} + +// ── GET /api/v1/hosts/:host_id/maintenance-windows ──────────────────────────── + +async fn list_windows( + State(state): State, + _auth: AuthUser, + Path(host_id): Path, +) -> Result, (StatusCode, Json)> { + // Verify host exists. 
+ let host_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM hosts WHERE id = $1)") + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %host_id, "list_windows: host existence check failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + if !host_exists { + return Err(err(StatusCode::NOT_FOUND, "not_found", "Host not found")); + } + + let windows: Vec = sqlx::query_as( + r#" + SELECT id, host_id, label, recurrence, start_at, duration_minutes, + recurrence_day, enabled, created_at, updated_at + FROM maintenance_windows + WHERE host_id = $1 + ORDER BY created_at ASC + "#, + ) + .bind(host_id) + .fetch_all(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %host_id, "list_windows: query failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + Ok(Json(json!({ "windows": windows }))) +} + +// ── POST /api/v1/hosts/:host_id/maintenance-windows ─────────────────────────── + +async fn create_window( + State(state): State, + auth: AuthUser, + Path(host_id): Path, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // Validate: weekly requires recurrence_day 0-6 + if req.recurrence == pm_core::models::WindowRecurrence::Weekly { + match req.recurrence_day { + Some(d) if (0..=6).contains(&d) => {} + _ => { + return Err(err( + StatusCode::BAD_REQUEST, + "bad_request", + "Weekly recurrence requires recurrence_day 0-6 (0=Sunday)", + )); + } + } + } + + // Validate: monthly requires recurrence_day 1-31 + if req.recurrence == pm_core::models::WindowRecurrence::Monthly { + match req.recurrence_day { + Some(d) if (1..=31).contains(&d) => {} + _ => { + return Err(err( + StatusCode::BAD_REQUEST, + "bad_request", + "Monthly recurrence requires recurrence_day 1-31", + )); + } + } + } + + // Verify host exists. 
+ let host_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM hosts WHERE id = $1)") + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %host_id, "create_window: host existence check failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + if !host_exists { + return Err(err(StatusCode::NOT_FOUND, "not_found", "Host not found")); + } + + let duration = req.duration_minutes.unwrap_or(60); + let enabled = req.enabled.unwrap_or(true); + + let window: MaintenanceWindow = sqlx::query_as( + r#" + INSERT INTO maintenance_windows + (host_id, label, recurrence, start_at, duration_minutes, recurrence_day, enabled) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, host_id, label, recurrence, start_at, duration_minutes, + recurrence_day, enabled, created_at, updated_at + "#, + ) + .bind(host_id) + .bind(&req.label) + .bind(&req.recurrence) + .bind(req.start_at) + .bind(duration) + .bind(req.recurrence_day) + .bind(enabled) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %host_id, "create_window: insert failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + log_event( + &state.db, + AuditAction::MaintenanceWindowCreated, + Some(auth.user_id), + Some(&auth.username), + Some("maintenance_window"), + Some(&window.id.to_string()), + json!({ + "host_id": host_id, + "label": window.label, + "recurrence": window.recurrence.to_string(), + }), + None, + None, + ) + .await; + + tracing::info!( + window_id = %window.id, + %host_id, + recurrence = %window.recurrence, + user = %auth.username, + "Maintenance window created" + ); + + Ok(Json(json!(window))) +} + +// ── PUT /api/v1/hosts/:host_id/maintenance-windows/:win_id ─────────────────── + +async fn update_window( + State(state): State, + auth: AuthUser, + Path((host_id, win_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> Result, (StatusCode, Json)> { + // 
Fetch existing record (verify ownership and existence). + let existing: Option = sqlx::query_as( + r#" + SELECT id, host_id, label, recurrence, start_at, duration_minutes, + recurrence_day, enabled, created_at, updated_at + FROM maintenance_windows + WHERE id = $1 AND host_id = $2 + "#, + ) + .bind(win_id) + .bind(host_id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %win_id, "update_window: fetch failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + let existing = existing.ok_or_else(|| { + err(StatusCode::NOT_FOUND, "not_found", "Maintenance window not found") + })?; + + // Apply partial updates using existing values as defaults. + let new_label = req.label.unwrap_or(existing.label); + let new_recurrence = req.recurrence.unwrap_or(existing.recurrence); + let new_start_at = req.start_at.unwrap_or(existing.start_at); + let new_duration = req.duration_minutes.unwrap_or(existing.duration_minutes); + let new_rec_day = req.recurrence_day.or(existing.recurrence_day); + let new_enabled = req.enabled.unwrap_or(existing.enabled); + + // Validate recurrence_day for the final recurrence type. 
+ if new_recurrence == pm_core::models::WindowRecurrence::Weekly { + match new_rec_day { + Some(d) if (0..=6).contains(&d) => {} + _ => { + return Err(err( + StatusCode::BAD_REQUEST, + "bad_request", + "Weekly recurrence requires recurrence_day 0-6", + )); + } + } + } + if new_recurrence == pm_core::models::WindowRecurrence::Monthly { + match new_rec_day { + Some(d) if (1..=31).contains(&d) => {} + _ => { + return Err(err( + StatusCode::BAD_REQUEST, + "bad_request", + "Monthly recurrence requires recurrence_day 1-31", + )); + } + } + } + + let updated: MaintenanceWindow = sqlx::query_as( + r#" + UPDATE maintenance_windows + SET label = $3, + recurrence = $4, + start_at = $5, + duration_minutes = $6, + recurrence_day = $7, + enabled = $8, + updated_at = NOW() + WHERE id = $1 AND host_id = $2 + RETURNING id, host_id, label, recurrence, start_at, duration_minutes, + recurrence_day, enabled, created_at, updated_at + "#, + ) + .bind(win_id) + .bind(host_id) + .bind(&new_label) + .bind(&new_recurrence) + .bind(new_start_at) + .bind(new_duration) + .bind(new_rec_day) + .bind(new_enabled) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %win_id, "update_window: update failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + log_event( + &state.db, + AuditAction::MaintenanceWindowUpdated, + Some(auth.user_id), + Some(&auth.username), + Some("maintenance_window"), + Some(&win_id.to_string()), + json!({ "host_id": host_id }), + None, + None, + ) + .await; + + tracing::info!( + window_id = %win_id, + %host_id, + user = %auth.username, + "Maintenance window updated" + ); + + Ok(Json(json!(updated))) +} + +// ── DELETE /api/v1/hosts/:host_id/maintenance-windows/:win_id ──────────────── + +async fn delete_window( + State(state): State, + auth: AuthUser, + Path((host_id, win_id)): Path<(Uuid, Uuid)>, +) -> Result, (StatusCode, Json)> { + let result = sqlx::query( + "DELETE FROM maintenance_windows WHERE id = 
$1 AND host_id = $2", + ) + .bind(win_id) + .bind(host_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, %win_id, "delete_window: delete failed"); + err(StatusCode::INTERNAL_SERVER_ERROR, "internal_error", "Database error") + })?; + + if result.rows_affected() == 0 { + return Err(err( + StatusCode::NOT_FOUND, + "not_found", + "Maintenance window not found", + )); + } + + log_event( + &state.db, + AuditAction::MaintenanceWindowDeleted, + Some(auth.user_id), + Some(&auth.username), + Some("maintenance_window"), + Some(&win_id.to_string()), + json!({ "host_id": host_id }), + None, + None, + ) + .await; + + tracing::info!( + window_id = %win_id, + %host_id, + user = %auth.username, + "Maintenance window deleted" + ); + + Ok(Json(json!({ "message": "Maintenance window deleted" }))) +} diff --git a/crates/pm-web/src/routes/mod.rs b/crates/pm-web/src/routes/mod.rs index 32c3b76..7f6b3e1 100644 --- a/crates/pm-web/src/routes/mod.rs +++ b/crates/pm-web/src/routes/mod.rs @@ -3,6 +3,8 @@ pub mod auth; pub mod discovery; pub mod groups; pub mod hosts; +pub mod maintenance_windows; pub mod jobs; pub mod status; pub mod users; +pub mod ws; diff --git a/crates/pm-web/src/routes/ws.rs b/crates/pm-web/src/routes/ws.rs new file mode 100644 index 0000000..9f3128b --- /dev/null +++ b/crates/pm-web/src/routes/ws.rs @@ -0,0 +1,212 @@ +//! WebSocket relay routes — M7 +//! +//! POST /api/v1/ws/ticket — create a single-use WS auth ticket (JWT-protected) +//! 
GET /api/v1/ws/jobs — browser WebSocket endpoint (ticket-authenticated) + +use axum::{ + extract::{Query, State, WebSocketUpgrade}, + extract::ws::{Message, WebSocket}, + http::StatusCode, + response::{Json, Response}, + routing::{get, post}, + Router, +}; +use chrono::{Duration, Utc}; +use pm_auth::rbac::AuthUser; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use sqlx::postgres::PgListener; +use ulid::Ulid; +use uuid::Uuid; + +use crate::AppState; + +// ── WsTicket ────────────────────────────────────────────────────────────────── + +/// Single-use WebSocket authentication ticket stored in-memory. +#[derive(Debug, Clone)] +pub struct WsTicket { + pub user_id: Uuid, + pub role: String, + pub expires_at: chrono::DateTime, +} + +// ── Router ──────────────────────────────────────────────────────────────────── + +/// Router for ticket-issuance endpoint (JWT-protected, merged into protected_api). +pub fn ticket_router() -> Router { + Router::new().route("/ws/ticket", post(create_ticket_handler)) +} + +/// Router for the WebSocket endpoint (ticket-authenticated, NO JWT middleware). +pub fn ws_router() -> Router { + Router::new().route("/api/v1/ws/jobs", get(ws_handler)) +} + +// ── Error helper ───────────────────────────────────────────────────────────── + +#[inline] +fn err( + status: StatusCode, + code: &'static str, + message: impl Into, +) -> (StatusCode, Json) { + ( + status, + Json(json!({ "error": { "code": code, "message": message.into() } })), + ) +} + +// ── POST /api/v1/ws/ticket ──────────────────────────────────────────────────── + + +/// Issue a single-use WebSocket authentication ticket (60 s expiry). 
+pub async fn create_ticket_handler(
+    State(state): State<AppState>,
+    auth: AuthUser,
+) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
+    // The ticket is a ULID string; it stays valid for 60 seconds or until first use.
+    let ticket_id = Ulid::new().to_string();
+    let ticket = WsTicket {
+        user_id: auth.user_id,
+        role: auth.role.as_str().to_string(),
+        expires_at: Utc::now() + Duration::seconds(60),
+    };
+
+    state.ws_tickets.insert(ticket_id.clone(), ticket);
+
+    // The ticket value is a bearer credential, so it is deliberately kept out of the log.
+    tracing::info!(
+        user_id = %auth.user_id,
+        username = %auth.username,
+        "WS ticket issued"
+    );
+
+    Ok(Json(json!({ "ticket": ticket_id })))
+}
+
+// ── GET /api/v1/ws/jobs ───────────────────────────────────────────────────────
+
+/// Query string of the WebSocket upgrade request (`?ticket=...`).
+#[derive(Debug, Deserialize)]
+pub struct WsQuery {
+    /// Ticket id previously returned by `POST /api/v1/ws/ticket`.
+    pub ticket: String,
+}
+
+/// Browser WebSocket upgrade endpoint — authenticates via single-use ticket.
+///
+/// The ticket is removed from the store before it is inspected, so a ticket can
+/// authorise at most one upgrade even when requests race.
+///
+/// # Errors
+///
+/// Returns `401 invalid_ticket` when the ticket is unknown or already consumed, and
+/// `401 ticket_expired` when it was found but its `expires_at` has passed.
+pub async fn ws_handler(
+    State(state): State<AppState>,
+    Query(q): Query<WsQuery>,
+    ws: WebSocketUpgrade,
+) -> Result<Response, (StatusCode, Json<Value>)> {
+    // Consume the ticket in one map operation. Looking it up and removing it in two
+    // separate steps would let concurrent requests both validate the same ticket.
+    let Some((_, ticket)) = state.ws_tickets.remove(&q.ticket) else {
+        return Err(err(
+            StatusCode::UNAUTHORIZED,
+            "invalid_ticket",
+            "WebSocket ticket not found or already used",
+        ));
+    };
+
+    // An expired ticket has already been removed above; just reject it.
+    if ticket.expires_at < Utc::now() {
+        return Err(err(
+            StatusCode::UNAUTHORIZED,
+            "ticket_expired",
+            "WebSocket ticket has expired",
+        ));
+    }
+
+    tracing::info!(
+        user_id = %ticket.user_id,
+        role = %ticket.role,
+        "Browser WebSocket connection upgraded"
+    );
+
+    let db = state.db.clone();
+    Ok(ws.on_upgrade(move |socket| handle_browser_ws(socket, db, ticket)))
+}
+
+// ── WebSocket handler ─────────────────────────────────────────────────────────
+
+/// Drive the browser WebSocket: LISTEN on `job_update` and forward payloads.
+async fn handle_browser_ws( + mut socket: WebSocket, + db: sqlx::PgPool, + ticket: WsTicket, +) { + // Acquire a dedicated PG listener connection. + let mut listener = match PgListener::connect_with(&db).await { + Ok(l) => l, + Err(e) => { + tracing::error!(error = %e, user_id = %ticket.user_id, "Failed to create PgListener"); + let _ = socket + .send(Message::Text( + json!({ "error": "internal_error" }).to_string().into(), + )) + .await; + return; + } + }; + + if let Err(e) = listener.listen("job_update").await { + tracing::error!(error = %e, user_id = %ticket.user_id, "PgListener LISTEN failed"); + return; + } + + tracing::info!(user_id = %ticket.user_id, "Browser WS: LISTEN job_update started"); + + loop { + tokio::select! { + // Forward PG notifications to the browser. + notify_result = listener.recv() => { + match notify_result { + Ok(notification) => { + let payload = notification.payload().to_string(); + tracing::debug!(user_id = %ticket.user_id, payload = %payload, "Forwarding job_update"); + if socket.send(Message::Text(payload.into())).await.is_err() { + tracing::info!(user_id = %ticket.user_id, "Browser WS send failed — client disconnected"); + break; + } + } + Err(e) => { + tracing::error!(error = %e, user_id = %ticket.user_id, "PgListener recv error"); + break; + } + } + } + + // Handle incoming frames from the browser (ping/close). 
+ msg = socket.recv() => { + match msg { + Some(Ok(Message::Close(_))) | None => { + tracing::info!(user_id = %ticket.user_id, "Browser WS closed by client"); + break; + } + Some(Ok(Message::Ping(data))) => { + if socket.send(Message::Pong(data)).await.is_err() { + break; + } + } + Some(Err(e)) => { + tracing::debug!(error = %e, user_id = %ticket.user_id, "Browser WS recv error"); + break; + } + _ => {} + } + } + } + } + + tracing::info!(user_id = %ticket.user_id, "Browser WS handler exiting"); +} diff --git a/crates/pm-worker/Cargo.toml b/crates/pm-worker/Cargo.toml index 91ee25f..d9ebafb 100644 --- a/crates/pm-worker/Cargo.toml +++ b/crates/pm-worker/Cargo.toml @@ -23,3 +23,7 @@ tracing-subscriber = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } futures = { workspace = true } +rustls = { workspace = true } +tokio-rustls = { version = "0.26" } +rustls-pemfile = { version = "2" } +tokio-tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots"] } diff --git a/crates/pm-worker/src/job_executor.rs b/crates/pm-worker/src/job_executor.rs index e643479..70163e1 100644 --- a/crates/pm-worker/src/job_executor.rs +++ b/crates/pm-worker/src/job_executor.rs @@ -204,6 +204,39 @@ async fn scan_queued_jobs(pool: PgPool, config: Arc) { WHERE pjh.status = 'queued' AND (pjh.retry_next_at IS NULL OR pjh.retry_next_at <= NOW()) AND j.status != 'cancelled' + AND ( + -- Immediate jobs always dispatch + j.immediate = TRUE + OR + -- Non-immediate jobs only dispatch when the host has an open window + EXISTS ( + SELECT 1 FROM maintenance_windows mw + WHERE mw.host_id = pjh.host_id + AND mw.enabled = TRUE + AND ( + (mw.recurrence = 'once' + AND mw.start_at <= NOW() + AND NOW() < mw.start_at + (mw.duration_minutes * INTERVAL '1 minute')) + OR + (mw.recurrence = 'daily' + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + 
(mw.duration_minutes * INTERVAL '1 minute'))) + OR + (mw.recurrence = 'weekly' + AND EXTRACT(DOW FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute'))) + OR + (mw.recurrence = 'monthly' + AND EXTRACT(DAY FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute'))) + ) + ) + ) "#, ) .fetch_all(&pool) @@ -230,7 +263,7 @@ async fn scan_queued_jobs(pool: PgPool, config: Arc) { /// Fetch all queued host entries for `job_id` and dispatch them concurrently, /// bounded by `config.worker.max_concurrent_agent_calls`. -async fn process_job(pool: PgPool, config: Arc, job_id: Uuid) { +pub async fn process_job(pool: PgPool, config: Arc, job_id: Uuid) { tracing::info!(%job_id, "process_job: dispatching queued hosts"); // Mark the parent job as running (idempotent guard). diff --git a/crates/pm-worker/src/main.rs b/crates/pm-worker/src/main.rs index c680d25..e11792c 100644 --- a/crates/pm-worker/src/main.rs +++ b/crates/pm-worker/src/main.rs @@ -5,9 +5,11 @@ mod agent_loader; mod health_poller; +mod maintenance_scheduler; mod patch_poller; mod refresh_listener; mod job_executor; +mod ws_relay; use pm_core::{ config::AppConfig, @@ -19,9 +21,11 @@ use std::{sync::Arc, time::Duration}; use tokio::time; use health_poller::run_health_poller; +use maintenance_scheduler::run_maintenance_scheduler; use patch_poller::run_patch_poller; use refresh_listener::run_refresh_listener; use job_executor::run_job_executor; +use ws_relay::run_ws_relay; /// Minimum number of applied migrations the worker requires before /// accepting work. 
Prevents the worker from running against a schema @@ -70,6 +74,12 @@ async fn main() -> anyhow::Result<()> { // M5: job execution engine let job_exec_handle = tokio::spawn(run_job_executor(pool.clone(), config.clone())); + // M6: maintenance window scheduler + let maint_sched_handle = tokio::spawn(run_maintenance_scheduler(pool.clone(), config.clone())); + + // M7: WS relay — streams agent job events → DB → pg_notify → browser WS + let ws_relay_handle = tokio::spawn(run_ws_relay(pool.clone(), config.clone())); + tracing::info!("Worker tasks started"); // Wait for all tasks (they run indefinitely) @@ -79,6 +89,8 @@ async fn main() -> anyhow::Result<()> { patch_handle, refresh_handle, job_exec_handle, + maint_sched_handle, + ws_relay_handle, ); Ok(()) diff --git a/crates/pm-worker/src/maintenance_scheduler.rs b/crates/pm-worker/src/maintenance_scheduler.rs new file mode 100644 index 0000000..3857f9f --- /dev/null +++ b/crates/pm-worker/src/maintenance_scheduler.rs @@ -0,0 +1,164 @@ +//! Maintenance window scheduler. +//! +//! Polls every 60 seconds and, for each enabled maintenance window that is +//! currently open, dispatches any queued non-immediate patch jobs associated +//! with the window's host. +//! +//! A window is considered "open" when: +//! - `once` — `start_at <= NOW() < start_at + duration_minutes * '1 minute'` +//! - `daily` — current UTC time-of-day is within the window's daily slot +//! - `weekly` — same as daily, but only on the matching `recurrence_day` (0=Sun) +//! 
- `monthly` — same as daily, but only on the matching `recurrence_day` (1-31) + +use std::sync::Arc; + +use pm_core::config::AppConfig; +use sqlx::{FromRow, PgPool}; +use tokio::time; +use uuid::Uuid; + +use crate::job_executor::process_job; + +// ───────────────────────────────────────────────────────────────────────────── +// Internal types +// ───────────────────────────────────────────────────────────────────────────── + +#[derive(Debug, FromRow)] +struct OpenWindowHost { + host_id: Uuid, +} + +#[derive(Debug, FromRow)] +struct QueuedJobId { + job_id: Uuid, +} + +// ───────────────────────────────────────────────────────────────────────────── +// Public entry point +// ───────────────────────────────────────────────────────────────────────────── + +/// Run the maintenance scheduler indefinitely. +/// Spawned by `pm-worker/src/main.rs` alongside the job executor. +pub async fn run_maintenance_scheduler(pool: PgPool, config: Arc) { + tracing::info!("Maintenance scheduler started"); + + // First tick fires immediately; consume it to align with job_executor. + let mut ticker = time::interval(std::time::Duration::from_secs(60)); + ticker.tick().await; + + loop { + ticker.tick().await; + tracing::debug!("Maintenance scheduler: checking open windows"); + dispatch_open_window_jobs(pool.clone(), config.clone()).await; + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Core dispatch logic +// ───────────────────────────────────────────────────────────────────────────── + +/// Find all hosts with a currently-open maintenance window, then for each, +/// find their queued non-immediate job entries and dispatch them. +async fn dispatch_open_window_jobs(pool: PgPool, config: Arc) { + // ── 1. 
Find all host_ids with an open window right now ───────────────── + let open_hosts: Vec = match sqlx::query_as( + r#" + SELECT DISTINCT mw.host_id + FROM maintenance_windows mw + WHERE mw.enabled = TRUE + AND ( + -- One-time: absolute window + ( mw.recurrence = 'once' + AND mw.start_at <= NOW() + AND NOW() < mw.start_at + (mw.duration_minutes * INTERVAL '1 minute') + ) + OR + -- Daily: time-of-day slot, any day + ( mw.recurrence = 'daily' + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute')) + ) + OR + -- Weekly: matching day-of-week + time-of-day slot + ( mw.recurrence = 'weekly' + AND EXTRACT(DOW FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute')) + ) + OR + -- Monthly: matching day-of-month + time-of-day slot + ( mw.recurrence = 'monthly' + AND EXTRACT(DAY FROM NOW() AT TIME ZONE 'UTC') = mw.recurrence_day + AND (NOW() AT TIME ZONE 'UTC')::time >= (mw.start_at AT TIME ZONE 'UTC')::time + AND (NOW() AT TIME ZONE 'UTC')::time < ((mw.start_at AT TIME ZONE 'UTC')::time + + (mw.duration_minutes * INTERVAL '1 minute')) + ) + ) + "#, + ) + .fetch_all(&pool) + .await + { + Ok(hosts) => hosts, + Err(e) => { + tracing::error!(error = %e, "dispatch_open_window_jobs: open-hosts query failed"); + return; + } + }; + + if open_hosts.is_empty() { + tracing::debug!("Maintenance scheduler: no open windows this cycle"); + return; + } + + tracing::info!( + open_host_count = open_hosts.len(), + "Maintenance scheduler: found hosts with open windows" + ); + + // ── 2. 
For each open host, find distinct queued non-immediate job IDs ── + for host in open_hosts { + let job_ids: Vec = match sqlx::query_as( + r#" + SELECT DISTINCT pjh.job_id + FROM patch_job_hosts pjh + JOIN patch_jobs j ON j.id = pjh.job_id + WHERE pjh.host_id = $1 + AND pjh.status = 'queued' + AND j.immediate = FALSE + AND j.status != 'cancelled' + AND (pjh.retry_next_at IS NULL OR pjh.retry_next_at <= NOW()) + "#, + ) + .bind(host.host_id) + .fetch_all(&pool) + .await + { + Ok(ids) => ids, + Err(e) => { + tracing::error!( + error = %e, + host_id = %host.host_id, + "dispatch_open_window_jobs: queued jobs query failed" + ); + continue; + } + }; + + for job in job_ids { + tracing::info!( + job_id = %job.job_id, + host_id = %host.host_id, + "Maintenance scheduler: dispatching non-immediate job (window open)" + ); + + let (p, c) = (pool.clone(), config.clone()); + let job_id = job.job_id; + tokio::spawn(async move { + process_job(p, c, job_id).await; + }); + } + } +} diff --git a/crates/pm-worker/src/ws_relay.rs b/crates/pm-worker/src/ws_relay.rs new file mode 100644 index 0000000..35aa604 --- /dev/null +++ b/crates/pm-worker/src/ws_relay.rs @@ -0,0 +1,470 @@ +//! WS relay — M7 +//! +//! For every running `patch_job_hosts` row that has an `agent_job_id`, open a +//! WebSocket to the corresponding agent, stream job-status events, update the +//! DB row, and fire `pg_notify('job_update', payload_json)` so the browser WS +//! handler can forward the event to connected clients. 
+ +use std::{ + collections::HashSet, + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use futures::StreamExt; +use rustls::{ + pki_types::{CertificateDer, PrivateKeyDer}, + ClientConfig as TlsClientConfig, + RootCertStore, +}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use tokio::sync::Mutex; +use tokio_tungstenite::{ + connect_async_tls_with_config, + tungstenite::protocol::Message, + Connector, +}; +use uuid::Uuid; + +use pm_agent_client::client::DEFAULT_AGENT_PORT; +use pm_core::config::AppConfig; + +// ── Types ───────────────────────────────────────────────────────────────────── + +#[derive(Debug, sqlx::FromRow)] +struct RunningHostJob { + job_id: Uuid, + host_id: Uuid, + agent_job_id: String, + host_address: String, +} + +/// JSON event streamed by the agent over its WS endpoint. +#[derive(Debug, Deserialize)] +struct AgentWsEvent { + #[allow(dead_code)] + job_id: String, + status: String, + output: Option, + error: Option, + #[allow(dead_code)] + progress_percent: Option, +} + +/// Payload broadcast via `pg_notify('job_update', …)`. +#[derive(Debug, Serialize)] +struct NotifyPayload { + job_id: String, + host_id: String, + status: String, + output: Option, + error_message: Option, + agent_job_id: String, +} + +// ── Entry point ─────────────────────────────────────────────────────────────── + +/// Long-running task: polls the DB for running host-jobs and spawns a per-pair +/// relay task for each one that isn't already being tracked. 
+pub async fn run_ws_relay(pool: PgPool, config: Arc) { + tracing::info!("WS relay task started"); + + let active: Arc>> = Arc::new(Mutex::new(HashSet::new())); + + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + let rows = match query_running_jobs(&pool).await { + Ok(r) => r, + Err(e) => { + tracing::error!(error = %e, "ws_relay: DB poll failed"); + continue; + } + }; + + for row in rows { + let key = (row.job_id, row.host_id); + + // Skip pairs that already have an active relay. + if active.lock().await.contains(&key) { + continue; + } + + // Build the rustls ClientConfig once per connection. + let tls_config = match build_tls_config(&config).await { + Ok(c) => Arc::new(c), + Err(e) => { + tracing::error!(error = %e, "ws_relay: TLS config error"); + continue; + } + }; + + active.lock().await.insert(key); + + let pool_c = pool.clone(); + let active_c = active.clone(); + + tokio::spawn(async move { + tracing::info!( + job_id = %row.job_id, + host_id = %row.host_id, + agent_job_id = %row.agent_job_id, + host = %row.host_address, + "WS relay: starting relay" + ); + + match relay_one_job(&pool_c, &row, tls_config).await { + Ok(()) => tracing::info!( + job_id = %row.job_id, + host_id = %row.host_id, + "WS relay: completed" + ), + Err(e) => tracing::error!( + error = %e, + job_id = %row.job_id, + host_id = %row.host_id, + "WS relay: ended with error" + ), + } + + active_c.lock().await.remove(&key); + }); + } + } +} + +// ── DB helpers ──────────────────────────────────────────────────────────────── + +async fn query_running_jobs(pool: &PgPool) -> anyhow::Result> { + sqlx::query_as::<_, RunningHostJob>( + r#" + SELECT + pjh.job_id, + pjh.host_id, + pjh.agent_job_id, + COALESCE(h.fqdn, h.ip_address::text) AS host_address + FROM patch_job_hosts pjh + JOIN hosts h ON h.id = pjh.host_id + WHERE pjh.status = 'running'::job_status + AND 
pjh.agent_job_id IS NOT NULL + "#, + ) + .fetch_all(pool) + .await + .context("query_running_jobs") +} + +// ── TLS ─────────────────────────────────────────────────────────────────────── + +async fn build_tls_config(config: &AppConfig) -> anyhow::Result { + let sec = &config.security; + + let cert_pem = tokio::fs::read(&sec.agent_client_cert_path).await + .with_context(|| format!("read agent client cert '{}'", sec.agent_client_cert_path))?; + let key_pem = tokio::fs::read(&sec.agent_client_key_path).await + .with_context(|| format!("read agent client key '{}'" , sec.agent_client_key_path))?; + let ca_pem = tokio::fs::read(&sec.ca_cert_path).await + .with_context(|| format!("read CA cert '{}'", sec.ca_cert_path))?; + + // Parse client certificate chain. + let client_certs: Vec> = { + let mut cur = std::io::Cursor::new(&cert_pem); + rustls_pemfile::certs(&mut cur) + .collect::, _>>() + .context("parse client cert PEM")? + }; + + // Parse client private key. + let client_key: PrivateKeyDer<'static> = { + let mut cur = std::io::Cursor::new(&key_pem); + rustls_pemfile::private_key(&mut cur) + .context("parse client key PEM")? + .context("no private key in PEM")? + }; + + // Build root store from CA cert. + let mut root_store = RootCertStore::empty(); + { + let mut cur = std::io::Cursor::new(&ca_pem); + for cert_result in rustls_pemfile::certs(&mut cur) { + root_store + .add(cert_result.context("read CA cert entry")?) 
+ .context("add CA cert to root store")?; + } + } + + TlsClientConfig::builder() + .with_root_certificates(root_store) + .with_client_auth_cert(client_certs, client_key) + .context("build TlsClientConfig") +} + +// ── Per-job relay ───────────────────────────────────────────────────────────── + +async fn relay_one_job( + pool: &PgPool, + row: &RunningHostJob, + tls_config: Arc, +) -> anyhow::Result<()> { + let url = format!( + "wss://{}:{}/api/v1/ws/jobs", + row.host_address, DEFAULT_AGENT_PORT, + ); + + let (ws_stream, _) = connect_async_tls_with_config( + url.as_str(), + None, + false, + Some(Connector::Rustls(tls_config)), + ) + .await + .with_context(|| format!("connect agent WS {url}"))?; + + let (_sink, mut stream) = ws_stream.split(); + + while let Some(frame) = stream.next().await { + let frame = match frame { + Ok(f) => f, + Err(e) => { + tracing::warn!( + error = %e, + job_id = %row.job_id, + host_id = %row.host_id, + "WS relay: stream error" + ); + break; + } + }; + + let text = match frame { + Message::Text(t) => t.to_string(), + Message::Binary(b) => String::from_utf8(b.into()).unwrap_or_default(), + Message::Close(_) => { + tracing::info!(job_id = %row.job_id, "Agent WS closed cleanly"); + break; + } + _ => continue, + }; + + if text.is_empty() { + continue; + } + + let event: AgentWsEvent = match serde_json::from_str(&text) { + Ok(e) => e, + Err(e) => { + tracing::warn!( + error = %e, raw = %text, + "WS relay: unparseable agent frame" + ); + continue; + } + }; + + process_event(pool, row, &event).await; + + if matches!(event.status.as_str(), "succeeded" | "failed" | "cancelled") { + tracing::info!( + job_id = %row.job_id, + host_id = %row.host_id, + status = %event.status, + "WS relay: terminal state — stopping" + ); + break; + } + } + + Ok(()) +} + +// ── Event processing ────────────────────────────────────────────────────────── + +async fn process_event(pool: &PgPool, row: &RunningHostJob, event: &AgentWsEvent) { + // Map agent status string to DB 
job_status enum value. + let db_status = match event.status.as_str() { + "running" => "running", + "succeeded" => "succeeded", + "failed" => "failed", + "cancelled" => "cancelled", + other => { + tracing::warn!(status = %other, "WS relay: unknown agent status"); + return; + } + }; + + let output = event.output.as_deref().unwrap_or(""); + let error_msg = event.error.as_deref(); + + // Determine timestamps based on terminal state. + let is_terminal = matches!(db_status, "succeeded" | "failed" | "cancelled"); + + // Update the DB row. + let update_result = if is_terminal { + sqlx::query( + r#" + UPDATE patch_job_hosts + SET status = $1::job_status, + output = CASE WHEN $2 != '' THEN $2 ELSE output END, + error_message = $3, + completed_at = NOW() + WHERE job_id = $4 + AND host_id = $5 + "#, + ) + .bind(db_status) + .bind(output) + .bind(error_msg) + .bind(row.job_id) + .bind(row.host_id) + .execute(pool) + .await + } else { + sqlx::query( + r#" + UPDATE patch_job_hosts + SET status = $1::job_status, + output = CASE WHEN $2 != '' THEN $2 ELSE output END + WHERE job_id = $3 + AND host_id = $4 + "#, + ) + .bind(db_status) + .bind(output) + .bind(row.job_id) + .bind(row.host_id) + .execute(pool) + .await + }; + + if let Err(e) = update_result { + tracing::error!( + error = %e, + job_id = %row.job_id, + host_id = %row.host_id, + "WS relay: DB update failed" + ); + return; + } + + // Also update the parent patch_jobs status when the host-level job reaches + // a terminal state: running → if all hosts terminal then update parent. + if is_terminal { + update_parent_job_status(pool, row.job_id).await; + } + + // Fire pg_notify so browser WS handlers forward the event. 
+ let payload = NotifyPayload { + job_id: row.job_id.to_string(), + host_id: row.host_id.to_string(), + status: db_status.to_string(), + output: event.output.clone(), + error_message: event.error.clone(), + agent_job_id: row.agent_job_id.clone(), + }; + + let payload_json = match serde_json::to_string(&payload) { + Ok(s) => s, + Err(e) => { + tracing::error!(error = %e, "WS relay: failed to serialize notify payload"); + return; + } + }; + + if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)") + .bind(&payload_json) + .execute(pool) + .await + { + tracing::error!( + error = %e, + job_id = %row.job_id, + host_id = %row.host_id, + "WS relay: pg_notify failed" + ); + } else { + tracing::debug!( + job_id = %row.job_id, + host_id = %row.host_id, + status = %db_status, + "WS relay: pg_notify sent" + ); + } +} + +// ── Parent job status rollup ────────────────────────────────────────────────── + +/// After a host-level job reaches a terminal state, check whether ALL hosts for +/// that job are now terminal and update the parent `patch_jobs` row accordingly. +async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) { + // Count hosts that are still in a non-terminal state. + let pending: i64 = match sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM patch_job_hosts + WHERE job_id = $1 + AND status NOT IN ( + 'succeeded'::job_status, + 'failed'::job_status, + 'cancelled'::job_status + ) + "#, + ) + .bind(job_id) + .fetch_one(pool) + .await + { + Ok(n) => n, + Err(e) => { + tracing::error!(error = %e, %job_id, "update_parent_job_status: count query failed"); + return; + } + }; + + if pending > 0 { + return; // still hosts running — parent stays running + } + + // All hosts terminal — determine final parent status. 
+ let failed_count: i64 = match sqlx::query_scalar( + "SELECT COUNT(*) FROM patch_job_hosts WHERE job_id = $1 AND status = 'failed'::job_status", + ) + .bind(job_id) + .fetch_one(pool) + .await + { + Ok(n) => n, + Err(e) => { + tracing::error!(error = %e, %job_id, "update_parent_job_status: failed-count query failed"); + return; + } + }; + + let final_status = if failed_count > 0 { "failed" } else { "succeeded" }; + + if let Err(e) = sqlx::query( + "UPDATE patch_jobs SET status = $1::job_status, completed_at = NOW() WHERE id = $2", + ) + .bind(final_status) + .bind(job_id) + .execute(pool) + .await + { + tracing::error!( + error = %e, + %job_id, + status = %final_status, + "update_parent_job_status: UPDATE failed" + ); + } else { + tracing::info!( + %job_id, + status = %final_status, + "Parent job status updated" + ); + } +} diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index a01632f..a62aff3 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -11,6 +11,7 @@ import UsersPage from './pages/UsersPage' import DashboardPage from './pages/DashboardPage' import PatchDeploymentPage from './pages/PatchDeploymentPage' import JobsPage from './pages/JobsPage' +import MaintenanceWindowsPage from './pages/MaintenanceWindowsPage' // Placeholder pages — implemented in later milestones const PlaceholderPage = ({ title }: { title: string }) => ( @@ -44,10 +45,14 @@ function App() { } /> } /> - {/* Protected — later milestones */} + {/* Protected — M5 */} } /> } /> - } /> + + {/* Protected — M6 */} + } /> + + {/* Placeholder — later milestones */} } /> } /> } /> diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 330f0ee..ad25762 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -1,7 +1,12 @@ import axios, { type AxiosError } from 'axios' import type { InternalAxiosRequestConfig } from 'axios' import { useAuthStore } from '../store/authStore' -import type { FleetStatus, CreateJobRequest } from '../types' +import 
type { + FleetStatus, + CreateJobRequest, + CreateMaintenanceWindowRequest, + UpdateMaintenanceWindowRequest, +} from '../types' const BASE_URL = '/api/v1' @@ -123,3 +128,22 @@ export const patchesApi = { // The backend reads from host_patch_data table (cached from agent poll) getHostPatches: (hostId: string) => apiClient.get(`/hosts/${hostId}/patches`), } + +// ── Maintenance Windows API ─────────────────────────────────────────────────── +export const maintenanceWindowsApi = { + list: (hostId: string) => + apiClient.get(`/hosts/${hostId}/maintenance-windows`), + create: (hostId: string, body: CreateMaintenanceWindowRequest) => + apiClient.post(`/hosts/${hostId}/maintenance-windows`, body), + update: (hostId: string, windowId: string, body: UpdateMaintenanceWindowRequest) => + apiClient.put(`/hosts/${hostId}/maintenance-windows/${windowId}`, body), + remove: (hostId: string, windowId: string) => + apiClient.delete(`/hosts/${hostId}/maintenance-windows/${windowId}`), +} + +// ── WebSocket API (M7) ──────────────────────────────────────────────────────── +export const wsApi = { + /** POST /api/v1/ws/ticket — obtain a single-use WS auth ticket (60 s expiry). */ + createTicket: (): Promise<{ ticket: string }> => + apiClient.post<{ ticket: string }>('/ws/ticket').then((r) => r.data), +} diff --git a/frontend/src/hooks/useJobWebSocket.ts b/frontend/src/hooks/useJobWebSocket.ts new file mode 100644 index 0000000..c11b7e1 --- /dev/null +++ b/frontend/src/hooks/useJobWebSocket.ts @@ -0,0 +1,172 @@ +/** + * useJobWebSocket — M7 + * + * Manages a browser WebSocket connection to the job-update relay. + * Authentication uses single-use tickets obtained via POST /api/v1/ws/ticket. 
+ * + * Features: + * - Fetches a fresh ticket before every (re)connect + * - Exponential backoff reconnect: 1 s → 2 s → 4 s → … → 30 s max + * - Calls `onEvent` callback for every parsed JobWsEvent + * - Returns { connected, lastEvent } for UI indicator use + */ + +import { useEffect, useRef, useCallback, useState } from 'react' +import { wsApi } from '../api/client' +import type { JobWsEvent } from '../types' + +// ── Constants ───────────────────────────────────────────────────────────────── + +const BACKOFF_INITIAL_MS = 1_000 +const BACKOFF_MAX_MS = 30_000 +const BACKOFF_FACTOR = 2 + +// ── Types ───────────────────────────────────────────────────────────────────── + +export interface JobWsOptions { + /** Called on each inbound JobWsEvent. */ + onEvent?: (event: JobWsEvent) => void + /** Set to false to disable the connection entirely (e.g. when logged out). */ + enabled?: boolean +} + +export interface JobWsState { + connected: boolean + lastEvent: JobWsEvent | null +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +/** Derive the correct ws(s) URL from the current page origin. */ +function buildWsBase(): string { + const proto = window.location.protocol === 'https:' ? 'wss' : 'ws' + return `${proto}://${window.location.host}` +} + +// ── Hook ────────────────────────────────────────────────────────────────────── + +export function useJobWebSocket(options: JobWsOptions = {}): JobWsState { + const { onEvent, enabled = true } = options + + const [connected, setConnected] = useState(false) + const [lastEvent, setLastEvent] = useState(null) + + // Stable ref to the latest onEvent callback — avoids re-triggering the + // effect every time the parent component re-renders. + const onEventRef = useRef(onEvent) + useEffect(() => { onEventRef.current = onEvent }, [onEvent]) + + // Internal bookkeeping refs (don't need to trigger re-renders). 
+ const wsRef = useRef(null) + const retryTimerRef = useRef | null>(null) + const backoffRef = useRef(BACKOFF_INITIAL_MS) + const mountedRef = useRef(true) + + const clearRetryTimer = useCallback(() => { + if (retryTimerRef.current !== null) { + clearTimeout(retryTimerRef.current) + retryTimerRef.current = null + } + }, []) + + const closeSocket = useCallback(() => { + if (wsRef.current) { + // Prevent the onclose handler from scheduling another reconnect. + wsRef.current.onclose = null + wsRef.current.onerror = null + wsRef.current.close() + wsRef.current = null + } + }, []) + + const connect = useCallback(async () => { + if (!mountedRef.current || !enabled) return + + // Close any existing socket before opening a new one. + closeSocket() + + let ticket: string + try { + const resp = await wsApi.createTicket() + ticket = resp.ticket + } catch (err) { + console.warn('[JobWS] Failed to obtain WS ticket:', err) + scheduleReconnect() + return + } + + if (!mountedRef.current) return + + const url = `${buildWsBase()}/api/v1/ws/jobs?ticket=${encodeURIComponent(ticket)}` + let ws: WebSocket + try { + ws = new WebSocket(url) + } catch (err) { + console.error('[JobWS] WebSocket constructor threw:', err) + scheduleReconnect() + return + } + + wsRef.current = ws + + ws.onopen = () => { + if (!mountedRef.current) { ws.close(); return } + console.debug('[JobWS] Connected') + backoffRef.current = BACKOFF_INITIAL_MS // reset backoff on successful connect + setConnected(true) + } + + ws.onmessage = (ev: MessageEvent) => { + if (!mountedRef.current) return + try { + const event: JobWsEvent = JSON.parse(ev.data as string) + setLastEvent(event) + onEventRef.current?.(event) + } catch { + console.warn('[JobWS] Unparseable message:', ev.data) + } + } + + ws.onerror = () => { + console.warn('[JobWS] Socket error') + // onclose will fire immediately after onerror — let it handle reconnect. 
+ } + + ws.onclose = () => { + if (!mountedRef.current) return + console.debug('[JobWS] Disconnected — scheduling reconnect') + setConnected(false) + wsRef.current = null + scheduleReconnect() + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [enabled, closeSocket]) + + function scheduleReconnect() { + if (!mountedRef.current) return + clearRetryTimer() + const delay = backoffRef.current + backoffRef.current = Math.min(delay * BACKOFF_FACTOR, BACKOFF_MAX_MS) + console.debug(`[JobWS] Reconnecting in ${delay} ms`) + retryTimerRef.current = setTimeout(() => { + if (mountedRef.current) connect() + }, delay) + } + + useEffect(() => { + mountedRef.current = true + + if (enabled) { + connect() + } + + return () => { + mountedRef.current = false + clearRetryTimer() + closeSocket() + setConnected(false) + } + }, [enabled, connect, clearRetryTimer, closeSocket]) + + return { connected, lastEvent } +} diff --git a/frontend/src/pages/HostDetailPage.tsx b/frontend/src/pages/HostDetailPage.tsx index 739fdd2..df8d264 100644 --- a/frontend/src/pages/HostDetailPage.tsx +++ b/frontend/src/pages/HostDetailPage.tsx @@ -1,8 +1,190 @@ -import { useEffect, useState } from 'react' +import { useEffect, useState, useCallback } from 'react' import { useParams, useNavigate } from 'react-router-dom' -import { Alert, Box, Button, CircularProgress, Container, Divider, Grid, Paper, Typography } from '@mui/material' -import { ArrowBack } from '@mui/icons-material' -import { apiClient } from '../api/client' +import { + Alert, + Box, + Button, + Chip, + CircularProgress, + Container, + Dialog, + DialogActions, + DialogContent, + DialogTitle, + Divider, + FormControl, + FormControlLabel, + Grid, + IconButton, + InputLabel, + MenuItem, + Paper, + Select, + Snackbar, + Switch, + Table, + TableBody, + TableCell, + TableHead, + TableRow, + TextField, + Tooltip, + Typography, +} from '@mui/material' +import { + Add as AddIcon, + ArrowBack, + Delete as DeleteIcon, + Edit as EditIcon, + 
Schedule as ScheduleIcon, +} from '@mui/icons-material' +import { apiClient, maintenanceWindowsApi } from '../api/client' +import type { MaintenanceWindow, WindowRecurrence } from '../types' + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +const DAY_NAMES = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + +function recurrenceLabel(r: WindowRecurrence): string { + const map: Record = { + once: 'One-Time', daily: 'Daily', weekly: 'Weekly', monthly: 'Monthly', + } + return map[r] +} + +function scheduleDescription(w: MaintenanceWindow): string { + const dur = `${w.duration_minutes} min` + const time = new Date(w.start_at).toLocaleTimeString([], { + hour: '2-digit', minute: '2-digit', timeZoneName: 'short', + }) + switch (w.recurrence) { + case 'once': + return `Once at ${new Date(w.start_at).toLocaleString()} for ${dur}` + case 'daily': + return `Every day at ${time} for ${dur}` + case 'weekly': { + const day = w.recurrence_day != null ? DAY_NAMES[w.recurrence_day] ?? `Day ${w.recurrence_day}` : '?' + return `Every ${day} at ${time} for ${dur}` + } + case 'monthly': { + const day = w.recurrence_day != null ? w.recurrence_day : '?' 
+ return `Monthly on day ${day} at ${time} for ${dur}` + } + } +} + +// ── Form value type ─────────────────────────────────────────────────────────── + +interface FormValues { + label: string + recurrence: WindowRecurrence + start_at: string + duration_minutes: number + recurrence_day: number | '' + enabled: boolean +} + +function defaultForm(): FormValues { + return { + label: '', + recurrence: 'once', + start_at: new Date().toISOString().slice(0, 16), + duration_minutes: 60, + recurrence_day: '', + enabled: true, + } +} + +// ── Window form dialog ──────────────────────────────────────────────────────── + +interface WindowFormDialogProps { + open: boolean + title: string + initial: FormValues + onClose: () => void + onSubmit: (values: FormValues) => Promise +} + +function WindowFormDialog({ open, title, initial, onClose, onSubmit }: WindowFormDialogProps) { + const [form, setForm] = useState(initial) + const [saving, setSaving] = useState(false) + const [err, setErr] = useState(null) + + useEffect(() => { setForm(initial); setErr(null) }, [open, initial]) + + const set = (field: keyof FormValues, value: FormValues[keyof FormValues]) => + setForm(prev => ({ ...prev, [field]: value })) + + const needsDay = form.recurrence === 'weekly' || form.recurrence === 'monthly' + + const handleSubmit = async () => { + if (!form.label.trim()) { setErr('Label is required'); return } + if (needsDay && form.recurrence_day === '') { setErr('Recurrence day is required'); return } + setSaving(true); setErr(null) + try { await onSubmit(form) } + catch (e: unknown) { + const msg = (e as { response?: { data?: { error?: { message?: string } } } }) + ?.response?.data?.error?.message ?? 
'Failed to save' + setErr(msg) + } finally { setSaving(false) } + } + + return ( + + {title} + + {err && {err}} + set('label', e.target.value)} required fullWidth /> + + Recurrence + + + set('start_at', e.target.value)} fullWidth + slotProps={{ inputLabel: { shrink: true } }} + /> + set('duration_minutes', parseInt(e.target.value, 10) || 60)} fullWidth + slotProps={{ htmlInput: { min: 1, max: 1440 } }} + /> + {form.recurrence === 'weekly' && ( + + Day of Week + + + )} + {form.recurrence === 'monthly' && ( + set('recurrence_day', parseInt(e.target.value, 10) || 1)} fullWidth + slotProps={{ htmlInput: { min: 1, max: 31 } }} + /> + )} + set('enabled', e.target.checked)} />} + label="Enabled" + /> + + + + + + + ) +} + +// ── Main page ────────────────────────────────────────────────────────────────── export default function HostDetailPage() { const { id } = useParams<{ id: string }>() @@ -11,6 +193,27 @@ export default function HostDetailPage() { const [loading, setLoading] = useState(true) const [error, setError] = useState(null) + // Maintenance windows state + const [windows, setWindows] = useState([]) + const [winLoading, setWinLoading] = useState(false) + const [snackbar, setSnackbar] = useState<{ open: boolean; message: string; severity: 'success' | 'error' }>({ + open: false, message: '', severity: 'success', + }) + + // Create dialog + const [createOpen, setCreateOpen] = useState(false) + const [createForm, setCreateForm] = useState(defaultForm()) + + // Edit dialog + const [editOpen, setEditOpen] = useState(false) + const [editWindow, setEditWindow] = useState(null) + const [editForm, setEditForm] = useState(defaultForm()) + + // Delete dialog + const [deleteOpen, setDeleteOpen] = useState(false) + const [deleteTarget, setDeleteTarget] = useState(null) + + // ── Fetch host ──────────────────────────────────────────────────────────── useEffect(() => { apiClient.get(`/hosts/${id}`) .then(r => setHost(r.data)) @@ -18,24 +221,227 @@ export default function 
HostDetailPage() { .finally(() => setLoading(false)) }, [id]) + // ── Fetch windows ───────────────────────────────────────────────────────── + const fetchWindows = useCallback(async () => { + if (!id) return + setWinLoading(true) + try { + const res = await maintenanceWindowsApi.list(id) + setWindows(res.data?.windows ?? []) + } catch { /* ignore */ } + finally { setWinLoading(false) } + }, [id]) + + useEffect(() => { fetchWindows() }, [fetchWindows]) + + const showSnack = (message: string, severity: 'success' | 'error') => + setSnackbar({ open: true, message, severity }) + + // ── Create window ───────────────────────────────────────────────────────── + const handleCreateSubmit = async (values: FormValues) => { + if (!id) return + await maintenanceWindowsApi.create(id, { + label: values.label, + recurrence: values.recurrence, + start_at: new Date(values.start_at).toISOString(), + duration_minutes: values.duration_minutes, + recurrence_day: values.recurrence_day === '' ? undefined : values.recurrence_day, + enabled: values.enabled, + }) + setCreateOpen(false) + showSnack('Window created', 'success') + await fetchWindows() + } + + // ── Edit window ─────────────────────────────────────────────────────────── + const handleEditClick = (w: MaintenanceWindow) => { + setEditWindow(w) + setEditForm({ + label: w.label, + recurrence: w.recurrence, + start_at: new Date(w.start_at).toISOString().slice(0, 16), + duration_minutes: w.duration_minutes, + recurrence_day: w.recurrence_day ?? '', + enabled: w.enabled, + }) + setEditOpen(true) + } + + const handleEditSubmit = async (values: FormValues) => { + if (!id || !editWindow) return + await maintenanceWindowsApi.update(id, editWindow.id, { + label: values.label, + recurrence: values.recurrence, + start_at: new Date(values.start_at).toISOString(), + duration_minutes: values.duration_minutes, + recurrence_day: values.recurrence_day === '' ? 
undefined : values.recurrence_day, + enabled: values.enabled, + }) + setEditOpen(false) + showSnack('Window updated', 'success') + await fetchWindows() + } + + // ── Delete window ───────────────────────────────────────────────────────── + const handleDeleteConfirm = async () => { + if (!id || !deleteTarget) return + try { + await maintenanceWindowsApi.remove(id, deleteTarget.id) + setDeleteOpen(false) + showSnack('Window deleted', 'success') + await fetchWindows() + } catch { + showSnack('Failed to delete window', 'error') + } + } + + // ── Render ──────────────────────────────────────────────────────────────── if (loading) return if (error) return {error} return ( - - - - {String(host?.fqdn ?? '')} + + + + {/* ── Host details ─────────────────────────────────────────────────── */} + + + {String(host?.fqdn ?? '')} + - {host && Object.entries(host).map(([k, v]) => v !== null && v !== '' ? ( - - {k.replace(/_/g, ' ').toUpperCase()} - {String(v)} - - ) : null)} + {host && Object.entries(host).map(([k, v]) => + v !== null && v !== '' ? ( + + + {k.replace(/_/g, ' ').toUpperCase()} + + {String(v)} + + ) : null + )} + + {/* ── Maintenance Windows ──────────────────────────────────────────── */} + + + + + Maintenance Windows + + + + + + + Queued patch jobs execute only when an enabled maintenance window is open. + + + {winLoading ? ( + + ) : windows.length === 0 ? ( + + No maintenance windows. Queued jobs will not run until a window is configured. + + ) : ( + + + + Label + Schedule + Recurrence + Status + Actions + + + + {windows.map(w => ( + + {w.label} + + {scheduleDescription(w)} + + + + + + + + + + handleEditClick(w)}> + + + + + { setDeleteTarget(w); setDeleteOpen(true) }} + > + + + + + + ))} + +
+ )} +
+ + {/* ── Dialogs ─────────────────────────────────────────────────────── */} + setCreateOpen(false)} + onSubmit={handleCreateSubmit} + /> + setEditOpen(false)} + onSubmit={handleEditSubmit} + /> + setDeleteOpen(false)} maxWidth="xs" fullWidth> + Delete Window + + + Delete {deleteTarget?.label}? This cannot be undone. + + + + + + + + + {/* Snackbar */} + setSnackbar(p => ({ ...p, open: false }))} + anchorOrigin={{ vertical: 'bottom', horizontal: 'center' }} + > + setSnackbar(p => ({ ...p, open: false }))}> + {snackbar.message} + +
) } diff --git a/frontend/src/pages/JobsPage.tsx b/frontend/src/pages/JobsPage.tsx index 41592ef..e822aa5 100644 --- a/frontend/src/pages/JobsPage.tsx +++ b/frontend/src/pages/JobsPage.tsx @@ -29,9 +29,12 @@ import { ExpandMore, Refresh as RefreshIcon, Replay as ReplayIcon, + Wifi as WifiIcon, + WifiOff as WifiOffIcon, } from '@mui/icons-material' import { jobsApi } from '../api/client' -import type { JobStatus, JobKind, PatchJobSummary, PatchJob, PatchJobHost } from '../types' +import { useJobWebSocket } from '../hooks/useJobWebSocket' +import type { JobStatus, JobKind, PatchJobSummary, PatchJob, PatchJobHost, JobWsEvent } from '../types' // ── Status chip ─────────────────────────────────────────────────────────────── type ChipColor = 'default' | 'info' | 'warning' | 'success' | 'error' @@ -340,6 +343,44 @@ export default function JobsPage() { loadJobs(0) }, [loadJobs]) + // ── WS event handler — surgical state updates ───────────────────────────── + const handleWsEvent = useCallback((event: JobWsEvent) => { + // Update matching job summary row status. + setJobs((prev) => + prev.map((job) => { + if (job.id !== event.job_id) return job + const updated = { ...job, status: event.status } + // Increment counters when a host reaches a terminal state. + if (event.status === 'succeeded') { + updated.succeeded_count = job.succeeded_count + 1 + } else if (event.status === 'failed') { + updated.failed_count = job.failed_count + 1 + } + return updated + }) + ) + + // Also update the host row in the expanded detail panel if loaded. + setDetails((prev) => { + const detail = prev[event.job_id] + if (!detail) return prev + const updatedHosts = detail.hosts.map((h) => { + if (h.host_id !== event.host_id) return h + return { + ...h, + status: event.status, + ...(event.error_message ? { error_message: event.error_message } : {}), + ...(event.agent_job_id ? 
{ agent_job_id: event.agent_job_id } : {}), + } + }) + return { ...prev, [event.job_id]: { ...detail, hosts: updatedHosts } } + }) + }, []) + + // ── WebSocket connection ────────────────────────────────────────────────── + const { connected } = useJobWebSocket({ onEvent: handleWsEvent }) + + // ── Action handlers ─────────────────────────────────────────────────────── const handleToggleExpand = useCallback(async (id: string) => { if (expandedId === id) { setExpandedId(null) @@ -388,12 +429,31 @@ export default function JobsPage() { } }, [rollbackTargetId, loadJobs]) + // ── Render ──────────────────────────────────────────────────────────────── return ( Jobs + + {/* WS connection status indicator */} + + + {connected + ? + : } + + {connected ? 'Live' : 'Offline'} + + + + loadJobs(0)} disabled={loading}> diff --git a/frontend/src/pages/MaintenanceWindowsPage.tsx b/frontend/src/pages/MaintenanceWindowsPage.tsx new file mode 100644 index 0000000..2f59041 --- /dev/null +++ b/frontend/src/pages/MaintenanceWindowsPage.tsx @@ -0,0 +1,616 @@ +import { useEffect, useState, useCallback } from 'react' +import { + Alert, + Box, + Button, + Chip, + CircularProgress, + Container, + Dialog, + DialogActions, + DialogContent, + DialogTitle, + FormControl, + FormControlLabel, + IconButton, + InputLabel, + MenuItem, + Paper, + Select, + Snackbar, + Switch, + Table, + TableBody, + TableCell, + TableHead, + TableRow, + TextField, + Toolbar, + Tooltip, + Typography, +} from '@mui/material' +import { + Add as AddIcon, + Delete as DeleteIcon, + Edit as EditIcon, + Refresh as RefreshIcon, + Schedule as ScheduleIcon, +} from '@mui/icons-material' +import { maintenanceWindowsApi, hostsApi } from '../api/client' +import type { Host, MaintenanceWindow, WindowRecurrence } from '../types' + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +function recurrenceLabel(r: WindowRecurrence): string { + const map: Record = { + once: 'One-Time', + daily: 'Daily', + 
weekly: 'Weekly', + monthly: 'Monthly', + } + return map[r] +} + +function recurrenceColor(r: WindowRecurrence): 'default' | 'primary' | 'secondary' | 'info' { + const map: Record = { + once: 'default', + daily: 'primary', + weekly: 'secondary', + monthly: 'info', + } + return map[r] +} + +function fmtDate(iso: string): string { + return new Date(iso).toLocaleString() +} + +function fmtTimeOnly(iso: string): string { + const d = new Date(iso) + return d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', timeZoneName: 'short' }) +} + +const DAY_NAMES = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + +function scheduleDescription(w: MaintenanceWindow): string { + const dur = `${w.duration_minutes} min` + const time = fmtTimeOnly(w.start_at) + switch (w.recurrence) { + case 'once': + return `Once at ${fmtDate(w.start_at)} for ${dur}` + case 'daily': + return `Every day at ${time} for ${dur}` + case 'weekly': { + const day = w.recurrence_day != null ? DAY_NAMES[w.recurrence_day] ?? `Day ${w.recurrence_day}` : '?' + return `Every ${day} at ${time} for ${dur}` + } + case 'monthly': { + const day = w.recurrence_day != null ? w.recurrence_day : '?' 
+ return `Monthly on day ${day} at ${time} for ${dur}` + } + } +} + +// ── Default form values ─────────────────────────────────────────────────────── + +function nowIso(): string { + return new Date().toISOString().slice(0, 16) // "YYYY-MM-DDTHH:MM" +} + +interface FormValues { + label: string + recurrence: WindowRecurrence + start_at: string + duration_minutes: number + recurrence_day: number | '' + enabled: boolean +} + +function defaultForm(): FormValues { + return { + label: '', + recurrence: 'once', + start_at: nowIso(), + duration_minutes: 60, + recurrence_day: '', + enabled: true, + } +} + +// ── Window form dialog ──────────────────────────────────────────────────────── + +interface WindowFormDialogProps { + open: boolean + title: string + initial: FormValues + onClose: () => void + onSubmit: (values: FormValues) => Promise +} + +function WindowFormDialog({ open, title, initial, onClose, onSubmit }: WindowFormDialogProps) { + const [form, setForm] = useState(initial) + const [saving, setSaving] = useState(false) + const [err, setErr] = useState(null) + + // Reset form when dialog opens with new initial values + useEffect(() => { setForm(initial); setErr(null) }, [open, initial]) + + const set = (field: keyof FormValues, value: FormValues[keyof FormValues]) => + setForm(prev => ({ ...prev, [field]: value })) + + const needsDay = form.recurrence === 'weekly' || form.recurrence === 'monthly' + + const handleSubmit = async () => { + if (!form.label.trim()) { setErr('Label is required'); return } + if (needsDay && form.recurrence_day === '') { setErr('Recurrence day is required'); return } + setSaving(true) + setErr(null) + try { + await onSubmit(form) + } catch (e: unknown) { + const msg = (e as { response?: { data?: { error?: { message?: string } } } }) + ?.response?.data?.error?.message ?? 
'Failed to save window' + setErr(msg) + } finally { + setSaving(false) + } + } + + return ( + + {title} + + {err && {err}} + + set('label', e.target.value)} + required + fullWidth + /> + + + Recurrence + + + + set('start_at', e.target.value)} + fullWidth + slotProps={{ inputLabel: { shrink: true } }} + helperText={ + form.recurrence === 'once' + ? 'When the window begins' + : 'Time of day for the recurring window (date part ignored)' + } + /> + + set('duration_minutes', parseInt(e.target.value, 10) || 60)} + fullWidth + slotProps={{ htmlInput: { min: 1, max: 1440 } }} + /> + + {form.recurrence === 'weekly' && ( + + Day of Week + + + )} + + {form.recurrence === 'monthly' && ( + set('recurrence_day', parseInt(e.target.value, 10) || 1)} + fullWidth + slotProps={{ htmlInput: { min: 1, max: 31 } }} + /> + )} + + set('enabled', e.target.checked)} + /> + } + label="Enabled" + /> + + + + + + + ) +} + +// ── Confirm delete dialog ────────────────────────────────────────────────────── + +interface ConfirmDeleteProps { + open: boolean + windowLabel: string + onClose: () => void + onConfirm: () => Promise +} + +function ConfirmDeleteDialog({ open, windowLabel, onClose, onConfirm }: ConfirmDeleteProps) { + const [loading, setLoading] = useState(false) + + const handleConfirm = async () => { + setLoading(true) + await onConfirm() + setLoading(false) + } + + return ( + + Delete Window + + + Delete maintenance window {windowLabel}? This cannot be undone. + + + + + + + + ) +} + +// ── Per-host windows table ──────────────────────────────────────────────────── + +interface HostWindowsTableProps { + host: Host + windows: MaintenanceWindow[] + onEdit: (w: MaintenanceWindow) => void + onDelete: (w: MaintenanceWindow) => void + onAdd: (hostId: string) => void +} + +function HostWindowsTable({ host, windows, onEdit, onDelete, onAdd }: HostWindowsTableProps) { + return ( + + + + + + {host.display_name} + + + ({host.fqdn}) + + + + + + {windows.length === 0 ? 
( + + + No maintenance windows configured. Queued jobs will not execute until a window is added. + + + ) : ( + + + + Label + Schedule + Recurrence + Status + Created + Actions + + + + {windows.map(w => ( + + {w.label} + + {scheduleDescription(w)} + + + + + + + + {fmtDate(w.created_at)} + + + onEdit(w)}> + + + + + onDelete(w)}> + + + + + + ))} + +
+ )} +
+ ) +} + +// ── Main page ────────────────────────────────────────────────────────────────── + +export default function MaintenanceWindowsPage() { + const [hosts, setHosts] = useState([]) + const [windowsByHost, setWindowsByHost] = useState>({}) + const [loading, setLoading] = useState(true) + const [error, setError] = useState(null) + const [snackbar, setSnackbar] = useState<{ open: boolean; message: string; severity: 'success' | 'error' }>({ + open: false, message: '', severity: 'success', + }) + + // Create dialog state + const [createOpen, setCreateOpen] = useState(false) + const [createHostId, setCreateHostId] = useState(null) + const [createForm, setCreateForm] = useState(defaultForm()) + + // Edit dialog state + const [editOpen, setEditOpen] = useState(false) + const [editWindow, setEditWindow] = useState(null) + const [editForm, setEditForm] = useState(defaultForm()) + + // Delete dialog state + const [deleteOpen, setDeleteOpen] = useState(false) + const [deleteWindow, setDeleteWindow] = useState(null) + + // ── Fetch all hosts + their windows ────────────────────────────────────── + const fetchData = useCallback(async () => { + setLoading(true) + setError(null) + try { + const hostsRes = await hostsApi.list({ limit: 500 }) + const fetchedHosts: Host[] = hostsRes.data?.hosts ?? hostsRes.data ?? [] + setHosts(fetchedHosts) + + const windowMap: Record = {} + await Promise.all( + fetchedHosts.map(async (h) => { + try { + const res = await maintenanceWindowsApi.list(h.id) + windowMap[h.id] = res.data?.windows ?? 
[] + } catch { + windowMap[h.id] = [] + } + }) + ) + setWindowsByHost(windowMap) + } catch { + setError('Failed to load hosts or maintenance windows.') + } finally { + setLoading(false) + } + }, []) + + useEffect(() => { fetchData() }, [fetchData]) + + // ── Helpers ─────────────────────────────────────────────────────────────── + const showSnackbar = (message: string, severity: 'success' | 'error') => + setSnackbar({ open: true, message, severity }) + + // ── Create window ───────────────────────────────────────────────────────── + const handleAddClick = (hostId: string) => { + setCreateHostId(hostId) + setCreateForm(defaultForm()) + setCreateOpen(true) + } + + const handleCreateSubmit = async (values: FormValues) => { + if (!createHostId) return + await maintenanceWindowsApi.create(createHostId, { + label: values.label, + recurrence: values.recurrence, + start_at: new Date(values.start_at).toISOString(), + duration_minutes: values.duration_minutes, + recurrence_day: values.recurrence_day === '' ? undefined : values.recurrence_day, + enabled: values.enabled, + }) + setCreateOpen(false) + showSnackbar('Maintenance window created', 'success') + await fetchData() + } + + // ── Edit window ─────────────────────────────────────────────────────────── + const handleEditClick = (w: MaintenanceWindow) => { + setEditWindow(w) + setEditForm({ + label: w.label, + recurrence: w.recurrence, + start_at: new Date(w.start_at).toISOString().slice(0, 16), + duration_minutes: w.duration_minutes, + recurrence_day: w.recurrence_day ?? '', + enabled: w.enabled, + }) + setEditOpen(true) + } + + const handleEditSubmit = async (values: FormValues) => { + if (!editWindow) return + await maintenanceWindowsApi.update(editWindow.host_id, editWindow.id, { + label: values.label, + recurrence: values.recurrence, + start_at: new Date(values.start_at).toISOString(), + duration_minutes: values.duration_minutes, + recurrence_day: values.recurrence_day === '' ? 
undefined : values.recurrence_day, + enabled: values.enabled, + }) + setEditOpen(false) + showSnackbar('Maintenance window updated', 'success') + await fetchData() + } + + // ── Delete window ───────────────────────────────────────────────────────── + const handleDeleteClick = (w: MaintenanceWindow) => { + setDeleteWindow(w) + setDeleteOpen(true) + } + + const handleDeleteConfirm = async () => { + if (!deleteWindow) return + try { + await maintenanceWindowsApi.remove(deleteWindow.host_id, deleteWindow.id) + setDeleteOpen(false) + showSnackbar('Maintenance window deleted', 'success') + await fetchData() + } catch { + showSnackbar('Failed to delete maintenance window', 'error') + } + } + + // ── Render ──────────────────────────────────────────────────────────────── + return ( + + {/* Page header */} + + + + Maintenance Windows + + + + + + Queued (non-immediate) patch jobs only execute during open maintenance windows. + Configure one or more windows per host to control when patching occurs. + + + {loading && ( + + + + )} + + {!loading && error && ( + {error} + )} + + {!loading && !error && hosts.length === 0 && ( + + No hosts found. Register hosts first before configuring maintenance windows. 
+ + )} + + {!loading && !error && hosts.map(host => ( + + ))} + + {/* Create dialog */} + setCreateOpen(false)} + onSubmit={handleCreateSubmit} + /> + + {/* Edit dialog */} + setEditOpen(false)} + onSubmit={handleEditSubmit} + /> + + {/* Delete confirm dialog */} + setDeleteOpen(false)} + onConfirm={handleDeleteConfirm} + /> + + {/* Success/error snackbar */} + setSnackbar(prev => ({ ...prev, open: false }))} + anchorOrigin={{ vertical: 'bottom', horizontal: 'center' }} + > + setSnackbar(prev => ({ ...prev, open: false }))} + > + {snackbar.message} + + + + ) +} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 463b52a..ff86fc4 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -117,3 +117,52 @@ export interface CreateJobRequest { allow_reboot?: boolean notes?: string } + +// ── Maintenance Windows ─────────────────────────────────────────────────────── + +export type WindowRecurrence = 'once' | 'daily' | 'weekly' | 'monthly' + +export interface MaintenanceWindow { + id: string + host_id: string + label: string + recurrence: WindowRecurrence + /** Absolute start (once) or time-of-day reference (recurring) — ISO 8601 UTC */ + start_at: string + /** Duration in minutes */ + duration_minutes: number + /** 0-6 for weekly (0=Sun), 1-31 for monthly, null for once/daily */ + recurrence_day?: number | null + enabled: boolean + created_at: string + updated_at: string +} + +export interface CreateMaintenanceWindowRequest { + label: string + recurrence: WindowRecurrence + start_at: string + duration_minutes?: number + recurrence_day?: number | null + enabled?: boolean +} + +export interface UpdateMaintenanceWindowRequest { + label?: string + recurrence?: WindowRecurrence + start_at?: string + duration_minutes?: number + recurrence_day?: number | null + enabled?: boolean +} + +// ── WebSocket event types (M7) ──────────────────────────────────────────────── + +export interface JobWsEvent { + job_id: string + host_id: string 
+ status: JobStatus + output?: string + error_message?: string + agent_job_id?: string +} diff --git a/migrations/004_maintenance_windows.sql b/migrations/004_maintenance_windows.sql new file mode 100644 index 0000000..9d75b6e --- /dev/null +++ b/migrations/004_maintenance_windows.sql @@ -0,0 +1,25 @@ +-- Migration: 004_maintenance_windows +-- Description: Additional indexes and scheduler support for maintenance windows. +-- The maintenance_windows table and window_recurrence ENUM were +-- created in 001_initial_schema.sql. This migration only adds the +-- indexes needed by the M6 maintenance_scheduler worker. It does +-- not modify the audit_action ENUM: the window-related values used +-- by the audit log helper already exist, so no ENUM change (and no +-- DO-block guard) is included here. + +-- ============================================================ +-- Composite index: scheduler query +-- Finds enabled windows by recurrence type + recurrence_day quickly. +-- ============================================================ + +CREATE INDEX IF NOT EXISTS idx_mw_enabled_recurrence + ON maintenance_windows (recurrence, recurrence_day) + WHERE enabled = TRUE; + +-- ============================================================ +-- Index: quickly find non-immediate queued jobs for a given host +-- ============================================================ + +CREATE INDEX IF NOT EXISTS idx_pjh_queued_host + ON patch_job_hosts (host_id, status) + WHERE status = 'queued'; diff --git a/tasks/todo.md b/tasks/todo.md index 6720ccf..19526ee 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -147,25 +147,25 @@ Each milestone produces a **testable vertical slice** — backend + frontend + d ### M6: Maintenance Windows & Scheduling + Frontend Page **Goal:** Per-device recurring and one-time maintenance windows, auto-execution at window open.
-- [ ] Implement maintenance window CRUD: `GET/POST/PUT/DELETE /api/v1/hosts/{id}/maintenance-windows` -- [ ] Implement recurring schedule logic: daily, weekly, monthly (cron-like evaluation) -- [ ] Implement one-time window support -- [ ] Implement worker job scheduler: detect window openings, dispatch queued jobs -- [ ] Implement window-open event triggering job execution -- [ ] Frontend: Maintenance Windows page (per-device schedule management) -- [ ] Frontend: Maintenance window config on Host Detail page +- [x] Implement maintenance window CRUD: `GET/POST/PUT/DELETE /api/v1/hosts/{id}/maintenance-windows` +- [x] Implement recurring schedule logic: daily, weekly, monthly (cron-like evaluation) +- [x] Implement one-time window support +- [x] Implement worker job scheduler: detect window openings, dispatch queued jobs +- [x] Implement window-open event triggering job execution +- [x] Frontend: Maintenance Windows page (per-device schedule management) +- [x] Frontend: Maintenance window config on Host Detail page - [ ] Verify: create recurring/one-time windows, queued jobs execute at window open, window expiration stops execution ### M7: WebSocket Relay (Real-Time Job Status) **Goal:** Browser receives live job updates via WebSocket. 
-- [ ] Implement WS ticket endpoint: `POST /api/v1/ws/ticket` (single-use, 60s expiry, JWT-authenticated) -- [ ] Implement WebSocket relay: `WS /api/v1/ws/jobs?ticket=...` → authenticated browser connection -- [ ] Implement agent WebSocket consumption: worker subscribes to agent `WS /api/v1/ws/jobs` for running jobs -- [ ] Implement event multiplexing: agent WS events → PostgreSQL update → browser WS push -- [ ] Frontend: WebSocket client hook with auto-reconnect and ticket refresh -- [ ] Frontend: Live job progress updates on Jobs page -- [ ] Verify: open job in browser, see real-time progress updates, WS ticket expires correctly +- [x] Implement WS ticket endpoint: `POST /api/v1/ws/ticket` (single-use, 60s expiry, JWT-authenticated) +- [x] Implement WebSocket relay: `WS /api/v1/ws/jobs?ticket=...` → authenticated browser connection +- [x] Implement agent WebSocket consumption: worker subscribes to agent `WS /api/v1/ws/jobs` for running jobs +- [x] Implement event multiplexing: agent WS events → PostgreSQL update → browser WS push +- [x] Frontend: WebSocket client hook with auto-reconnect and ticket refresh +- [x] Frontend: Live job progress updates on Jobs page +- [x] Verify: open job in browser, see real-time progress updates, WS ticket expires correctly ### M8: Internal CA + Certificate Management + Frontend Page **Goal:** CA issues/renews certs, download links work.