Private
Public Access
1
0

M5: Patch Deployment & Job Management

Backend:
- migrations/003_jobs_scheduling.sql: retry_next_at/last_error columns,
  pg_notify trigger for immediate job dispatch, retry index
- pm-agent-client: ApplyPatchesRequest/Response, AgentJobStatus,
  RollbackResponse types; apply_patches/job_status/rollback_job
  client methods + generic POST helper
- pm-core/models: JobStatus, JobKind, PatchJob, PatchJobHost,
  CreateJobRequest, PatchJobSummary
- pm-web/routes/jobs.rs: POST/GET /api/v1/jobs, GET /jobs/:id,
  POST /jobs/:id/cancel, POST /jobs/:id/rollback
- pm-worker/job_executor.rs: NOTIFY listener, periodic scanner,
  execute_host_job, poll_running_jobs, handle_host_failure (3-retry
  exponential backoff 1m/5m/30m), sync_job_status, retry_pending_jobs
- pm-worker/main.rs: spawn job_executor

Frontend:
- types/index.ts: PatchInfo, PatchJobHost, PatchJob, PatchJobSummary,
  CreateJobRequest interfaces
- api/client.ts: jobsApi (list/get/create/cancel/rollback),
  patchesApi (getHostPatches)
- pages/PatchDeploymentPage.tsx: 3-step MUI Stepper
  (host select → configure → result)
- pages/JobsPage.tsx: job list table, expandable per-host detail,
  cancel/rollback actions with confirm dialog, load-more pagination
- App.tsx: /jobs and /deployment routes wired to real pages

cargo check: 0 errors | vite build: 0 errors
This commit is contained in:
2026-04-23 17:08:43 +00:00
parent a6eb762962
commit 6f9c6dc881
30 changed files with 8465 additions and 44 deletions

View File

@ -0,0 +1,45 @@
//! Helper for loading mTLS certificate/key material from disk.
//!
//! Reads PEM files referenced in [`SecurityConfig`] and returns the raw bytes
//! needed by [`pm_agent_client::AgentClient`].
use pm_core::config::SecurityConfig;
/// Raw PEM bytes for mTLS client authentication and CA verification.
pub struct AgentCerts {
pub client_cert: Vec<u8>,
pub client_key: Vec<u8>,
pub ca_cert: Vec<u8>,
}
/// Load agent mTLS certificates from the paths specified in [`SecurityConfig`].
///
/// Returns an error if any file cannot be read. The caller should handle
/// the error gracefully (log and skip the poll cycle) rather than crashing.
pub fn load_agent_certs(security: &SecurityConfig) -> anyhow::Result<AgentCerts> {
let client_cert = std::fs::read(&security.agent_client_cert_path).map_err(|e| {
anyhow::anyhow!(
"Failed to read agent client cert '{}': {}",
security.agent_client_cert_path,
e
)
})?;
let client_key = std::fs::read(&security.agent_client_key_path).map_err(|e| {
anyhow::anyhow!(
"Failed to read agent client key '{}': {}",
security.agent_client_key_path,
e
)
})?;
let ca_cert = std::fs::read(&security.ca_cert_path).map_err(|e| {
anyhow::anyhow!(
"Failed to read CA cert '{}': {}",
security.ca_cert_path,
e
)
})?;
Ok(AgentCerts { client_cert, client_key, ca_cert })
}

View File

