diff --git a/crates/pm-web/src/routes/jobs.rs b/crates/pm-web/src/routes/jobs.rs index 9a5ae2a..674a1a1 100644 --- a/crates/pm-web/src/routes/jobs.rs +++ b/crates/pm-web/src/routes/jobs.rs @@ -488,6 +488,28 @@ async fn cancel_job( ) })?; + // Fire job-level pg_notify so the frontend can update the job row. + let notify_payload = json!({ + "event_type": "job", + "job_id": id.to_string(), + "host_id": "", + "status": "cancelled", + "succeeded_count": 0, + "failed_count": 0, + "host_count": 0, + }); + if let Ok(payload_str) = serde_json::to_string(¬ify_payload) { + if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)") + .bind(&payload_str) + .execute(&state.db) + .await + { + tracing::error!(error = %e, %id, "cancel_job: job-level pg_notify failed"); + } else { + tracing::info!(%id, "cancel_job: job-level pg_notify sent"); + } + } + log_event( &state.db, AuditAction::PatchJobCancelled, diff --git a/crates/pm-worker/src/job_executor.rs b/crates/pm-worker/src/job_executor.rs index bbf2ca2..064ec6a 100644 --- a/crates/pm-worker/src/job_executor.rs +++ b/crates/pm-worker/src/job_executor.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use chrono::{Duration as ChronoDuration, Utc}; use pm_agent_client::{types::ApplyPatchesRequest, AgentClient}; use pm_core::config::AppConfig; +use serde_json::json; use sqlx::{FromRow, PgPool}; use tokio::{sync::Semaphore, time}; use uuid::Uuid; @@ -840,6 +841,28 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) { tracing::error!(%job_id, error = %e, "sync_job_status: failed to update parent job"); } + // Fire job-level pg_notify so the frontend can update the job row. + let notify_payload = json!({ + "event_type": "job", + "job_id": job_id.to_string(), + "host_id": "", + "status": new_status, + "succeeded_count": counts.succeeded_count, + "failed_count": counts.failed_count, + "host_count": counts.total_count, + }); + if let Ok(payload_str) = serde_json::to_string(¬ify_payload) { + if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)") + .bind(&payload_str) + .execute(pool) + .await + { + tracing::error!(%job_id, error = %e, "sync_job_status: job-level pg_notify failed"); + } else { + tracing::info!(%job_id, status = %new_status, "sync_job_status: job-level pg_notify sent"); + } + } + // Send email notifications for completed/failed jobs if set_completed { // Spawn email notification in background — non-blocking diff --git a/crates/pm-worker/src/ws_relay.rs b/crates/pm-worker/src/ws_relay.rs index 9ffb738..af95dda 100644 --- a/crates/pm-worker/src/ws_relay.rs +++ b/crates/pm-worker/src/ws_relay.rs @@ -47,12 +47,17 @@ struct AgentWsEvent { /// Payload broadcast via `pg_notify('job_update', …)`. #[derive(Debug, Serialize)] struct NotifyPayload { + event_type: String, // "host" or "job" job_id: String, host_id: String, status: String, output: Option, error_message: Option, agent_job_id: String, + // Job-level fields (only present when event_type === "job") + succeeded_count: Option, + failed_count: Option, + host_count: Option, } // ── Entry point ─────────────────────────────────────────────────────────────── @@ -351,14 +356,18 @@ async fn process_event(pool: &PgPool, row: &RunningHostJob, event: &AgentWsEvent update_parent_job_status(pool, row.job_id).await; } - // Fire pg_notify so browser WS handlers forward the event. + // Fire pg_notify so browser WS handlers forward the host-level event. let payload = NotifyPayload { + event_type: "host".to_string(), job_id: row.job_id.to_string(), host_id: row.host_id.to_string(), status: db_status.to_string(), output: event.output.clone(), error_message: event.error.clone(), agent_job_id: row.agent_job_id.clone(), + succeeded_count: None, + failed_count: None, + host_count: None, }; let payload_json = match serde_json::to_string(&payload) { @@ -394,6 +403,9 @@ async fn process_event(pool: &PgPool, row: &RunningHostJob, event: &AgentWsEvent /// After a host-level job reaches a terminal state, check whether ALL hosts for /// that job are now terminal and update the parent `patch_jobs` row accordingly. +/// +/// If the parent job transitions to a terminal status, also fires a `job_update` +/// pg_notify with `event_type: "job"` so the frontend can update the job row. async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) { // Count hosts that are still in a non-terminal state. let pending: i64 = match sqlx::query_scalar( @@ -423,22 +435,36 @@ async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) { return; // still hosts running — parent stays running } - // All hosts terminal — determine final parent status. - let failed_count: i64 = match sqlx::query_scalar( - "SELECT COUNT(*) FROM patch_job_hosts WHERE job_id = $1 AND status = 'failed'::job_status", + // All hosts terminal — determine final parent status and counts. + #[derive(sqlx::FromRow)] + struct RollupCounts { + total: i64, + succeeded: i64, + failed: i64, + } + + let counts: RollupCounts = match sqlx::query_as( + r#" + SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE status = 'succeeded') AS succeeded, + COUNT(*) FILTER (WHERE status = 'failed') AS failed + FROM patch_job_hosts + WHERE job_id = $1 + "#, ) .bind(job_id) .fetch_one(pool) .await { - Ok(n) => n, + Ok(c) => c, Err(e) => { - tracing::error!(error = %e, %job_id, "update_parent_job_status: failed-count query failed"); + tracing::error!(error = %e, %job_id, "update_parent_job_status: rollup query failed"); return; }, }; - let final_status = if failed_count > 0 { + let final_status = if counts.failed > 0 { "failed" } else { "succeeded" @@ -458,11 +484,52 @@ async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) { status = %final_status, "update_parent_job_status: UPDATE failed" ); + return; + } + + tracing::info!( + %job_id, + status = %final_status, + "Parent job status updated" + ); + + // Fire job-level pg_notify so the frontend can update the job row. + let payload = NotifyPayload { + event_type: "job".to_string(), + job_id: job_id.to_string(), + host_id: String::new(), // no specific host for job-level events + status: final_status.to_string(), + output: None, + error_message: None, + agent_job_id: String::new(), + succeeded_count: Some(counts.succeeded), + failed_count: Some(counts.failed), + host_count: Some(counts.total), + }; + + let payload_json = match serde_json::to_string(&payload) { + Ok(s) => s, + Err(e) => { + tracing::error!(error = %e, %job_id, "update_parent_job_status: failed to serialize job-level notify payload"); + return; + }, + }; + + if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)") + .bind(&payload_json) + .execute(pool) + .await + { + tracing::error!( + error = %e, + %job_id, + "update_parent_job_status: job-level pg_notify failed" + ); } else { tracing::info!( %job_id, status = %final_status, - "Parent job status updated" + "Job-level pg_notify sent" ); } } diff --git a/frontend/src/pages/JobsPage.tsx b/frontend/src/pages/JobsPage.tsx index e822aa5..775bc4c 100644 --- a/frontend/src/pages/JobsPage.tsx +++ b/frontend/src/pages/JobsPage.tsx @@ -345,36 +345,59 @@ export default function JobsPage() { // ── WS event handler — surgical state updates ───────────────────────────── const handleWsEvent = useCallback((event: JobWsEvent) => { - // Update matching job summary row status. - setJobs((prev) => - prev.map((job) => { - if (job.id !== event.job_id) return job - const updated = { ...job, status: event.status } - // Increment counters when a host reaches a terminal state. - if (event.status === 'succeeded') { - updated.succeeded_count = job.succeeded_count + 1 - } else if (event.status === 'failed') { - updated.failed_count = job.failed_count + 1 - } - return updated - }) - ) + if (event.event_type === 'job') { + // ── Job-level event: authoritative status + counts from backend ── + setJobs((prev) => + prev.map((job) => { + if (job.id !== event.job_id) return job + return { + ...job, + status: event.status, + succeeded_count: event.succeeded_count ?? job.succeeded_count, + failed_count: event.failed_count ?? job.failed_count, + host_count: event.host_count ?? job.host_count, + } + }) + ) + } else { + // ── Host-level event: update detail row + optimistic counters only ── + setJobs((prev) => + prev.map((job) => { + if (job.id !== event.job_id) return job + const updated = { ...job } + // Optimistically increment counters when a host reaches a terminal state. + // The authoritative rollup will arrive as a job-level event later. + if (event.status === 'succeeded') { + updated.succeeded_count = job.succeeded_count + 1 + } else if (event.status === 'failed') { + updated.failed_count = job.failed_count + 1 + } + // If any host is still running, ensure the job shows 'running'. + // Do NOT promote host status to job status — only the job-level + // event can set the parent job to a terminal state. + if (event.status === 'running' && job.status === 'queued') { + updated.status = 'running' + } + return updated + }) + ) - // Also update the host row in the expanded detail panel if loaded. - setDetails((prev) => { - const detail = prev[event.job_id] - if (!detail) return prev - const updatedHosts = detail.hosts.map((h) => { - if (h.host_id !== event.host_id) return h - return { - ...h, - status: event.status, - ...(event.error_message ? { error_message: event.error_message } : {}), - ...(event.agent_job_id ? { agent_job_id: event.agent_job_id } : {}), - } + // Update the host row in the expanded detail panel if loaded. + setDetails((prev) => { + const detail = prev[event.job_id] + if (!detail) return prev + const updatedHosts = detail.hosts.map((h) => { + if (h.host_id !== event.host_id) return h + return { + ...h, + status: event.status, + ...(event.error_message ? { error_message: event.error_message } : {}), + ...(event.agent_job_id ? { agent_job_id: event.agent_job_id } : {}), + } + }) + return { ...prev, [event.job_id]: { ...detail, hosts: updatedHosts } } }) - return { ...prev, [event.job_id]: { ...detail, hosts: updatedHosts } } - }) + } }, []) // ── WebSocket connection ────────────────────────────────────────────────── diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index bd15aff..055689a 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -159,12 +159,17 @@ export interface UpdateMaintenanceWindowRequest { // ── WebSocket event types (M7) ──────────────────────────────────────────────── export interface JobWsEvent { + event_type?: 'host' | 'job' // defaults to 'host' for backward compat job_id: string host_id: string status: JobStatus output?: string error_message?: string agent_job_id?: string + // Job-level fields (only present when event_type === 'job') + succeeded_count?: number + failed_count?: number + host_count?: number } // ── Certificates (M8) ────────────────────────────────────────────────────────