From 12d640e5de70152d5d3122925970909b2c6c729e Mon Sep 17 00:00:00 2001 From: Echo Date: Wed, 6 May 2026 03:57:01 +0000 Subject: [PATCH] style: cargo fmt --all to fix CI format check --- crates/pm-agent-client/src/client.rs | 9 +- crates/pm-ca/src/ca.rs | 5 +- crates/pm-core/src/audit.rs | 5 +- crates/pm-core/src/config.rs | 4 +- crates/pm-core/src/lib.rs | 2 +- crates/pm-web/src/main.rs | 5 +- crates/pm-web/src/routes/health_checks.rs | 205 ++++++++++++-------- crates/pm-web/src/routes/hosts.rs | 5 +- crates/pm-web/src/routes/mod.rs | 2 +- crates/pm-worker/src/health_check_poller.rs | 78 ++++---- crates/pm-worker/src/job_executor.rs | 9 +- crates/pm-worker/src/ws_relay.rs | 7 +- 12 files changed, 199 insertions(+), 137 deletions(-) diff --git a/crates/pm-agent-client/src/client.rs b/crates/pm-agent-client/src/client.rs index 85dbb9d..c579e3b 100644 --- a/crates/pm-agent-client/src/client.rs +++ b/crates/pm-agent-client/src/client.rs @@ -223,15 +223,18 @@ impl AgentClient { /// `GET /api/v1/system/services/{name}` — check status of a specific service on the agent. #[instrument(skip(self), fields(base_url = %self.base_url, service_name = %service_name))] - pub async fn service_status(&self, service_name: &str) -> Result { - self.get(&format!("system/services/{}", service_name), &[]).await + pub async fn service_status( + &self, + service_name: &str, + ) -> Result { + self.get(&format!("system/services/{}", service_name), &[]) + .await } // -------------------------------------------------------- // Private POST helper // -------------------------------------------------------- - /// Execute a POST request against `{base_url}/{path}`, serialize `body` as /// JSON, deserialize the [`AgentEnvelope`], and extract the `data` field — /// or propagate an [`AgentClientError::ApiError`]. diff --git a/crates/pm-ca/src/ca.rs b/crates/pm-ca/src/ca.rs index 6da4983..348d0e7 100644 --- a/crates/pm-ca/src/ca.rs +++ b/crates/pm-ca/src/ca.rs @@ -301,8 +301,9 @@ impl CertAuthority { ); // Also issue a server certificate for the agent's TLS listener. - let (server_cert_pem, server_key_pem, server_serial_number, _server_expires_at) = - self.issue_server_cert(host_id, hostname, ip_address, db).await?; + let (server_cert_pem, server_key_pem, server_serial_number, _server_expires_at) = self + .issue_server_cert(host_id, hostname, ip_address, db) + .await?; Ok(IssuedCert { cert_pem, diff --git a/crates/pm-core/src/audit.rs b/crates/pm-core/src/audit.rs index b96bd8b..ab8deb3 100644 --- a/crates/pm-core/src/audit.rs +++ b/crates/pm-core/src/audit.rs @@ -287,7 +287,10 @@ pub async fn verify_integrity(pool: &PgPool) -> IntegrityResult { .unwrap_or_default(); let ip_str = row.ip_address.as_deref().unwrap_or(""); let rid = row.request_id.as_deref().unwrap_or(""); - let created_str = row.created_at.map(|c| c.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)).unwrap_or_default(); + let created_str = row + .created_at + .map(|c| c.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)) + .unwrap_or_default(); let mut hasher = Sha256::new(); hasher.update(row.prev_hash.as_bytes()); diff --git a/crates/pm-core/src/config.rs b/crates/pm-core/src/config.rs index 25f2165..d351099 100644 --- a/crates/pm-core/src/config.rs +++ b/crates/pm-core/src/config.rs @@ -101,7 +101,9 @@ impl AppConfig { } } -fn default_health_check_poll_interval() -> u64 { 300 } +fn default_health_check_poll_interval() -> u64 { + 300 +} impl Default for AppConfig { fn default() -> Self { diff --git a/crates/pm-core/src/lib.rs b/crates/pm-core/src/lib.rs index ae7bb68..2f84fc2 100644 --- a/crates/pm-core/src/lib.rs +++ b/crates/pm-core/src/lib.rs @@ -9,7 +9,7 @@ pub mod request_id; // Re-export commonly used types pub use config::AppConfig; -pub use crypto::{CryptoError, KEY_PATH, decrypt, encrypt, load_or_create_key}; +pub use crypto::{decrypt, encrypt, load_or_create_key, CryptoError, KEY_PATH}; pub use error::{AppError, ErrorResponse}; pub use models::{ AuthProvider, CreateGroupRequest, CreateHealthCheckRequest, CreateHostRequest, diff --git a/crates/pm-web/src/main.rs b/crates/pm-web/src/main.rs index 3484dd5..179f59c 100644 --- a/crates/pm-web/src/main.rs +++ b/crates/pm-web/src/main.rs @@ -189,7 +189,10 @@ pub fn build_router(state: AppState) -> Router { .merge(routes::ws::ticket_router()) // Reports .nest("/reports", routes::reports::router()) - .nest("/hosts/{host_id}/health-checks", routes::health_checks::router()) + .nest( + "/hosts/{host_id}/health-checks", + routes::health_checks::router(), + ) // Settings (admin-only) .nest("/settings", routes::settings::router()) // Apply auth middleware to all the above diff --git a/crates/pm-web/src/routes/health_checks.rs b/crates/pm-web/src/routes/health_checks.rs index fa90769..2dad360 100644 --- a/crates/pm-web/src/routes/health_checks.rs +++ b/crates/pm-web/src/routes/health_checks.rs @@ -23,11 +23,11 @@ use pm_core::{ UpdateHealthCheckRequest, }, }; +use reqwest::tls::Version; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::path::PathBuf; use uuid::Uuid; -use reqwest::tls::Version; use crate::AppState; @@ -85,13 +85,12 @@ async fn operator_can_access_host( } // Also allow if host has no groups (ungrouped) - let has_groups: bool = sqlx::query_scalar( - "SELECT EXISTS (SELECT 1 FROM host_groups WHERE host_id = $1)", - ) - .bind(host_id) - .fetch_one(pool) - .await - .unwrap_or(true); + let has_groups: bool = + sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM host_groups WHERE host_id = $1)") + .bind(host_id) + .fetch_one(pool) + .await + .unwrap_or(true); Ok(!has_groups) } @@ -117,25 +116,25 @@ async fn list_health_checks( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } // Verify host exists - let host_exists: bool = sqlx::query_scalar( - "SELECT EXISTS (SELECT 1 FROM hosts WHERE id = $1)", - ) - .bind(host_id) - .fetch_one(&state.db) - .await - .map_err(|e| { - tracing::error!(error = %e, "Failed to check host"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), - ) - })?; + let host_exists: bool = sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM hosts WHERE id = $1)") + .bind(host_id) + .fetch_one(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to check host"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; if !host_exists { return Err(( @@ -191,10 +190,7 @@ async fn list_health_checks( ) })?; - checks_with_results.push(HealthCheckWithResult { - check, - last_result, - }); + checks_with_results.push(HealthCheckWithResult { check, last_result }); } Ok(Json(HealthCheckListResponse { @@ -225,7 +221,9 @@ async fn create_health_check( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } @@ -234,7 +232,9 @@ async fn create_health_check( if req.check_type != "service" && req.check_type != "http" { return Err(( StatusCode::BAD_REQUEST, - Json(json!({ "error": { "code": "invalid_check_type", "message": "check_type must be 'service' or 'http'" } })), + Json( + json!({ "error": { "code": "invalid_check_type", "message": "check_type must be 'service' or 'http'" } }), + ), )); } @@ -242,13 +242,17 @@ async fn create_health_check( if req.check_type == "service" && req.service_name.is_none() { return Err(( StatusCode::BAD_REQUEST, - Json(json!({ "error": { "code": "validation_error", "message": "service_name is required for service checks" } })), + Json( + json!({ "error": { "code": "validation_error", "message": "service_name is required for service checks" } }), + ), )); } if req.check_type == "http" && (req.url.is_none() || req.expected_body.is_none()) { return Err(( StatusCode::BAD_REQUEST, - Json(json!({ "error": { "code": "validation_error", "message": "url and expected_body are required for http checks" } })), + Json( + json!({ "error": { "code": "validation_error", "message": "url and expected_body are required for http checks" } }), + ), )); } @@ -270,7 +274,9 @@ async fn create_health_check( if count >= 5 { return Err(( StatusCode::BAD_REQUEST, - Json(json!({ "error": { "code": "limit_exceeded", "message": "Maximum 5 health checks per host" } })), + Json( + json!({ "error": { "code": "limit_exceeded", "message": "Maximum 5 health checks per host" } }), + ), )); } @@ -288,7 +294,9 @@ async fn create_health_check( tracing::error!(error = %e, "Failed to encrypt password"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": { "code": "internal_error", "message": "Encryption error" } })), + Json( + json!({ "error": { "code": "internal_error", "message": "Encryption error" } }), + ), ) })?; (Some(enc), Some(nonce)) @@ -342,7 +350,8 @@ async fn create_health_check( }), None, None, - ).await; + ) + .await; Ok(( StatusCode::CREATED, @@ -377,7 +386,9 @@ async fn get_health_check( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } @@ -429,10 +440,7 @@ async fn get_health_check( ) })?; - Ok(Json(HealthCheckWithResult { - check, - last_result, - })) + Ok(Json(HealthCheckWithResult { check, last_result })) } // ── PUT /api/v1/hosts/{host_id}/health-checks/{check_id} ────────────────────── @@ -457,7 +465,9 @@ async fn update_health_check( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } @@ -499,7 +509,9 @@ async fn update_health_check( tracing::error!(error = %e, "Failed to encrypt password"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": { "code": "internal_error", "message": "Encryption error" } })), + Json( + json!({ "error": { "code": "internal_error", "message": "Encryption error" } }), + ), ) })?; (Some(enc), Some(nonce)) @@ -549,7 +561,9 @@ async fn update_health_check( if set_clauses.is_empty() { return Err(( StatusCode::BAD_REQUEST, - Json(json!({ "error": { "code": "validation_error", "message": "No fields to update" } })), + Json( + json!({ "error": { "code": "validation_error", "message": "No fields to update" } }), + ), )); } @@ -613,7 +627,8 @@ async fn update_health_check( json!({ "check_id": check_id }), None, None, - ).await; + ) + .await; Ok(Json(json!({ "id": check_id, "updated": true }))) } @@ -639,25 +654,25 @@ async fn delete_health_check( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } - let deleted = sqlx::query( - "DELETE FROM host_health_checks WHERE id = $1 AND host_id = $2", - ) - .bind(check_id) - .bind(host_id) - .execute(&state.db) - .await - .map_err(|e| { - tracing::error!(error = %e, "Failed to delete health check"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), - ) - })?; + let deleted = sqlx::query("DELETE FROM host_health_checks WHERE id = $1 AND host_id = $2") + .bind(check_id) + .bind(host_id) + .execute(&state.db) + .await + .map_err(|e| { + tracing::error!(error = %e, "Failed to delete health check"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": { "code": "internal_error", "message": "Database error" } })), + ) + })?; if deleted.rows_affected() == 0 { return Err(( @@ -677,7 +692,8 @@ async fn delete_health_check( json!({ "check_id": check_id }), None, None, - ).await; + ) + .await; Ok(StatusCode::NO_CONTENT) } @@ -703,7 +719,9 @@ async fn test_health_check( if !can_access { return Err(( StatusCode::FORBIDDEN, - Json(json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } })), + Json( + json!({ "error": { "code": "forbidden", "message": "Not authorized for this host" } }), + ), )); } } @@ -794,7 +812,7 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult detail: "No service_name configured".to_string(), latency_ms: None, } - } + }, }; // Get host info for agent connection @@ -815,11 +833,14 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult detail: "Host not found".to_string(), latency_ms: None, } - } + }, }; // Build agent URL - let agent_url = format!("https://{}:12443/api/v1/system/services/{}", ip, service_name); + let agent_url = format!( + "https://{}:12443/api/v1/system/services/{}", + ip, service_name + ); let start = std::time::Instant::now(); @@ -832,10 +853,15 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult detail: format!("Failed to build HTTP client: {}", e), latency_ms: None, } - } + }, }; - match client.get(&agent_url).timeout(std::time::Duration::from_secs(10)).send().await { + match client + .get(&agent_url) + .timeout(std::time::Duration::from_secs(10)) + .send() + .await + { Ok(resp) => { let latency = start.elapsed().as_millis() as i32; let status = resp.status(); @@ -847,11 +873,14 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult // Try to parse as ApiResponse if let Ok(api_resp) = serde_json::from_str::(&body) { if let Some(data) = api_resp.get("data") { - if let Some(healthy) = data.get("healthy").and_then(|v| v.as_bool()) { - let active_state = data.get("active_state") + if let Some(healthy) = data.get("healthy").and_then(|v| v.as_bool()) + { + let active_state = data + .get("active_state") .and_then(|v| v.as_str()) .unwrap_or("unknown"); - let sub_state = data.get("sub_state") + let sub_state = data + .get("sub_state") .and_then(|v| v.as_str()) .unwrap_or(""); return CheckResult { @@ -867,7 +896,7 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult detail: format!("Failed to parse agent response"), latency_ms: Some(latency), } - } + }, Err(e) => CheckResult { healthy: false, detail: format!("Failed to read response: {}", e), @@ -881,7 +910,7 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult latency_ms: Some(latency), } } - } + }, Err(e) => { let latency = start.elapsed().as_millis() as i32; if e.is_timeout() { @@ -897,7 +926,7 @@ async fn run_service_check(check: &HealthCheck, state: &AppState) -> CheckResult latency_ms: Some(latency), } } - } + }, } } @@ -910,7 +939,7 @@ async fn run_http_check(check: &HealthCheck, state: &AppState) -> CheckResult { detail: "No URL configured".to_string(), latency_ms: None, } - } + }, }; let expected = match &check.expected_body { @@ -921,7 +950,7 @@ async fn run_http_check(check: &HealthCheck, state: &AppState) -> CheckResult { detail: "No expected_body configured".to_string(), latency_ms: None, } - } + }, }; // Build HTTP client @@ -983,14 +1012,14 @@ async fn run_http_check(check: &HealthCheck, state: &AppState) -> CheckResult { }, latency_ms: Some(latency), } - } + }, Err(e) => CheckResult { healthy: false, detail: format!("Failed to read response: {}", e), latency_ms: Some(latency), }, } - } + }, Err(e) => { let latency = start.elapsed().as_millis() as i32; if e.is_timeout() { @@ -1006,7 +1035,7 @@ async fn run_http_check(check: &HealthCheck, state: &AppState) -> CheckResult { latency_ms: Some(latency), } } - } + }, } } @@ -1030,20 +1059,32 @@ fn build_agent_http_client(state: &AppState) -> Result ); // Add CA cert (mandatory since we disabled built-in root certs) - let ca_pem = std::fs::read(ca_cert_path).map_err(|e| format!("Read CA cert {}: {}", ca_cert_path, e))?; + let ca_pem = + std::fs::read(ca_cert_path).map_err(|e| format!("Read CA cert {}: {}", ca_cert_path, e))?; tracing::info!(ca_pem_len = ca_pem.len(), "CA cert read"); - let ca = reqwest::Certificate::from_pem(&ca_pem).map_err(|e| format!("Parse CA cert: {}", e))?; + let ca = + reqwest::Certificate::from_pem(&ca_pem).map_err(|e| format!("Parse CA cert: {}", e))?; builder = builder.add_root_certificate(ca); // Add client cert + key for mTLS let client_cert_exists = std::path::Path::new(client_cert_path).exists(); let client_key_exists = std::path::Path::new(client_key_path).exists(); - tracing::info!(client_cert_exists, client_key_exists, "Checking client cert files"); + tracing::info!( + client_cert_exists, + client_key_exists, + "Checking client cert files" + ); if client_cert_exists && client_key_exists { - let client_pem = std::fs::read(client_cert_path).map_err(|e| format!("Read client cert: {}", e))?; - let key_pem = std::fs::read(client_key_path).map_err(|e| format!("Read client key: {}", e))?; - tracing::info!(cert_len = client_pem.len(), key_len = key_pem.len(), "Client cert/key read"); + let client_pem = + std::fs::read(client_cert_path).map_err(|e| format!("Read client cert: {}", e))?; + let key_pem = + std::fs::read(client_key_path).map_err(|e| format!("Read client key: {}", e))?; + tracing::info!( + cert_len = client_pem.len(), + key_len = key_pem.len(), + "Client cert/key read" + ); let mut combined = Vec::new(); combined.extend_from_slice(&client_pem); combined.extend_from_slice(&key_pem); @@ -1062,6 +1103,6 @@ fn build_agent_http_client(state: &AppState) -> Result Err(e) => { tracing::error!(error = %e, "Failed to build reqwest client"); Err(format!("Build client: {}", e)) - } + }, } } diff --git a/crates/pm-web/src/routes/hosts.rs b/crates/pm-web/src/routes/hosts.rs index 513388e..226c090 100644 --- a/crates/pm-web/src/routes/hosts.rs +++ b/crates/pm-web/src/routes/hosts.rs @@ -31,7 +31,10 @@ pub fn router() -> Router { Router::new() .route("/", get(list_hosts).post(register_host)) .route("/{id}", get(get_host).delete(remove_host)) - .route("/{id}/groups", get(list_host_groups).post(add_host_to_group)) + .route( + "/{id}/groups", + get(list_host_groups).post(add_host_to_group), + ) .route("/{id}/groups/{group_id}", delete(remove_host_from_group)) .route("/{id}/refresh", post(refresh_host)) } diff --git a/crates/pm-web/src/routes/mod.rs b/crates/pm-web/src/routes/mod.rs index cc2213c..07c1f50 100644 --- a/crates/pm-web/src/routes/mod.rs +++ b/crates/pm-web/src/routes/mod.rs @@ -4,6 +4,7 @@ pub mod azure_sso; pub mod ca; pub mod discovery; pub mod groups; +pub mod health_checks; pub mod hosts; pub mod jobs; pub mod maintenance_windows; @@ -11,6 +12,5 @@ pub mod settings; pub mod status; pub mod users; pub mod ws; -pub mod health_checks; pub mod reports; diff --git a/crates/pm-worker/src/health_check_poller.rs b/crates/pm-worker/src/health_check_poller.rs index 093b0e7..361b6c3 100644 --- a/crates/pm-worker/src/health_check_poller.rs +++ b/crates/pm-worker/src/health_check_poller.rs @@ -263,34 +263,32 @@ async fn run_service_check( let detail = if data.healthy { format!( "Service '{}' is {}/{} (enabled: {})", - data.name, - data.active_state, - data.sub_state, - data.enabled_state + data.name, data.active_state, data.sub_state, data.enabled_state ) } else { format!( "Service '{}' status: {}/{} (unhealthy, enabled: {})", - data.name, data.active_state, - data.sub_state, - data.enabled_state + data.name, data.active_state, data.sub_state, data.enabled_state ) }; (data.healthy, detail) }, - Err(AgentClientError::Timeout) => { - (false, format!("Agent timed out querying service '{service_name}'")) - }, - Err(AgentClientError::Connect(_)) => { - (false, format!("Agent connection refused for service '{service_name}'")) - }, + Err(AgentClientError::Timeout) => ( + false, + format!("Agent timed out querying service '{service_name}'"), + ), + Err(AgentClientError::Connect(_)) => ( + false, + format!("Agent connection refused for service '{service_name}'"), + ), Err(AgentClientError::ApiError { code, message }) => { // 404, 400, 500 etc. from the agent means the service is unhealthy. (false, format!("Agent error [{code}]: {message}")) }, - Err(e) => { - (false, format!("Agent error querying service '{service_name}': {e}")) - }, + Err(e) => ( + false, + format!("Agent error querying service '{service_name}': {e}"), + ), } } @@ -300,10 +298,7 @@ async fn run_service_check( /// Execute an HTTP check by making a GET request to the configured URL. /// Supports optional basic auth (decrypted from DB) and substring body matching. -async fn run_http_check( - check: &HealthCheckRow, - crypto_key: &[u8; 32], -) -> (bool, String) { +async fn run_http_check(check: &HealthCheckRow, crypto_key: &[u8; 32]) -> (bool, String) { let url = match &check.url { Some(u) => u.clone(), None => { @@ -325,7 +320,9 @@ async fn run_http_check( .build() .unwrap_or_else(|_| reqwest::Client::new()) } else { - client_builder.build().unwrap_or_else(|_| reqwest::Client::new()) + client_builder + .build() + .unwrap_or_else(|_| reqwest::Client::new()) }; // Build the request. @@ -334,21 +331,22 @@ async fn run_http_check( // Add basic auth if configured. if let Some(user) = &check.basic_auth_user { // Decrypt the password if present. - let password = match (&check.basic_auth_pass_encrypted, &check.basic_auth_pass_nonce) { - (Some(enc), Some(nonce)) => { - match crypto::decrypt(enc, nonce, crypto_key) { - Ok(p) => p, - Err(e) => { - return ( - false, - format!("Failed to decrypt basic auth password: {e}"), - ); - }, - } + let password = match ( + &check.basic_auth_pass_encrypted, + &check.basic_auth_pass_nonce, + ) { + (Some(enc), Some(nonce)) => match crypto::decrypt(enc, nonce, crypto_key) { + Ok(p) => p, + Err(e) => { + return (false, format!("Failed to decrypt basic auth password: {e}")); + }, }, _ => { // No encrypted password stored — treat as missing credentials. - return (false, "HTTP check has basic_auth_user but no encrypted password".to_string()); + return ( + false, + "HTTP check has basic_auth_user but no encrypted password".to_string(), + ); }, }; request = request.basic_auth(user.as_str(), Some(password.as_str())); @@ -382,7 +380,10 @@ async fn run_http_check( let body = match response.text().await { Ok(b) => b, Err(e) => { - return (false, format!("HTTP check failed to read response body: {e}")); + return ( + false, + format!("HTTP check failed to read response body: {e}"), + ); }, }; @@ -391,14 +392,15 @@ async fn run_http_check( if !body.contains(expected) { return ( false, - format!( - "HTTP check body mismatch for {url}: expected substring not found" - ), + format!("HTTP check body mismatch for {url}: expected substring not found"), ); } } - (true, format!("HTTP check OK for {url} (status {})", status.as_u16())) + ( + true, + format!("HTTP check OK for {url} (status {})", status.as_u16()), + ) } // ───────────────────────────────────────────────────────────────────────────── diff --git a/crates/pm-worker/src/job_executor.rs b/crates/pm-worker/src/job_executor.rs index 47a77e8..2777503 100644 --- a/crates/pm-worker/src/job_executor.rs +++ b/crates/pm-worker/src/job_executor.rs @@ -870,7 +870,11 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) { let new_status: &str; let set_completed: bool; - if counts.running_count > 0 || counts.pending_count > 0 || counts.queued_count > 0 || counts.waiting_health_check_count > 0 { + if counts.running_count > 0 + || counts.pending_count > 0 + || counts.queued_count > 0 + || counts.waiting_health_check_count > 0 + { // Still work in flight — keep parent running. new_status = "running"; set_completed = false; @@ -1009,7 +1013,8 @@ pub async fn retry_pending_jobs(pool: PgPool, config: Arc) { WHERE pjh.status IN ('pending', 'waiting_health_check') AND pjh.retry_next_at <= NOW() AND j.status != 'cancelled' - "#,) + "#, + ) .fetch_all(&pool) .await { diff --git a/crates/pm-worker/src/ws_relay.rs b/crates/pm-worker/src/ws_relay.rs index 6fb8c9d..c45d7e8 100644 --- a/crates/pm-worker/src/ws_relay.rs +++ b/crates/pm-worker/src/ws_relay.rs @@ -19,9 +19,9 @@ use tokio::sync::Mutex; use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::protocol::Message, Connector}; use uuid::Uuid; +use pm_agent_client::client::AgentClient; use pm_agent_client::client::DEFAULT_AGENT_PORT; use pm_core::config::AppConfig; -use pm_agent_client::client::AgentClient; // ── Types ───────────────────────────────────────────────────────────────────── @@ -48,7 +48,7 @@ struct AgentWsEvent { /// Payload broadcast via `pg_notify('job_update', …)`. #[derive(Debug, Serialize)] struct NotifyPayload { - event_type: String, // "host" or "job" + event_type: String, // "host" or "job" job_id: String, host_id: String, status: String, @@ -254,7 +254,6 @@ async fn build_tls_config(config: &AppConfig) -> anyhow::Result Ok(config) } - // ── Per-job relay ───────────────────────────────────────────────────────────── async fn relay_one_job( @@ -681,7 +680,7 @@ async fn update_parent_job_status(pool: &PgPool, job_id: Uuid) { let payload = NotifyPayload { event_type: "job".to_string(), job_id: job_id.to_string(), - host_id: String::new(), // no specific host for job-level events + host_id: String::new(), // no specific host for job-level events status: final_status.to_string(), output: None, error_message: None,