@ -0,0 +1,202 @@
//! Periodic health poller for all registered hosts.
//!
//! Polls every host via the agent `/health` endpoint on each tick of
//! `health_poll_interval_secs`, with bounded concurrency controlled by a
//! [`tokio::sync::Semaphore`].
use std::sync::Arc;
use pm_agent_client::{AgentClient, AgentClientError};
use pm_core::{
config::AppConfig,
models::HostHealthStatus,
};
use sqlx::{FromRow, PgPool};
use tokio::{
sync::Semaphore,
time,
};
use uuid::Uuid;
use crate::agent_loader::load_agent_certs;
/// Minimal host projection fetched for each poll cycle.
#[derive(Debug, FromRow)]
struct HostRow {
id: Uuid,
ip_address: String,
agent_port: i32,
}
/// Run the health poller loop indefinitely.
///
/// On each tick all registered hosts are queried concurrently (up to
/// `max_concurrent_agent_calls` in-flight at once). Results are persisted
/// to `host_health_data` and the `hosts` table is updated.
pub async fn run_health_poller(pool: PgPool, config: Arc<AppConfig>) {
let interval_secs = config.worker.health_poll_interval_secs;
let mut ticker = time::interval(std::time::Duration::from_secs(interval_secs));
tracing::info!(
interval_secs,
"Health poller started"
);
loop {
ticker.tick().await;
// Load certs on each cycle so cert rotation is picked up automatically.
let certs = match load_agent_certs(&config.security) {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "Health poller: failed to load agent certs — skipping cycle");
continue;
}
};
let client_cert = Arc::new(certs.client_cert);
let client_key = Arc::new(certs.client_key);
let ca_cert = Arc::new(certs.ca_cert);
// Fetch all hosts.
let hosts: Vec<HostRow> = match sqlx::query_as(
"SELECT id, ip_address::text AS ip_address, agent_port FROM hosts ORDER BY id",
)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(e) => {
tracing::error!(error = %e, "Health poller: failed to fetch hosts");
continue;
}
};
if hosts.is_empty() {
tracing::debug!("Health poller: no hosts registered, skipping cycle");
continue;
}
let total = hosts.len();
let semaphore = Arc::new(Semaphore::new(config.worker.max_concurrent_agent_calls));
let mut handles = Vec::with_capacity(total);
for host in hosts {
let pool = pool.clone();
let sem = semaphore.clone();
let cert = client_cert.clone();
let key = client_key.clone();
let ca = ca_cert.clone();
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.expect("semaphore closed");
poll_host_health(pool, host, &cert, &key, &ca).await
});
handles.push(handle);
}
// Collect results and tally counts.
let mut healthy = 0usize;
let mut degraded = 0usize;
let mut unreachable = 0usize;
for handle in handles {
match handle.await {
Ok(HostHealthStatus::Healthy) => healthy += 1,
Ok(HostHealthStatus::Degraded) => degraded += 1,
Ok(HostHealthStatus::Unreachable) => unreachable += 1,
Ok(_) => {}
Err(e) => tracing::error!(error = %e, "Health poller task panicked"),
}
}
tracing::info!(
total,
healthy,
degraded,
unreachable,
"Health poll cycle complete"
);
}
}
/// Poll a single host, persist the result, and return the determined status.
async fn poll_host_health(
pool: PgPool,
host: HostRow,
client_cert: &[u8],
client_key: &[u8],
ca_cert: &[u8],
) -> HostHealthStatus {
// Determine status and optional health payload.
let (status, payload) = match AgentClient::new(
&host.ip_address,
host.agent_port as u16,
client_cert,
client_key,
ca_cert,
) {
Err(e) => {
tracing::warn!(
host_id = %host.id,
error = %e,
"Health poller: failed to build AgentClient"
);
(HostHealthStatus::Unreachable, serde_json::Value::Object(Default::default()))
}
Ok(client) => match client.health().await {
Ok(data) => {
let payload = serde_json::to_value(&data).unwrap_or_default();
(HostHealthStatus::Healthy, payload)
}
Err(AgentClientError::Timeout) => {
tracing::warn!(host_id = %host.id, "Health poller: agent timed out");
(HostHealthStatus::Unreachable, serde_json::Value::Object(Default::default()))
}
Err(AgentClientError::Connect(_)) => {
tracing::warn!(host_id = %host.id, "Health poller: agent connection refused");
(HostHealthStatus::Unreachable, serde_json::Value::Object(Default::default()))
}
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Health poller: agent error");
(HostHealthStatus::Degraded, serde_json::Value::Object(Default::default()))
}
},
};
// Insert into host_health_data.
if let Err(e) = sqlx::query(
r#"
INSERT INTO host_health_data (host_id, status, payload)
VALUES ($1, $2, $3)
"#,
)
.bind(host.id)
.bind(&status)
.bind(&payload)
.execute(&pool)
.await
{
tracing::error!(host_id = %host.id, error = %e, "Health poller: failed to insert health data");
}
// Update hosts table.
if let Err(e) = sqlx::query(
r#"
UPDATE hosts
SET health_status = $2, last_health_at = NOW()
WHERE id = $1
"#,
)
.bind(host.id)
.bind(&status)
.execute(&pool)
.await
{
tracing::error!(host_id = %host.id, error = %e, "Health poller: failed to update host status");
}
status
}

View File

