fix: add job-level WS events so jobs show completed status
Some checks failed
CI Pipeline / Rust Format Check (push) Failing after 24s
CI Pipeline / Clippy Lints (push) Successful in 1m4s
CI Pipeline / Rust Unit Tests (push) Successful in 1m21s
CI Pipeline / Security Audit (push) Successful in 5s
CI Pipeline / Frontend Lint & Type Check (push) Successful in 16s
CI Pipeline / Build .deb & Release (push) Has been skipped
Some checks failed
CI Pipeline / Rust Format Check (push) Failing after 24s
CI Pipeline / Clippy Lints (push) Successful in 1m4s
CI Pipeline / Rust Unit Tests (push) Successful in 1m21s
CI Pipeline / Security Audit (push) Successful in 5s
CI Pipeline / Frontend Lint & Type Check (push) Successful in 16s
CI Pipeline / Build .deb & Release (push) Has been skipped
- Frontend: handleWsEvent now distinguishes host vs job events - Host events only update detail rows + optimistic counters - Job events (event_type=job) set authoritative status + counts - Backend ws_relay: NotifyPayload now includes event_type field - Host events: event_type=host - update_parent_job_status fires pg_notify with event_type=job - Backend job_executor: sync_job_status fires pg_notify with event_type=job - Backend jobs cancel endpoint fires pg_notify with event_type=job - Fixes jobs appearing stuck because host status was mapped to job status
This commit is contained in:
@ -488,6 +488,28 @@ async fn cancel_job(
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
// Fire job-level pg_notify so the frontend can update the job row.
|
||||||
|
let notify_payload = json!({
|
||||||
|
"event_type": "job",
|
||||||
|
"job_id": id.to_string(),
|
||||||
|
"host_id": "",
|
||||||
|
"status": "cancelled",
|
||||||
|
"succeeded_count": 0,
|
||||||
|
"failed_count": 0,
|
||||||
|
"host_count": 0,
|
||||||
|
});
|
||||||
|
if let Ok(payload_str) = serde_json::to_string(¬ify_payload) {
|
||||||
|
if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)")
|
||||||
|
.bind(&payload_str)
|
||||||
|
.execute(&state.db)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(error = %e, %id, "cancel_job: job-level pg_notify failed");
|
||||||
|
} else {
|
||||||
|
tracing::info!(%id, "cancel_job: job-level pg_notify sent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log_event(
|
log_event(
|
||||||
&state.db,
|
&state.db,
|
||||||
AuditAction::PatchJobCancelled,
|
AuditAction::PatchJobCancelled,
|
||||||
|
|||||||
@ -17,6 +17,7 @@ use std::sync::Arc;
|
|||||||
use chrono::{Duration as ChronoDuration, Utc};
|
use chrono::{Duration as ChronoDuration, Utc};
|
||||||
use pm_agent_client::{types::ApplyPatchesRequest, AgentClient};
|
use pm_agent_client::{types::ApplyPatchesRequest, AgentClient};
|
||||||
use pm_core::config::AppConfig;
|
use pm_core::config::AppConfig;
|
||||||
|
use serde_json::json;
|
||||||
use sqlx::{FromRow, PgPool};
|
use sqlx::{FromRow, PgPool};
|
||||||
use tokio::{sync::Semaphore, time};
|
use tokio::{sync::Semaphore, time};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@ -840,6 +841,28 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) {
|
|||||||
tracing::error!(%job_id, error = %e, "sync_job_status: failed to update parent job");
|
tracing::error!(%job_id, error = %e, "sync_job_status: failed to update parent job");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fire job-level pg_notify so the frontend can update the job row.
|
||||||
|
let notify_payload = json!({
|
||||||
|
"event_type": "job",
|
||||||
|
"job_id": job_id.to_string(),
|
||||||
|
"host_id": "",
|
||||||
|
"status": new_status,
|
||||||
|
"succeeded_count": counts.succeeded_count,
|
||||||
|
"failed_count": counts.failed_count,
|
||||||
|
"host_count": counts.total_count,
|
||||||
|
});
|
||||||
|
if let Ok(payload_str) = serde_json::to_string(¬ify_payload) {
|
||||||
|
if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)")
|
||||||
|
.bind(&payload_str)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(%job_id, error = %e, "sync_job_status: job-level pg_notify failed");
|
||||||
|
} else {
|
||||||
|
tracing::info!(%job_id, status = %new_status, "sync_job_status: job-level pg_notify sent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Send email notifications for completed/failed jobs
|
// Send email notifications for completed/failed jobs
|
||||||
if set_completed {
|
if set_completed {
|
||||||
// Spawn email notification in background — non-blocking
|
// Spawn email notification in background — non-blocking
|
||||||
|
|||||||
@ -47,12 +47,17 @@ struct AgentWsEvent {
|
|||||||
/// Payload broadcast via `pg_notify('job_update', …)`.
|
/// Payload broadcast via `pg_notify('job_update', …)`.
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
struct NotifyPayload {
|
struct NotifyPayload {
|
||||||
|
event_type: String, // "host" or "job"
|
||||||
job_id: String,
|
job_id: String,
|
||||||
host_id: String,
|
host_id: String,
|
||||||
status: String,
|
status: String,
|
||||||
output: Option<String>,
|
output: Option<String>,
|
||||||
error_message: Option<String>,
|
error_message: Option<String>,
|
||||||
agent_job_id: String,
|
agent_job_id: String,
|
||||||
|
// Job-level fields (only present when event_type === "job")
|
||||||
|
succeeded_count: Option<i64>,
|
||||||
|
failed_count: Option<i64>,
|
||||||
|
host_count: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
@ -351,14 +356,18 @@ async fn process_event(pool: &PgPool, row: &RunningHostJob, event: &AgentWsEvent
|
|||||||
update_parent_job_status(pool, row.job_id).await;
|
update_parent_job_status(pool, row.job_id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fire pg_notify so browser WS handlers forward the event.
|
// Fire pg_notify so browser WS handlers forward the host-level event.
|
||||||
let payload = NotifyPayload {
|
let payload = NotifyPayload {
|
||||||
|
event_type: "host".to_string(),
|
||||||
job_id: row.job_id.to_string(),
|
job_id: row.job_id.to_string(),
|
||||||
host_id: row.host_id.to_string(),
|
host_id: row.host_id.to_string(),
|
||||||
status: db_status.to_string(),
|
status: db_status.to_string(),
|
||||||
output: event.output.clone(),
|
output: event.output.clone(),
|
||||||
error_message: event.error.clone(),
|
error_message: event.error.clone(),
|
||||||
agent_job_id: row.agent_job_id.clone(),
|
agent_job_id: row.agent_job_id.clone(),
|
||||||
|
succeeded_count: None,
|
||||||
|
failed_count: None,
|
||||||
|
host_count: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let payload_json = match serde_json::to_string(&payload) {
|
let payload_json = match serde_json::to_string(&payload) {
|
||||||
@ -394,6 +403,9 @@ async fn process_event(pool: &PgPool, row: &RunningHostJob, event: &AgentWsEvent
|
|||||||
|
|
||||||
/// After a host-level job reaches a terminal state, check whether ALL hosts for
|
/// After a host-level job reaches a terminal state, check whether ALL hosts for
|
||||||
/// that job are now terminal and update the parent `patch_jobs` row accordingly.
|
/// that job are now terminal and update the parent `patch_jobs` row accordingly.
|
||||||
|
///
|
||||||
|
/// If the parent job transitions to a terminal status, also fires a `job_update`
|
||||||
|
/// pg_notify with `event_type: "job"` so the frontend can update the job row.
|
||||||
async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) {
|
async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) {
|
||||||
// Count hosts that are still in a non-terminal state.
|
// Count hosts that are still in a non-terminal state.
|
||||||
let pending: i64 = match sqlx::query_scalar(
|
let pending: i64 = match sqlx::query_scalar(
|
||||||
@ -423,22 +435,36 @@ async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) {
|
|||||||
return; // still hosts running — parent stays running
|
return; // still hosts running — parent stays running
|
||||||
}
|
}
|
||||||
|
|
||||||
// All hosts terminal — determine final parent status.
|
// All hosts terminal — determine final parent status and counts.
|
||||||
let failed_count: i64 = match sqlx::query_scalar(
|
#[derive(sqlx::FromRow)]
|
||||||
"SELECT COUNT(*) FROM patch_job_hosts WHERE job_id = $1 AND status = 'failed'::job_status",
|
struct RollupCounts {
|
||||||
|
total: i64,
|
||||||
|
succeeded: i64,
|
||||||
|
failed: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
let counts: RollupCounts = match sqlx::query_as(
|
||||||
|
r#"
|
||||||
|
SELECT
|
||||||
|
COUNT(*) AS total,
|
||||||
|
COUNT(*) FILTER (WHERE status = 'succeeded') AS succeeded,
|
||||||
|
COUNT(*) FILTER (WHERE status = 'failed') AS failed
|
||||||
|
FROM patch_job_hosts
|
||||||
|
WHERE job_id = $1
|
||||||
|
"#,
|
||||||
)
|
)
|
||||||
.bind(job_id)
|
.bind(job_id)
|
||||||
.fetch_one(pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(n) => n,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(error = %e, %job_id, "update_parent_job_status: failed-count query failed");
|
tracing::error!(error = %e, %job_id, "update_parent_job_status: rollup query failed");
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let final_status = if failed_count > 0 {
|
let final_status = if counts.failed > 0 {
|
||||||
"failed"
|
"failed"
|
||||||
} else {
|
} else {
|
||||||
"succeeded"
|
"succeeded"
|
||||||
@ -458,11 +484,52 @@ async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) {
|
|||||||
status = %final_status,
|
status = %final_status,
|
||||||
"update_parent_job_status: UPDATE failed"
|
"update_parent_job_status: UPDATE failed"
|
||||||
);
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
%job_id,
|
||||||
|
status = %final_status,
|
||||||
|
"Parent job status updated"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Fire job-level pg_notify so the frontend can update the job row.
|
||||||
|
let payload = NotifyPayload {
|
||||||
|
event_type: "job".to_string(),
|
||||||
|
job_id: job_id.to_string(),
|
||||||
|
host_id: String::new(), // no specific host for job-level events
|
||||||
|
status: final_status.to_string(),
|
||||||
|
output: None,
|
||||||
|
error_message: None,
|
||||||
|
agent_job_id: String::new(),
|
||||||
|
succeeded_count: Some(counts.succeeded),
|
||||||
|
failed_count: Some(counts.failed),
|
||||||
|
host_count: Some(counts.total),
|
||||||
|
};
|
||||||
|
|
||||||
|
let payload_json = match serde_json::to_string(&payload) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, %job_id, "update_parent_job_status: failed to serialize job-level notify payload");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = sqlx::query("SELECT pg_notify('job_update', $1)")
|
||||||
|
.bind(&payload_json)
|
||||||
|
.execute(pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
%job_id,
|
||||||
|
"update_parent_job_status: job-level pg_notify failed"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
%job_id,
|
%job_id,
|
||||||
status = %final_status,
|
status = %final_status,
|
||||||
"Parent job status updated"
|
"Job-level pg_notify sent"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -345,36 +345,59 @@ export default function JobsPage() {
|
|||||||
|
|
||||||
// ── WS event handler — surgical state updates ─────────────────────────────
|
// ── WS event handler — surgical state updates ─────────────────────────────
|
||||||
const handleWsEvent = useCallback((event: JobWsEvent) => {
|
const handleWsEvent = useCallback((event: JobWsEvent) => {
|
||||||
// Update matching job summary row status.
|
if (event.event_type === 'job') {
|
||||||
setJobs((prev) =>
|
// ── Job-level event: authoritative status + counts from backend ──
|
||||||
prev.map((job) => {
|
setJobs((prev) =>
|
||||||
if (job.id !== event.job_id) return job
|
prev.map((job) => {
|
||||||
const updated = { ...job, status: event.status }
|
if (job.id !== event.job_id) return job
|
||||||
// Increment counters when a host reaches a terminal state.
|
return {
|
||||||
if (event.status === 'succeeded') {
|
...job,
|
||||||
updated.succeeded_count = job.succeeded_count + 1
|
status: event.status,
|
||||||
} else if (event.status === 'failed') {
|
succeeded_count: event.succeeded_count ?? job.succeeded_count,
|
||||||
updated.failed_count = job.failed_count + 1
|
failed_count: event.failed_count ?? job.failed_count,
|
||||||
}
|
host_count: event.host_count ?? job.host_count,
|
||||||
return updated
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
} else {
|
||||||
|
// ── Host-level event: update detail row + optimistic counters only ──
|
||||||
|
setJobs((prev) =>
|
||||||
|
prev.map((job) => {
|
||||||
|
if (job.id !== event.job_id) return job
|
||||||
|
const updated = { ...job }
|
||||||
|
// Optimistically increment counters when a host reaches a terminal state.
|
||||||
|
// The authoritative rollup will arrive as a job-level event later.
|
||||||
|
if (event.status === 'succeeded') {
|
||||||
|
updated.succeeded_count = job.succeeded_count + 1
|
||||||
|
} else if (event.status === 'failed') {
|
||||||
|
updated.failed_count = job.failed_count + 1
|
||||||
|
}
|
||||||
|
// If any host is still running, ensure the job shows 'running'.
|
||||||
|
// Do NOT promote host status to job status — only the job-level
|
||||||
|
// event can set the parent job to a terminal state.
|
||||||
|
if (event.status === 'running' && job.status === 'queued') {
|
||||||
|
updated.status = 'running'
|
||||||
|
}
|
||||||
|
return updated
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
// Also update the host row in the expanded detail panel if loaded.
|
// Update the host row in the expanded detail panel if loaded.
|
||||||
setDetails((prev) => {
|
setDetails((prev) => {
|
||||||
const detail = prev[event.job_id]
|
const detail = prev[event.job_id]
|
||||||
if (!detail) return prev
|
if (!detail) return prev
|
||||||
const updatedHosts = detail.hosts.map((h) => {
|
const updatedHosts = detail.hosts.map((h) => {
|
||||||
if (h.host_id !== event.host_id) return h
|
if (h.host_id !== event.host_id) return h
|
||||||
return {
|
return {
|
||||||
...h,
|
...h,
|
||||||
status: event.status,
|
status: event.status,
|
||||||
...(event.error_message ? { error_message: event.error_message } : {}),
|
...(event.error_message ? { error_message: event.error_message } : {}),
|
||||||
...(event.agent_job_id ? { agent_job_id: event.agent_job_id } : {}),
|
...(event.agent_job_id ? { agent_job_id: event.agent_job_id } : {}),
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
return { ...prev, [event.job_id]: { ...detail, hosts: updatedHosts } }
|
||||||
})
|
})
|
||||||
return { ...prev, [event.job_id]: { ...detail, hosts: updatedHosts } }
|
}
|
||||||
})
|
|
||||||
}, [])
|
}, [])
|
||||||
|
|
||||||
// ── WebSocket connection ──────────────────────────────────────────────────
|
// ── WebSocket connection ──────────────────────────────────────────────────
|
||||||
|
|||||||
@ -159,12 +159,17 @@ export interface UpdateMaintenanceWindowRequest {
|
|||||||
// ── WebSocket event types (M7) ────────────────────────────────────────────────
|
// ── WebSocket event types (M7) ────────────────────────────────────────────────
|
||||||
|
|
||||||
export interface JobWsEvent {
|
export interface JobWsEvent {
|
||||||
|
event_type?: 'host' | 'job' // defaults to 'host' for backward compat
|
||||||
job_id: string
|
job_id: string
|
||||||
host_id: string
|
host_id: string
|
||||||
status: JobStatus
|
status: JobStatus
|
||||||
output?: string
|
output?: string
|
||||||
error_message?: string
|
error_message?: string
|
||||||
agent_job_id?: string
|
agent_job_id?: string
|
||||||
|
// Job-level fields (only present when event_type === 'job')
|
||||||
|
succeeded_count?: number
|
||||||
|
failed_count?: number
|
||||||
|
host_count?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Certificates (M8) ────────────────────────────────────────────────────────
|
// ── Certificates (M8) ────────────────────────────────────────────────────────
|
||||||
|
|||||||
Reference in New Issue
Block a user