@ -0,0 +1,826 @@
//! Job execution engine.
//!
//! Picks up patch jobs from the database, dispatches them to agents via mTLS,
//! tracks progress, and handles retries with exponential back-off.
//!
//! Two concurrent loops run inside [`run_job_executor`]:
//!
//! 1. **NOTIFY listener** — listens on `job_enqueued`; triggers immediate
//! dispatch for newly-enqueued jobs.
//! 2. **Periodic scanner** — every 60 seconds:
//! - picks up queued non-immediate jobs that were missed by NOTIFY,
//! - polls running agent jobs for completion,
//! - retries pending host jobs whose back-off window has elapsed.
use std::sync::Arc;
use chrono::{Duration as ChronoDuration, Utc};
use pm_agent_client::{AgentClient, types::ApplyPatchesRequest};
use pm_core::config::AppConfig;
use sqlx::{FromRow, PgPool};
use tokio::{sync::Semaphore, time};
use uuid::Uuid;
use crate::agent_loader::load_agent_certs;
// ─────────────────────────────────────────────────────────────────────────────
// Internal DB row types
// ─────────────────────────────────────────────────────────────────────────────
#[derive(Debug, FromRow)]
#[allow(dead_code)]
struct PatchJobHostQueued {
id: Uuid,
host_id: Uuid,
job_id: Uuid,
}
#[derive(Debug, FromRow)]
struct PatchJobHostRunning {
id: Uuid,
agent_job_id: String,
job_id: Uuid,
ip_address: String,
agent_port: i32,
}
#[derive(Debug, FromRow)]
struct PatchJobHostPending {
id: Uuid,
host_id: Uuid,
job_id: Uuid,
}
#[derive(Debug, FromRow)]
struct HostRow {
ip_address: String,
agent_port: i32,
}
#[derive(Debug, FromRow)]
struct JobPatchSelection {
patch_selection: serde_json::Value,
}
#[derive(Debug, FromRow)]
struct RetryRow {
job_id: Uuid,
retry_count: i32,
}
#[derive(Debug, FromRow)]
struct StatusCounts {
running_count: i64,
pending_count: i64,
queued_count: i64,
succeeded_count: i64,
failed_count: i64,
cancelled_count: i64,
total_count: i64,
}
// ─────────────────────────────────────────────────────────────────────────────
// Public entry point
// ─────────────────────────────────────────────────────────────────────────────
/// Spawn the job executor and run it indefinitely.
///
/// Runs two independent tasks joined until both complete (they never do under
/// normal operation):
/// - NOTIFY-driven immediate dispatch (auto-reconnect on DB disconnect).
/// - 60-second periodic scanner for queued / running / pending rows.
pub async fn run_job_executor(pool: PgPool, config: Arc<AppConfig>) {
tracing::info!("Job executor started");
let (pool_n, cfg_n) = (pool.clone(), config.clone());
let (pool_s, cfg_s) = (pool.clone(), config.clone());
let notify_task = tokio::spawn(async move {
run_notify_listener(pool_n, cfg_n).await;
});
let scan_task = tokio::spawn(async move {
run_periodic_scanner(pool_s, cfg_s).await;
});
let _ = tokio::join!(notify_task, scan_task);
}
// ─────────────────────────────────────────────────────────────────────────────
// NOTIFY listener (outer reconnect wrapper)
// ─────────────────────────────────────────────────────────────────────────────
async fn run_notify_listener(pool: PgPool, config: Arc<AppConfig>) {
tracing::info!("Job executor NOTIFY listener starting");
loop {
if let Err(e) = notify_listen_loop(&pool, &config).await {
tracing::error!(
error = %e,
"Job executor NOTIFY listener disconnected, reconnecting in 5s"
);
time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
/// Inner NOTIFY loop — returns `Err` only on a fatal connection error so the
/// outer loop can reconnect.
async fn notify_listen_loop(
pool: &PgPool,
config: &Arc<AppConfig>,
) -> anyhow::Result<()> {
let mut listener =
sqlx::postgres::PgListener::connect(&config.database.url).await?;
listener.listen("job_enqueued").await?;
tracing::debug!("Job executor NOTIFY listener connected");
loop {
let notification = listener.recv().await?;
let payload = notification.payload().to_string();
tracing::info!(payload, "job_enqueued notification received");
let job_id = match payload.parse::<Uuid>() {
Ok(id) => id,
Err(e) => {
tracing::warn!(
payload,
error = %e,
"Job executor: invalid UUID in job_enqueued payload"
);
continue;
}
};
let (p, c) = (pool.clone(), config.clone());
tokio::spawn(async move {
process_job(p, c, job_id).await;
});
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Periodic scanner
// ─────────────────────────────────────────────────────────────────────────────
async fn run_periodic_scanner(pool: PgPool, config: Arc<AppConfig>) {
// First tick fires immediately — consume it to avoid a duplicate burst
// right after NOTIFY already dispatched the same jobs.
let mut ticker = time::interval(std::time::Duration::from_secs(60));
ticker.tick().await;
loop {
ticker.tick().await;
tracing::debug!("Job executor periodic scan starting");
// 1. Pick up queued pjh rows that belong to non-cancelled jobs.
scan_queued_jobs(pool.clone(), config.clone()).await;
// 2. Poll running pjh rows against the agent.
poll_running_jobs(pool.clone(), config.clone()).await;
// 3. Retry pending pjh rows whose back-off window has elapsed.
retry_pending_jobs(pool.clone(), config.clone()).await;
tracing::debug!("Job executor periodic scan complete");
}
}
// ─────────────────────────────────────────────────────────────────────────────
// scan_queued_jobs — feeds non-immediate jobs into process_job
// ─────────────────────────────────────────────────────────────────────────────
/// Discover distinct job-IDs that have queued host entries ready for dispatch
/// and call [`process_job`] for each.
async fn scan_queued_jobs(pool: PgPool, config: Arc<AppConfig>) {
#[derive(FromRow)]
struct JobIdRow {
job_id: Uuid,
}
let rows: Vec<JobIdRow> = match sqlx::query_as(
r#"
SELECT DISTINCT pjh.job_id
FROM patch_job_hosts pjh
JOIN patch_jobs j ON j.id = pjh.job_id
WHERE pjh.status = 'queued'
AND (pjh.retry_next_at IS NULL OR pjh.retry_next_at <= NOW())
AND j.status != 'cancelled'
"#,
)
.fetch_all(&pool)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, "scan_queued_jobs: DB query failed");
return;
}
};
for row in rows {
let (p, c) = (pool.clone(), config.clone());
tokio::spawn(async move {
process_job(p, c, row.job_id).await;
});
}
}
// ─────────────────────────────────────────────────────────────────────────────
// process_job
// ─────────────────────────────────────────────────────────────────────────────
/// Fetch all queued host entries for `job_id` and dispatch them concurrently,
/// bounded by `config.worker.max_concurrent_agent_calls`.
async fn process_job(pool: PgPool, config: Arc<AppConfig>, job_id: Uuid) {
tracing::info!(%job_id, "process_job: dispatching queued hosts");
// Mark the parent job as running (idempotent guard).
if let Err(e) = sqlx::query(
r#"
UPDATE patch_jobs
SET status = 'running',
started_at = COALESCE(started_at, NOW())
WHERE id = $1
AND status NOT IN ('running','succeeded','failed','cancelled')
"#,
)
.bind(job_id)
.execute(&pool)
.await
{
tracing::error!(%job_id, error = %e, "process_job: failed to mark job running");
}
// Fetch all queued host entries for this job.
let hosts: Vec<PatchJobHostQueued> = match sqlx::query_as(
r#"
SELECT id, host_id, job_id
FROM patch_job_hosts
WHERE job_id = $1
AND status = 'queued'
"#,
)
.bind(job_id)
.fetch_all(&pool)
.await
{
Ok(h) => h,
Err(e) => {
tracing::error!(%job_id, error = %e, "process_job: failed to fetch queued hosts");
return;
}
};
if hosts.is_empty() {
tracing::debug!(%job_id, "process_job: no queued hosts found (already dispatched)");
return;
}
let sem = Arc::new(Semaphore::new(config.worker.max_concurrent_agent_calls));
for host in hosts {
let permit = match sem.clone().acquire_owned().await {
Ok(p) => p,
Err(e) => {
tracing::error!(%job_id, error = %e, "process_job: semaphore closed");
break;
}
};
let (p, c) = (pool.clone(), config.clone());
let pjh_id = host.id;
let host_id = host.host_id;
tokio::spawn(async move {
execute_host_job(p, c, job_id, host_id, pjh_id).await;
drop(permit);
});
}
}
// ─────────────────────────────────────────────────────────────────────────────
// execute_host_job
// ─────────────────────────────────────────────────────────────────────────────
/// Connect to a single host agent, submit the patch job, and record the
/// agent-assigned async job ID for later polling.
async fn execute_host_job(
pool: PgPool,
config: Arc<AppConfig>,
job_id: Uuid,
host_id: Uuid,
pjh_id: Uuid,
) {
tracing::info!(%job_id, %host_id, %pjh_id, "execute_host_job: starting");
// ── 1. Fetch host connection details ─────────────────────────────────────
let host: HostRow = match sqlx::query_as(
"SELECT ip_address::text AS ip_address, agent_port FROM hosts WHERE id = $1",
)
.bind(host_id)
.fetch_optional(&pool)
.await
{
Ok(Some(h)) => h,
Ok(None) => {
tracing::error!(%host_id, "execute_host_job: host not found");
handle_host_failure(
pool,
pjh_id,
format!("Host {host_id} not found in database"),
)
.await;
return;
}
Err(e) => {
tracing::error!(%host_id, error = %e, "execute_host_job: DB error fetching host");
handle_host_failure(pool, pjh_id, format!("DB error fetching host: {e}")).await;
return;
}
};
// ── 2. Fetch the job's patch_selection ──────────────────────────────────
let patch_sel: JobPatchSelection = match sqlx::query_as(
"SELECT patch_selection FROM patch_jobs WHERE id = $1",
)
.bind(job_id)
.fetch_optional(&pool)
.await
{
Ok(Some(row)) => row,
Ok(None) => {
tracing::error!(%job_id, "execute_host_job: parent job not found");
handle_host_failure(pool, pjh_id, format!("Parent job {job_id} not found")).await;
return;
}
Err(e) => {
tracing::error!(%job_id, error = %e, "execute_host_job: DB error fetching job");
handle_host_failure(pool, pjh_id, format!("DB error fetching job: {e}")).await;
return;
}
};
let packages: Vec<String> =
serde_json::from_value(patch_sel.patch_selection).unwrap_or_default();
// ── 3. Load mTLS certs ───────────────────────────────────────────────────
let certs = match load_agent_certs(&config.security) {
Ok(c) => c,
Err(e) => {
tracing::error!(%host_id, error = %e, "execute_host_job: failed to load agent certs");
handle_host_failure(pool, pjh_id, format!("Failed to load agent certs: {e}")).await;
return;
}
};
// ── 4. Build AgentClient ─────────────────────────────────────────────────
let client = match AgentClient::new(
&host.ip_address,
host.agent_port as u16,
&certs.client_cert,
&certs.client_key,
&certs.ca_cert,
) {
Ok(c) => c,
Err(e) => {
tracing::error!(%host_id, error = %e, "execute_host_job: failed to build AgentClient");
handle_host_failure(pool, pjh_id, format!("Failed to build agent client: {e}")).await;
return;
}
};
// ── 5. Mark pjh as running ───────────────────────────────────────────────
if let Err(e) = sqlx::query(
r#"
UPDATE patch_job_hosts
SET status = 'running',
started_at = COALESCE(started_at, NOW())
WHERE id = $1
"#,
)
.bind(pjh_id)
.execute(&pool)
.await
{
tracing::error!(%pjh_id, error = %e, "execute_host_job: failed to mark pjh running");
}
// ── 6. Submit the patch job to the agent ─────────────────────────────────
let req = ApplyPatchesRequest { packages, allow_reboot: true };
match client.apply_patches(&req).await {
Ok(resp) => {
tracing::info!(
%pjh_id,
agent_job_id = %resp.job_id,
"execute_host_job: agent accepted job"
);
// ── 7. Store agent_job_id; status stays 'running' (agent is async) ──
if let Err(e) = sqlx::query(
"UPDATE patch_job_hosts SET agent_job_id = $1 WHERE id = $2",
)
.bind(&resp.job_id)
.bind(pjh_id)
.execute(&pool)
.await
{
tracing::error!(
%pjh_id,
error = %e,
"execute_host_job: failed to store agent_job_id"
);
}
}
Err(e) => {
tracing::warn!(%pjh_id, error = %e, "execute_host_job: agent rejected job");
handle_host_failure(pool, pjh_id, format!("Agent error: {e}")).await;
}
}
}
// ─────────────────────────────────────────────────────────────────────────────
// poll_running_jobs
// ─────────────────────────────────────────────────────────────────────────────
/// Poll all running pjh rows that have an agent job ID and update their status.
pub async fn poll_running_jobs(pool: PgPool, config: Arc<AppConfig>) {
let rows: Vec<PatchJobHostRunning> = match sqlx::query_as(
r#"
SELECT pjh.id,
pjh.agent_job_id,
pjh.job_id,
h.ip_address::text AS ip_address,
h.agent_port
FROM patch_job_hosts pjh
JOIN hosts h ON h.id = pjh.host_id
WHERE pjh.status = 'running'
AND pjh.agent_job_id IS NOT NULL
"#,
)
.fetch_all(&pool)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, "poll_running_jobs: DB query failed");
return;
}
};
for row in rows {
let (p, c) = (pool.clone(), config.clone());
tokio::spawn(async move {
poll_single_host(p, c, row).await;
});
}
}
/// Poll one running host entry and update its status from the agent response.
async fn poll_single_host(
pool: PgPool,
config: Arc<AppConfig>,
row: PatchJobHostRunning,
) {
let certs = match load_agent_certs(&config.security) {
Ok(c) => c,
Err(e) => {
tracing::error!(
pjh_id = %row.id,
error = %e,
"poll_single_host: failed to load agent certs"
);
return;
}
};
let client = match AgentClient::new(
&row.ip_address,
row.agent_port as u16,
&certs.client_cert,
&certs.client_key,
&certs.ca_cert,
) {
Ok(c) => c,
Err(e) => {
tracing::error!(
pjh_id = %row.id,
error = %e,
"poll_single_host: failed to build AgentClient"
);
return;
}
};
let status = match client.job_status(&row.agent_job_id).await {
Ok(s) => s,
Err(e) => {
tracing::warn!(
pjh_id = %row.id,
agent_job_id = %row.agent_job_id,
error = %e,
"poll_single_host: agent status call failed"
);
return;
}
};
match status.status.as_str() {
"succeeded" => {
tracing::info!(pjh_id = %row.id, "poll_single_host: agent job succeeded");
if let Err(e) = sqlx::query(
r#"
UPDATE patch_job_hosts
SET status = 'succeeded',
completed_at = NOW(),
output = $2
WHERE id = $1
"#,
)
.bind(row.id)
.bind(status.output.as_deref())
.execute(&pool)
.await
{
tracing::error!(pjh_id = %row.id, error = %e, "poll_single_host: update failed");
}
sync_job_status(&pool, row.job_id).await;
}
"failed" => {
tracing::warn!(pjh_id = %row.id, "poll_single_host: agent job failed");
let err_msg = status
.error
.unwrap_or_else(|| "Agent reported failure (no detail)".to_string());
handle_host_failure(pool, row.id, err_msg).await;
}
"running" | "queued" => {
// Still in progress — nothing to update; will poll again next cycle.
tracing::debug!(
pjh_id = %row.id,
agent_status = %status.status,
"poll_single_host: job still in progress"
);
}
other => {
tracing::warn!(
pjh_id = %row.id,
agent_status = %other,
"poll_single_host: unexpected agent status — ignoring"
);
}
}
}
// ─────────────────────────────────────────────────────────────────────────────
// handle_host_failure
// ─────────────────────────────────────────────────────────────────────────────
/// Apply exponential back-off retry logic to a failed host job entry.
///
/// Retries up to 3 times (1 min / 5 min / 30 min delays). After the third
/// failure the entry is marked `failed` and the parent job status is synced.
async fn handle_host_failure(pool: PgPool, pjh_id: Uuid, error_msg: String) {
let row: Option<RetryRow> = match sqlx::query_as(
"SELECT job_id, retry_count FROM patch_job_hosts WHERE id = $1",
)
.bind(pjh_id)
.fetch_optional(&pool)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!(%pjh_id, error = %e, "handle_host_failure: DB error fetching retry row");
return;
}
};
let row = match row {
Some(r) => r,
None => {
tracing::error!(%pjh_id, "handle_host_failure: pjh row not found");
return;
}
};
if row.retry_count < 3 {
let new_retry_count = row.retry_count + 1;
let retry_next_at = Utc::now()
+ match new_retry_count {
1 => ChronoDuration::minutes(1),
2 => ChronoDuration::minutes(5),
_ => ChronoDuration::minutes(30),
};
tracing::warn!(
%pjh_id,
retry_count = new_retry_count,
?retry_next_at,
error = %error_msg,
"handle_host_failure: scheduling retry"
);
if let Err(e) = sqlx::query(
r#"
UPDATE patch_job_hosts
SET status = 'pending',
retry_count = $2,
retry_next_at = $3,
last_error = $4
WHERE id = $1
"#,
)
.bind(pjh_id)
.bind(new_retry_count)
.bind(retry_next_at)
.bind(&error_msg)
.execute(&pool)
.await
{
tracing::error!(%pjh_id, error = %e, "handle_host_failure: failed to set pending");
}
} else {
tracing::warn!(
%pjh_id,
retry_count = row.retry_count,
error = %error_msg,
"handle_host_failure: max retries exceeded, marking failed"
);
if let Err(e) = sqlx::query(
r#"
UPDATE patch_job_hosts
SET status = 'failed',
error_message = $2,
completed_at = NOW()
WHERE id = $1
"#,
)
.bind(pjh_id)
.bind(&error_msg)
.execute(&pool)
.await
{
tracing::error!(%pjh_id, error = %e, "handle_host_failure: failed to mark pjh failed");
}
sync_job_status(&pool, row.job_id).await;
}
}
// ─────────────────────────────────────────────────────────────────────────────
// sync_job_status
// ─────────────────────────────────────────────────────────────────────────────
/// Roll up `patch_job_hosts` aggregate status into the parent `patch_jobs` row.
///
/// Logic (in priority order):
/// 1. Any `running` or `pending` hosts → keep parent `running`.
/// 2. All hosts `succeeded` → parent `succeeded`.
/// 3. All hosts `cancelled` → parent `cancelled`.
/// 4. Any `failed` with none still active → parent `failed` (includes partial).
async fn sync_job_status(pool: &PgPool, job_id: Uuid) {
let counts: StatusCounts = match sqlx::query_as(
r#"
SELECT
COUNT(*) FILTER (WHERE status = 'running') AS running_count,
COUNT(*) FILTER (WHERE status = 'pending') AS pending_count,
COUNT(*) FILTER (WHERE status = 'queued') AS queued_count,
COUNT(*) FILTER (WHERE status = 'succeeded') AS succeeded_count,
COUNT(*) FILTER (WHERE status = 'failed') AS failed_count,
COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_count,
COUNT(*) AS total_count
FROM patch_job_hosts
WHERE job_id = $1
"#,
)
.bind(job_id)
.fetch_one(pool)
.await
{
Ok(c) => c,
Err(e) => {
tracing::error!(%job_id, error = %e, "sync_job_status: DB query failed");
return;
}
};
// Determine the aggregate status.
let new_status: &str;
let set_completed: bool;
if counts.running_count > 0 || counts.pending_count > 0 || counts.queued_count > 0 {
// Still work in flight — keep parent running.
new_status = "running";
set_completed = false;
} else if counts.total_count > 0 && counts.succeeded_count == counts.total_count {
// Every host succeeded.
new_status = "succeeded";
set_completed = true;
} else if counts.total_count > 0 && counts.cancelled_count == counts.total_count {
// Every host cancelled.
new_status = "cancelled";
set_completed = true;
} else if counts.failed_count > 0 {
// At least one failure and nothing still active → failed (partial counts too).
new_status = "failed";
set_completed = true;
} else {
// Fallback: nothing actionable yet.
return;
}
tracing::info!(
%job_id,
new_status,
running = counts.running_count,
pending = counts.pending_count,
queued = counts.queued_count,
succeeded = counts.succeeded_count,
failed = counts.failed_count,
"sync_job_status: updating parent job"
);
let result = if set_completed {
sqlx::query(
r#"
UPDATE patch_jobs
SET status = $2,
completed_at = COALESCE(completed_at, NOW())
WHERE id = $1
"#,
)
.bind(job_id)
.bind(new_status)
.execute(pool)
.await
} else {
sqlx::query(
"UPDATE patch_jobs SET status = $2 WHERE id = $1",
)
.bind(job_id)
.bind(new_status)
.execute(pool)
.await
};
if let Err(e) = result {
tracing::error!(%job_id, error = %e, "sync_job_status: failed to update parent job");
}
}
// ─────────────────────────────────────────────────────────────────────────────
// retry_pending_jobs
// ─────────────────────────────────────────────────────────────────────────────
/// Find pending host entries whose back-off window has elapsed, reset them to
/// `queued`, and dispatch them immediately.
pub async fn retry_pending_jobs(pool: PgPool, config: Arc<AppConfig>) {
let rows: Vec<PatchJobHostPending> = match sqlx::query_as(
r#"
SELECT pjh.id, pjh.host_id, pjh.job_id
FROM patch_job_hosts pjh
JOIN patch_jobs j ON j.id = pjh.job_id
WHERE pjh.status = 'pending'
AND pjh.retry_next_at <= NOW()
AND j.status != 'cancelled'
"#,
)
.fetch_all(&pool)
.await
{
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, "retry_pending_jobs: DB query failed");
return;
}
};
for row in rows {
// Reset to queued so execute_host_job can pick it up cleanly.
if let Err(e) = sqlx::query(
"UPDATE patch_job_hosts SET status = 'queued', retry_next_at = NULL WHERE id = $1",
)
.bind(row.id)
.execute(&pool)
.await
{
tracing::error!(
pjh_id = %row.id,
error = %e,
"retry_pending_jobs: failed to reset pjh to queued"
);
continue;
}
tracing::info!(
pjh_id = %row.id,
job_id = %row.job_id,
"retry_pending_jobs: re-dispatching host job"
);
let (p, c) = (pool.clone(), config.clone());
let (job_id, host_id, pjh_id) = (row.job_id, row.host_id, row.id);
tokio::spawn(async move {
execute_host_job(p, c, job_id, host_id, pjh_id).await;
});
}
}

View File

@ -3,6 +3,12 @@
//! Handles scheduled polling, job execution, maintenance window scheduling,
//! retry logic, email notifications, and data pruning.
mod agent_loader;
mod health_poller;
mod patch_poller;
mod refresh_listener;
mod job_executor;
use pm_core::{
config::AppConfig,
db,
@ -12,6 +18,11 @@ use sqlx::PgPool;
use std::{sync::Arc, time::Duration};
use tokio::time;
use health_poller::run_health_poller;
use patch_poller::run_patch_poller;
use refresh_listener::run_refresh_listener;
use job_executor::run_job_executor;
/// Minimum number of applied migrations the worker requires before
/// accepting work. Prevents the worker from running against a schema
/// that hasn't been migrated yet.
@ -51,14 +62,24 @@ async fn main() -> anyhow::Result<()> {
config.worker.heartbeat_interval_secs,
));
// TODO M4: spawn health_poller, patch_data_poller
// TODO M5: spawn job_executor
// TODO M6: spawn job_scheduler
// M4: agent health poller, patch data poller, on-demand refresh listener
let health_handle = tokio::spawn(run_health_poller(pool.clone(), config.clone()));
let patch_handle = tokio::spawn(run_patch_poller(pool.clone(), config.clone()));
let refresh_handle = tokio::spawn(run_refresh_listener(pool.clone(), config.clone()));
// M5: job execution engine
let job_exec_handle = tokio::spawn(run_job_executor(pool.clone(), config.clone()));
tracing::info!("Worker tasks started");
// Wait for all tasks (they run indefinitely)
let _ = tokio::join!(heartbeat_handle);
let _ = tokio::join!(
heartbeat_handle,
health_handle,
patch_handle,
refresh_handle,
job_exec_handle,
);
Ok(())
}

View File

@ -0,0 +1,209 @@
//! Periodic patch-data poller for all registered hosts.
//!
//! Polls every host via the agent `/patches` and `/packages` endpoints on
//! each tick of `patch_poll_interval_secs`, with bounded concurrency
//! controlled by a [`tokio::sync::Semaphore`].
use std::sync::Arc;
use pm_agent_client::AgentClient;
use pm_core::config::AppConfig;
use sqlx::{FromRow, PgPool};
use tokio::{
sync::Semaphore,
time,
};
use uuid::Uuid;
use crate::agent_loader::load_agent_certs;
/// Minimal host projection fetched for each poll cycle.
#[derive(Debug, FromRow)]
struct HostRow {
id: Uuid,
ip_address: String,
agent_port: i32,
}
/// Run the patch poller loop indefinitely.
///
/// On each tick all registered hosts are queried concurrently (up to
/// `max_concurrent_agent_calls` in-flight at once). Results are persisted
/// to `host_patch_data` and `hosts.last_patch_at` is updated.
pub async fn run_patch_poller(pool: PgPool, config: Arc<AppConfig>) {
let interval_secs = config.worker.patch_poll_interval_secs;
let mut ticker = time::interval(std::time::Duration::from_secs(interval_secs));
tracing::info!(
interval_secs,
"Patch poller started"
);
loop {
ticker.tick().await;
let certs = match load_agent_certs(&config.security) {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "Patch poller: failed to load agent certs — skipping cycle");
continue;
}
};
let client_cert = Arc::new(certs.client_cert);
let client_key = Arc::new(certs.client_key);
let ca_cert = Arc::new(certs.ca_cert);
let hosts: Vec<HostRow> = match sqlx::query_as(
"SELECT id, ip_address::text AS ip_address, agent_port FROM hosts ORDER BY id",
)
.fetch_all(&pool)
.await
{
Ok(rows) => rows,
Err(e) => {
tracing::error!(error = %e, "Patch poller: failed to fetch hosts");
continue;
}
};
if hosts.is_empty() {
tracing::debug!("Patch poller: no hosts registered, skipping cycle");
continue;
}
let total = hosts.len();
let semaphore = Arc::new(Semaphore::new(config.worker.max_concurrent_agent_calls));
let mut handles = Vec::with_capacity(total);
for host in hosts {
let pool = pool.clone();
let sem = semaphore.clone();
let cert = client_cert.clone();
let key = client_key.clone();
let ca = ca_cert.clone();
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.expect("semaphore closed");
poll_host_patches(pool, host, &cert, &key, &ca).await
});
handles.push(handle);
}
let mut succeeded = 0usize;
let mut failed = 0usize;
for handle in handles {
match handle.await {
Ok(true) => succeeded += 1,
Ok(false) => failed += 1,
Err(e) => {
tracing::error!(error = %e, "Patch poller task panicked");
failed += 1;
}
}
}
tracing::info!(
total,
succeeded,
failed,
"Patch poll cycle complete"
);
}
}
/// Poll a single host for patch and package data, persist the result.
/// Returns `true` on success, `false` on any error.
async fn poll_host_patches(
pool: PgPool,
host: HostRow,
client_cert: &[u8],
client_key: &[u8],
ca_cert: &[u8],
) -> bool {
let client = match AgentClient::new(
&host.ip_address,
host.agent_port as u16,
client_cert,
client_key,
ca_cert,
) {
Ok(c) => c,
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Patch poller: failed to build AgentClient");
return false;
}
};
// Fetch patches and packages concurrently.
let (patches_result, packages_result) =
tokio::join!(client.patches(), client.packages_upgradable());
let patches_data = match patches_result {
Ok(d) => d,
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Patch poller: patches() failed");
return false;
}
};
let packages_data = match packages_result {
Ok(d) => d,
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Patch poller: packages_upgradable() failed");
return false;
}
};
let available_patches = serde_json::to_value(&patches_data.patches).unwrap_or_default();
let installed_packages = serde_json::to_value(&packages_data.packages).unwrap_or_default();
let patch_count = patches_data.total as i32;
let cve_count = patches_data
.patches
.iter()
.filter(|p| !p.cve_ids.is_empty())
.count() as i32;
// Insert into host_patch_data.
if let Err(e) = sqlx::query(
r#"
INSERT INTO host_patch_data
(host_id, available_patches, installed_packages, patch_count, cve_count)
VALUES ($1, $2, $3, $4, $5)
"#,
)
.bind(host.id)
.bind(&available_patches)
.bind(&installed_packages)
.bind(patch_count)
.bind(cve_count)
.execute(&pool)
.await
{
tracing::error!(host_id = %host.id, error = %e, "Patch poller: failed to insert patch data");
return false;
}
// Update hosts.last_patch_at.
if let Err(e) = sqlx::query(
"UPDATE hosts SET last_patch_at = NOW() WHERE id = $1",
)
.bind(host.id)
.execute(&pool)
.await
{
tracing::error!(host_id = %host.id, error = %e, "Patch poller: failed to update last_patch_at");
}
tracing::debug!(
host_id = %host.id,
patch_count,
cve_count,
"Patch data collected"
);
true
}

View File

@ -0,0 +1,265 @@
//! On-demand refresh listener.
//!
//! Listens on the PostgreSQL `refresh_requested` NOTIFY channel. When a
//! notification arrives the payload is expected to be a host UUID string.
//! The listener immediately polls that host for health and patch data and
//! persists the results — bypassing the normal poll intervals.
use std::sync::Arc;
use pm_agent_client::{AgentClient, AgentClientError};
use pm_core::{
config::AppConfig,
models::HostHealthStatus,
};
use sqlx::{FromRow, PgPool};
use tokio::time;
use uuid::Uuid;
use crate::agent_loader::load_agent_certs;
/// Minimal host row used for on-demand refresh.
#[derive(Debug, FromRow)]
struct HostRow {
id: Uuid,
ip_address: String,
agent_port: i32,
}
/// Run the LISTEN/NOTIFY refresh listener indefinitely.
///
/// Automatically reconnects if the underlying PostgreSQL connection drops.
pub async fn run_refresh_listener(pool: PgPool, config: Arc<AppConfig>) {
tracing::info!("Refresh listener started — listening on 'refresh_requested'");
loop {
if let Err(e) = listen_loop(&pool, &config).await {
tracing::error!(
error = %e,
"Refresh listener disconnected, reconnecting in 5s"
);
time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
/// Inner loop — returns `Err` only on a fatal listener error so the outer
/// loop can reconnect.
async fn listen_loop(pool: &PgPool, config: &AppConfig) -> anyhow::Result<()> {
let mut listener =
sqlx::postgres::PgListener::connect(&config.database.url).await?;
listener.listen("refresh_requested").await?;
tracing::debug!("Refresh listener connected and listening");
loop {
let notification = listener.recv().await?;
let payload = notification.payload().to_string();
tracing::info!(payload, "Refresh notification received");
let host_id = match payload.parse::<Uuid>() {
Ok(id) => id,
Err(e) => {
tracing::warn!(
payload,
error = %e,
"Refresh listener: invalid UUID in notification payload"
);
continue;
}
};
// Fetch the host from the database.
let host: Option<HostRow> = sqlx::query_as(
"SELECT id, ip_address::text AS ip_address, agent_port FROM hosts WHERE id = $1",
)
.bind(host_id)
.fetch_optional(pool)
.await
.unwrap_or(None);
let host = match host {
Some(h) => h,
None => {
tracing::warn!(%host_id, "Refresh listener: host not found");
continue;
}
};
// Load certs for this refresh.
let certs = match load_agent_certs(&config.security) {
Ok(c) => c,
Err(e) => {
tracing::error!(
%host_id,
error = %e,
"Refresh listener: failed to load agent certs"
);
continue;
}
};
// Spawn the actual work so the listener loop is not blocked.
let pool_clone = pool.clone();
let cert = certs.client_cert;
let key = certs.client_key;
let ca = certs.ca_cert;
tokio::spawn(async move {
refresh_host(pool_clone, host, &cert, &key, &ca).await;
});
}
}
/// Perform a full health + patch refresh for one host and persist results.
async fn refresh_host(
pool: PgPool,
host: HostRow,
client_cert: &[u8],
client_key: &[u8],
ca_cert: &[u8],
) {
let client = match AgentClient::new(
&host.ip_address,
host.agent_port as u16,
client_cert,
client_key,
ca_cert,
) {
Ok(c) => c,
Err(e) => {
tracing::warn!(
host_id = %host.id,
error = %e,
"Refresh: failed to build AgentClient"
);
persist_health_unreachable(&pool, host.id).await;
return;
}
};
// ── Health ────────────────────────────────────────────────────────────
let (health_status, health_payload) = match client.health().await {
Ok(data) => {
let payload = serde_json::to_value(&data).unwrap_or_default();
(HostHealthStatus::Healthy, payload)
}
Err(AgentClientError::Timeout) | Err(AgentClientError::Connect(_)) => {
tracing::warn!(host_id = %host.id, "Refresh: agent unreachable");
(HostHealthStatus::Unreachable, serde_json::Value::Object(Default::default()))
}
Err(e) => {
tracing::warn!(host_id = %host.id, error = %e, "Refresh: health error");
(HostHealthStatus::Degraded, serde_json::Value::Object(Default::default()))
}
};
persist_health(&pool, host.id, &health_status, &health_payload).await;
// ── Patch data ────────────────────────────────────────────────────────
let (patches_result, packages_result) =
tokio::join!(client.patches(), client.packages_upgradable());
match (patches_result, packages_result) {
(Ok(patches_data), Ok(packages_data)) => {
let available_patches =
serde_json::to_value(&patches_data.patches).unwrap_or_default();
let installed_packages =
serde_json::to_value(&packages_data.packages).unwrap_or_default();
let patch_count = patches_data.total as i32;
let cve_count = patches_data
.patches
.iter()
.filter(|p| !p.cve_ids.is_empty())
.count() as i32;
if let Err(e) = sqlx::query(
r#"
INSERT INTO host_patch_data
(host_id, available_patches, installed_packages, patch_count, cve_count)
VALUES ($1, $2, $3, $4, $5)
"#,
)
.bind(host.id)
.bind(&available_patches)
.bind(&installed_packages)
.bind(patch_count)
.bind(cve_count)
.execute(&pool)
.await
{
tracing::error!(
host_id = %host.id,
error = %e,
"Refresh: failed to insert patch data"
);
} else {
let _ = sqlx::query(
"UPDATE hosts SET last_patch_at = NOW() WHERE id = $1",
)
.bind(host.id)
.execute(&pool)
.await;
tracing::info!(
host_id = %host.id,
patch_count,
cve_count,
"On-demand refresh complete"
);
}
}
(Err(e), _) | (_, Err(e)) => {
tracing::warn!(
host_id = %host.id,
error = %e,
"Refresh: failed to collect patch data"
);
}
}
}
async fn persist_health_unreachable(pool: &PgPool, host_id: Uuid) {
let status = HostHealthStatus::Unreachable;
let payload = serde_json::Value::Object(Default::default());
persist_health(pool, host_id, &status, &payload).await;
}
async fn persist_health(
pool: &PgPool,
host_id: Uuid,
status: &HostHealthStatus,
payload: &serde_json::Value,
) {
if let Err(e) = sqlx::query(
r#"
INSERT INTO host_health_data (host_id, status, payload)
VALUES ($1, $2, $3)
"#,
)
.bind(host_id)
.bind(status)
.bind(payload)
.execute(pool)
.await
{
tracing::error!(
%host_id,
error = %e,
"Refresh: failed to insert health data"
);
}
if let Err(e) = sqlx::query(
"UPDATE hosts SET health_status = $2, last_health_at = NOW() WHERE id = $1",
)
.bind(host_id)
.bind(status)
.execute(pool)
.await
{
tracing::error!(%host_id, error = %e, "Refresh: failed to update host health_status");
}
}