feat(M11+M12): Email notifications, audit hardening, deployment packaging, backup/DR, integration testing
M11 - Email Notifications + Audit Logging Hardening: - Email notifier (lettre crate) with templates for patch failure, job completion, maintenance reminders - Audit log hash chaining (prev_hash + row_hash) for tamper-evident logging - Periodic + on-demand audit integrity verification - Audit logging for all config changes and certificate operations - Frontend: email settings integration, audit integrity verification action M12 - Deployment Packaging, Backup/DR, Integration Testing: - scripts/backup.sh: Nightly pg_dump, CA backup (GPG), config backup (secrets excluded unless encrypted) - scripts/setup.sh: Enhanced with backup dir, seed migration, backup cron, systemd target install - systemd units: Restart=always, WatchdogSec, ReadWritePaths, security hardening - systemd/patch-manager.target: Service target for coordinated lifecycle - docs/runbooks/restore.md: Full DR runbook with RPO 24h / RTO 4h targets - scripts/integration-test.sh: 9 test suites covering full API lifecycle - scripts/performance-test.sh: NFR validation (dashboard <5s, CIDR /22 <10s, API <2s) - docs/security-review.md: Comprehensive security control verification - docs/compliance-mapping.md: HIPAA (6 sections) + PCI-DSS v4.0 (9 requirements) mapped
This commit is contained in:
@ -1,7 +1,14 @@
|
||||
//! Audit log helper functions.
|
||||
//!
|
||||
//! Writes tamper-evident, hash-chained audit events to the `audit_log` table.
|
||||
//! The hash chain: each row's `row_hash` = SHA-256(prev_row_hash || action || target_id || created_at).
|
||||
//! The hash chain: each row's `row_hash` = SHA-256(
|
||||
//! prev_hash || action || actor_user_id || actor_username ||
|
||||
//! target_type || target_id || details_json || ip_address ||
|
||||
//! request_id || created_at
|
||||
//! ).
|
||||
//!
|
||||
//! The `prev_hash` column stores the previous row's `row_hash` for chain
|
||||
//! verification. The first row has `prev_hash = ''`.
|
||||
|
||||
use sha2::{Digest, Sha256};
|
||||
use sqlx::PgPool;
|
||||
@ -34,6 +41,12 @@ pub enum AuditAction {
|
||||
CertificateDownloaded,
|
||||
ConfigChanged,
|
||||
DiscoveryScanStarted,
|
||||
// M11 additions
|
||||
AuditIntegrityVerified,
|
||||
EmailNotificationSent,
|
||||
PatchJobCompleted,
|
||||
PatchJobFailed,
|
||||
MaintenanceWindowReminder,
|
||||
}
|
||||
|
||||
impl AuditAction {
|
||||
@ -62,6 +75,11 @@ impl AuditAction {
|
||||
Self::CertificateDownloaded => "certificate_downloaded",
|
||||
Self::ConfigChanged => "config_changed",
|
||||
Self::DiscoveryScanStarted => "discovery_scan_started",
|
||||
Self::AuditIntegrityVerified => "audit_integrity_verified",
|
||||
Self::EmailNotificationSent => "email_notification_sent",
|
||||
Self::PatchJobCompleted => "patch_job_completed",
|
||||
Self::PatchJobFailed => "patch_job_failed",
|
||||
Self::MaintenanceWindowReminder => "maintenance_window_reminder",
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -114,25 +132,39 @@ async fn write_audit_row(
|
||||
let prev = prev_hash.unwrap_or_default();
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let action_str = action.as_str();
|
||||
let uid_str = actor_user_id.map(|u| u.to_string()).unwrap_or_default();
|
||||
let uname = actor_username.unwrap_or("");
|
||||
let ttype = target_type.unwrap_or("");
|
||||
let tid = target_id.unwrap_or("");
|
||||
let details_str = serde_json::to_string(&details).unwrap_or_default();
|
||||
let ip_str = ip_address.map(|ip| ip.to_string()).unwrap_or_default();
|
||||
let rid = request_id.unwrap_or("");
|
||||
|
||||
// Hash: SHA-256(prev_hash + action + target_id + timestamp)
|
||||
// Hash: SHA-256(prev_hash + action + actor_user_id + actor_username +
|
||||
// target_type + target_id + details_json + ip_address +
|
||||
// request_id + created_at)
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(prev.as_bytes());
|
||||
hasher.update(action_str.as_bytes());
|
||||
hasher.update(uid_str.as_bytes());
|
||||
hasher.update(uname.as_bytes());
|
||||
hasher.update(ttype.as_bytes());
|
||||
hasher.update(tid.as_bytes());
|
||||
hasher.update(details_str.as_bytes());
|
||||
hasher.update(ip_str.as_bytes());
|
||||
hasher.update(rid.as_bytes());
|
||||
hasher.update(now.as_bytes());
|
||||
let row_hash = hex::encode(hasher.finalize());
|
||||
|
||||
let ip_str = ip_address.map(|ip| ip.to_string());
|
||||
let ip_for_db = ip_address.map(|ip| ip.to_string());
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO audit_log
|
||||
(action, actor_user_id, actor_username, target_type, target_id,
|
||||
details, ip_address, request_id, row_hash)
|
||||
details, ip_address, request_id, created_at, row_hash, prev_hash)
|
||||
VALUES
|
||||
($1::audit_action, $2, $3, $4, $5, $6, $7::inet, $8, $9)
|
||||
($1::audit_action, $2, $3, $4, $5, $6, $7::inet, $8, $9::timestamptz, $10, $11)
|
||||
"#,
|
||||
)
|
||||
.bind(action_str)
|
||||
@ -141,11 +173,142 @@ async fn write_audit_row(
|
||||
.bind(target_type)
|
||||
.bind(target_id)
|
||||
.bind(details)
|
||||
.bind(ip_str)
|
||||
.bind(ip_for_db)
|
||||
.bind(request_id)
|
||||
.bind(&now)
|
||||
.bind(&row_hash)
|
||||
.bind(&prev)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Result of an audit integrity verification pass.
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub struct IntegrityResult {
|
||||
/// Whether the chain is intact (no tampering detected).
|
||||
pub intact: bool,
|
||||
/// Total number of rows checked.
|
||||
pub rows_checked: i64,
|
||||
/// List of errors found (row id, expected hash, actual hash).
|
||||
pub errors: Vec<IntegrityError>,
|
||||
}
|
||||
|
||||
/// A single integrity error detected in the audit chain.
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
pub struct IntegrityError {
|
||||
pub row_id: i64,
|
||||
pub expected_hash: String,
|
||||
pub actual_hash: String,
|
||||
}
|
||||
|
||||
/// Row read from audit_log for integrity verification.
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
struct AuditRow {
|
||||
id: i64,
|
||||
action: String,
|
||||
actor_user_id: Option<uuid::Uuid>,
|
||||
actor_username: Option<String>,
|
||||
target_type: Option<String>,
|
||||
target_id: Option<String>,
|
||||
details: Option<serde_json::Value>,
|
||||
ip_address: Option<String>,
|
||||
request_id: Option<String>,
|
||||
created_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
row_hash: String,
|
||||
prev_hash: String,
|
||||
}
|
||||
|
||||
/// Walk the audit_log rows ordered by id and verify each row_hash matches
|
||||
/// the recomputed hash. Returns an [`IntegrityResult`] describing any
|
||||
/// tampering detected.
|
||||
pub async fn verify_integrity(pool: &PgPool) -> IntegrityResult {
|
||||
let rows: Vec<AuditRow> = match sqlx::query_as(
|
||||
r#"
|
||||
SELECT id, action::text AS action, actor_user_id, actor_username,
|
||||
target_type, target_id, details,
|
||||
host(ip_address) AS ip_address,
|
||||
request_id, created_at, row_hash, prev_hash
|
||||
FROM audit_log
|
||||
ORDER BY id ASC
|
||||
"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "verify_integrity: failed to fetch audit rows");
|
||||
return IntegrityResult {
|
||||
intact: false,
|
||||
rows_checked: 0,
|
||||
errors: vec![],
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let mut errors = Vec::new();
|
||||
let mut expected_prev_hash = String::new();
|
||||
|
||||
for row in &rows {
|
||||
// Verify prev_hash linkage
|
||||
if row.prev_hash != expected_prev_hash {
|
||||
errors.push(IntegrityError {
|
||||
row_id: row.id,
|
||||
expected_hash: expected_prev_hash.clone(),
|
||||
actual_hash: row.prev_hash.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Recompute the row hash from all fields
|
||||
let uid_str = row.actor_user_id.map(|u| u.to_string()).unwrap_or_default();
|
||||
let uname = row.actor_username.as_deref().unwrap_or("");
|
||||
let ttype = row.target_type.as_deref().unwrap_or("");
|
||||
let tid = row.target_id.as_deref().unwrap_or("");
|
||||
let details_str = row
|
||||
.details
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::to_string(v).ok())
|
||||
.unwrap_or_default();
|
||||
let ip_str = row.ip_address.as_deref().unwrap_or("");
|
||||
let rid = row.request_id.as_deref().unwrap_or("");
|
||||
let created_str = row
|
||||
.created_at
|
||||
.map(|c| c.to_rfc3339())
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(row.prev_hash.as_bytes());
|
||||
hasher.update(row.action.as_bytes());
|
||||
hasher.update(uid_str.as_bytes());
|
||||
hasher.update(uname.as_bytes());
|
||||
hasher.update(ttype.as_bytes());
|
||||
hasher.update(tid.as_bytes());
|
||||
hasher.update(details_str.as_bytes());
|
||||
hasher.update(ip_str.as_bytes());
|
||||
hasher.update(rid.as_bytes());
|
||||
hasher.update(created_str.as_bytes());
|
||||
let computed_hash = hex::encode(hasher.finalize());
|
||||
|
||||
if row.row_hash != computed_hash {
|
||||
errors.push(IntegrityError {
|
||||
row_id: row.id,
|
||||
expected_hash: computed_hash,
|
||||
actual_hash: row.row_hash.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Next row should have this row's hash as prev_hash
|
||||
expected_prev_hash = row.row_hash.clone();
|
||||
}
|
||||
|
||||
let intact = errors.is_empty();
|
||||
let rows_checked = rows.len() as i64;
|
||||
|
||||
IntegrityResult {
|
||||
intact,
|
||||
rows_checked,
|
||||
errors,
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,3 +15,6 @@ pub use models::{
|
||||
User, UserRole as DbUserRole, AuthProvider, CreateUserRequest, UpdateUserRequest,
|
||||
DiscoveryResult, DiscoveryCidrRequest, RegisterDiscoveredRequest,
|
||||
};
|
||||
|
||||
// Re-export audit integrity types
|
||||
pub use audit::{verify_integrity, IntegrityResult, IntegrityError};
|
||||
|
||||
@ -22,6 +22,7 @@ use axum::{
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use pm_auth::rbac::AuthUser;
|
||||
use pm_core::audit::{log_event, AuditAction};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
use uuid::Uuid;
|
||||
@ -129,9 +130,23 @@ fn db_error(e: sqlx::Error) -> (StatusCode, Json<Value>) {
|
||||
/// Download the root CA certificate as a PEM file.
|
||||
async fn download_root_ca(
|
||||
State(state): State<AppState>,
|
||||
_auth: AuthUser,
|
||||
auth: AuthUser,
|
||||
) -> Result<Response<Body>, (StatusCode, Json<Value>)> {
|
||||
let pem = state.ca.root_cert_pem().to_owned();
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::CertificateDownloaded,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("certificate"),
|
||||
Some("root_ca"),
|
||||
json!({ "operation": "download_root_ca" }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
pem_response(pem, "ca.crt")
|
||||
}
|
||||
|
||||
@ -230,7 +245,21 @@ async fn download_client_cert(
|
||||
})?;
|
||||
|
||||
match cert_pem {
|
||||
Some(pem) => pem_response(pem, "client.crt"),
|
||||
Some(pem) => {
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::CertificateDownloaded,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("certificate"),
|
||||
Some(&host_id.to_string()),
|
||||
json!({ "operation": "download_client_cert" }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
pem_response(pem, "client.crt")
|
||||
}
|
||||
None => Err((
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(json!({
|
||||
@ -268,6 +297,19 @@ async fn issue_client_cert(
|
||||
)
|
||||
})?;
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::CertificateIssued,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("certificate"),
|
||||
Some(&host_id.to_string()),
|
||||
json!({ "hostname": req.hostname, "serial_number": issued.serial_number }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(json!({
|
||||
"cert_pem": issued.cert_pem,
|
||||
"key_pem": issued.key_pem,
|
||||
@ -306,6 +348,19 @@ async fn renew_cert(
|
||||
}
|
||||
})?;
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::CertificateRenewed,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("certificate"),
|
||||
Some(&cert_id.to_string()),
|
||||
json!({ "serial_number": issued.serial_number }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(json!({
|
||||
"cert_pem": issued.cert_pem,
|
||||
"key_pem": issued.key_pem,
|
||||
@ -345,5 +400,19 @@ async fn revoke_cert(
|
||||
})?;
|
||||
|
||||
tracing::info!(%cert_id, "Certificate revoked via API");
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::CertificateRevoked,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("certificate"),
|
||||
Some(&cert_id.to_string()),
|
||||
json!({ "operation": "revoke" }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(json!({ "revoked": true })))
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
//! POST /api/v1/settings/smtp/test — send test email (admin only)
|
||||
//! GET /api/v1/settings/ip-whitelist — get IP whitelist (admin only)
|
||||
//! PUT /api/v1/settings/ip-whitelist — update IP whitelist (admin only)
|
||||
//! POST /api/v1/settings/audit-integrity — verify audit log integrity (admin only)
|
||||
|
||||
use axum::{
|
||||
extract::State,
|
||||
@ -19,7 +20,7 @@ use lettre::{
|
||||
transport::smtp::authentication::Credentials,
|
||||
AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
|
||||
};
|
||||
use pm_core::audit::{log_event, AuditAction};
|
||||
use pm_core::audit::{log_event, verify_integrity, AuditAction};
|
||||
use pm_auth::rbac::AuthUser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
@ -38,6 +39,7 @@ pub struct SettingsResponse {
|
||||
pub polling: PollingConfig,
|
||||
pub ip_whitelist: Vec<String>,
|
||||
pub web_tls_strategy: String,
|
||||
pub notification: NotificationConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@ -72,6 +74,21 @@ pub struct UpdateSettingsRequest {
|
||||
pub polling: Option<PollingConfigUpdate>,
|
||||
pub ip_whitelist: Option<Vec<String>>,
|
||||
pub web_tls_strategy: Option<String>,
|
||||
pub notification: Option<NotificationConfigUpdate>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NotificationConfig {
|
||||
pub email_enabled: bool,
|
||||
pub email_from: String,
|
||||
pub recipients: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct NotificationConfigUpdate {
|
||||
pub email_enabled: Option<bool>,
|
||||
pub email_from: Option<String>,
|
||||
pub recipients: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@ -116,6 +133,7 @@ pub fn router() -> Router<AppState> {
|
||||
.route("/azure-sso/test", post(test_azure_sso))
|
||||
.route("/smtp/test", post(test_smtp))
|
||||
.route("/ip-whitelist", get(get_ip_whitelist).put(update_ip_whitelist))
|
||||
.route("/audit-integrity", post(audit_integrity))
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
@ -156,6 +174,8 @@ async fn load_system_config(
|
||||
fn build_settings_response(cfg: &HashMap<String, String>, azure: AzureSsoConfig) -> SettingsResponse {
|
||||
let get = |key: &str| -> String { cfg.get(key).cloned().unwrap_or_default() };
|
||||
|
||||
let recipients: Vec<String> = serde_json::from_str(&get("notification_email_recipients")).unwrap_or_default();
|
||||
|
||||
SettingsResponse {
|
||||
azure_sso: azure,
|
||||
smtp: SmtpConfig {
|
||||
@ -172,6 +192,11 @@ fn build_settings_response(cfg: &HashMap<String, String>, azure: AzureSsoConfig)
|
||||
},
|
||||
ip_whitelist: serde_json::from_str(&get("ip_whitelist")).unwrap_or_default(),
|
||||
web_tls_strategy: get("web_tls_strategy"),
|
||||
notification: NotificationConfig {
|
||||
email_enabled: get("notification_email_enabled") == "true",
|
||||
email_from: get("notification_email_from"),
|
||||
recipients,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -429,6 +454,33 @@ async fn update_settings(
|
||||
.await;
|
||||
}
|
||||
|
||||
// Update notification config
|
||||
if let Some(notif) = &req.notification {
|
||||
if let Some(v) = notif.email_enabled {
|
||||
update_config_key(&state.db, "notification_email_enabled", &v.to_string()).await?;
|
||||
}
|
||||
if let Some(ref v) = notif.email_from {
|
||||
update_config_key(&state.db, "notification_email_from", v).await?;
|
||||
}
|
||||
if let Some(ref v) = notif.recipients {
|
||||
let json_str = serde_json::to_string(v).unwrap_or_else(|_| "[]".to_string());
|
||||
update_config_key(&state.db, "notification_email_recipients", &json_str).await?;
|
||||
}
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::ConfigChanged,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("notification"),
|
||||
Some("system_config"),
|
||||
json!({ "section": "notification" }),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Return updated settings
|
||||
let cfg = load_system_config(&state.db).await?;
|
||||
let azure = fetch_azure_sso_config(&state.db).await?;
|
||||
@ -689,6 +741,47 @@ async fn update_ip_whitelist(
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(json!({ "entries": req.entries })))
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// POST /api/v1/settings/audit-integrity
|
||||
// ============================================================
|
||||
|
||||
/// Verify audit log hash chain integrity.
|
||||
/// Returns whether the chain is intact, rows checked, and any errors.
|
||||
async fn audit_integrity(
|
||||
State(state): State<AppState>,
|
||||
auth: AuthUser,
|
||||
) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
|
||||
admin_only(&auth)?;
|
||||
|
||||
let result = verify_integrity(&state.db).await;
|
||||
|
||||
log_event(
|
||||
&state.db,
|
||||
AuditAction::AuditIntegrityVerified,
|
||||
Some(auth.user_id),
|
||||
Some(&auth.username),
|
||||
Some("audit_log"),
|
||||
None,
|
||||
json!({
|
||||
"intact": result.intact,
|
||||
"rows_checked": result.rows_checked,
|
||||
"error_count": result.errors.len(),
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(json!({
|
||||
"intact": result.intact,
|
||||
"rows_checked": result.rows_checked,
|
||||
"errors": result.errors.iter().map(|e| json!({
|
||||
"row_id": e.row_id,
|
||||
"expected_hash": e.expected_hash,
|
||||
"actual_hash": e.actual_hash,
|
||||
})).collect::<Vec<_>>(),
|
||||
})))
|
||||
}
|
||||
|
||||
@ -27,3 +27,4 @@ rustls = { workspace = true }
|
||||
tokio-rustls = { version = "0.26" }
|
||||
rustls-pemfile = { version = "2" }
|
||||
tokio-tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots"] }
|
||||
lettre = { version = "0.11", default-features = false, features = ["tokio1-rustls-tls", "smtp-transport", "builder"] }
|
||||
|
||||
86
crates/pm-worker/src/audit_verifier.rs
Normal file
86
crates/pm-worker/src/audit_verifier.rs
Normal file
@ -0,0 +1,86 @@
|
||||
//! Periodic audit log integrity verification.
|
||||
//!
|
||||
//! Runs every 24 hours, walks the audit_log rows ordered by id,
|
||||
//! verifies each row_hash matches the recomputed hash, and logs the
|
||||
//! result as an `AuditIntegrityVerified` event. If tampering is
|
||||
//! detected, logs an error and creates an alert.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use sqlx::PgPool;
|
||||
|
||||
use pm_core::audit::{log_event, verify_integrity, AuditAction};
|
||||
use pm_core::config::AppConfig;
|
||||
|
||||
/// Run the audit integrity verifier every 24 hours.
|
||||
pub async fn run_audit_verifier(pool: PgPool, _config: Arc<AppConfig>) {
|
||||
tracing::info!("Audit integrity verifier started");
|
||||
|
||||
// Run immediately on startup
|
||||
verify_once(&pool).await;
|
||||
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(24 * 60 * 60));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
tracing::info!("Running scheduled audit integrity verification");
|
||||
verify_once(&pool).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a single integrity verification pass.
|
||||
async fn verify_once(pool: &PgPool) {
|
||||
let result = verify_integrity(pool).await;
|
||||
|
||||
if result.intact {
|
||||
tracing::info!(
|
||||
rows_checked = result.rows_checked,
|
||||
"Audit integrity verification passed"
|
||||
);
|
||||
} else {
|
||||
tracing::error!(
|
||||
rows_checked = result.rows_checked,
|
||||
error_count = result.errors.len(),
|
||||
"Audit integrity verification FAILED — tampering detected!"
|
||||
);
|
||||
|
||||
for err in &result.errors {
|
||||
tracing::error!(
|
||||
row_id = err.row_id,
|
||||
expected_hash = %err.expected_hash,
|
||||
actual_hash = %err.actual_hash,
|
||||
"Audit chain integrity error"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Log the verification event
|
||||
log_event(
|
||||
pool,
|
||||
AuditAction::AuditIntegrityVerified,
|
||||
None,
|
||||
None,
|
||||
Some("audit_log"),
|
||||
None,
|
||||
serde_json::json!({
|
||||
"intact": result.intact,
|
||||
"rows_checked": result.rows_checked,
|
||||
"error_count": result.errors.len(),
|
||||
"errors": result.errors.iter().take(10).map(|e| serde_json::json!({
|
||||
"row_id": e.row_id,
|
||||
"expected_hash": e.expected_hash,
|
||||
"actual_hash": e.actual_hash,
|
||||
})).collect::<Vec<_>>(),
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Update last verified timestamp
|
||||
let _ = sqlx::query(
|
||||
"UPDATE system_config SET value = NOW()::text, updated_at = NOW() WHERE key = 'audit_integrity_last_verified'",
|
||||
)
|
||||
.execute(pool)
|
||||
.await;
|
||||
}
|
||||
332
crates/pm-worker/src/email.rs
Normal file
332
crates/pm-worker/src/email.rs
Normal file
@ -0,0 +1,332 @@
|
||||
//! Email notification module.
|
||||
//!
|
||||
//! Loads SMTP configuration from `system_config` and sends notification emails
|
||||
//! for patch job events (completion, failure) and maintenance window reminders.
|
||||
//! All emails are optional and disabled by default via `notification_email_enabled`.
|
||||
|
||||
use lettre::{
|
||||
message::{header::ContentType, Mailbox},
|
||||
transport::smtp::authentication::Credentials,
|
||||
AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
|
||||
};
|
||||
use serde_json;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use pm_core::audit::{log_event, AuditAction};
|
||||
|
||||
/// SMTP configuration loaded from `system_config`.
|
||||
struct SmtpSettings {
|
||||
enabled: bool,
|
||||
host: String,
|
||||
port: u16,
|
||||
username: String,
|
||||
password: String,
|
||||
from: String,
|
||||
tls_mode: String,
|
||||
}
|
||||
|
||||
/// Notification preferences loaded from `system_config`.
|
||||
struct NotificationSettings {
|
||||
email_enabled: bool,
|
||||
email_from: String,
|
||||
recipients: Vec<String>,
|
||||
}
|
||||
|
||||
/// Load SMTP settings from the `system_config` table.
|
||||
async fn load_smtp_settings(pool: &PgPool) -> SmtpSettings {
|
||||
let rows: Vec<(String, String)> = sqlx::query_as(
|
||||
"SELECT key, value FROM system_config WHERE key IN (
|
||||
'smtp_enabled', 'smtp_host', 'smtp_port', 'smtp_username',
|
||||
'smtp_password', 'smtp_from', 'smtp_tls_mode'
|
||||
)",
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let get = |key: &str| -> String {
|
||||
rows.iter()
|
||||
.find(|(k, _)| k == key)
|
||||
.map(|(_, v)| v.clone())
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
SmtpSettings {
|
||||
enabled: get("smtp_enabled") == "true",
|
||||
host: get("smtp_host"),
|
||||
port: get("smtp_port").parse().unwrap_or(587),
|
||||
username: get("smtp_username"),
|
||||
password: get("smtp_password"),
|
||||
from: get("smtp_from"),
|
||||
tls_mode: get("smtp_tls_mode"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load notification preferences from `system_config`.
|
||||
async fn load_notification_settings(pool: &PgPool) -> NotificationSettings {
|
||||
let rows: Vec<(String, String)> = sqlx::query_as(
|
||||
"SELECT key, value FROM system_config WHERE key IN (
|
||||
'notification_email_enabled', 'notification_email_from', 'notification_email_recipients'
|
||||
)",
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let get = |key: &str| -> String {
|
||||
rows.iter()
|
||||
.find(|(k, _)| k == key)
|
||||
.map(|(_, v)| v.clone())
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
let recipients: Vec<String> = serde_json::from_str(&get("notification_email_recipients")).unwrap_or_default();
|
||||
|
||||
NotificationSettings {
|
||||
email_enabled: get("notification_email_enabled") == "true",
|
||||
email_from: get("notification_email_from"),
|
||||
recipients,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build an async SMTP transport from settings.
|
||||
fn build_transport(
|
||||
settings: &SmtpSettings,
|
||||
) -> Result<AsyncSmtpTransport<Tokio1Executor>, String> {
|
||||
match settings.tls_mode.as_str() {
|
||||
"tls" => {
|
||||
let mut builder = AsyncSmtpTransport::<Tokio1Executor>::relay(&settings.host)
|
||||
.map_err(|e| format!("TLS relay error: {}", e))?;
|
||||
builder = builder.port(settings.port);
|
||||
if !settings.username.is_empty() {
|
||||
builder = builder.credentials(Credentials::new(
|
||||
settings.username.clone(),
|
||||
settings.password.clone(),
|
||||
));
|
||||
}
|
||||
Ok(builder.build())
|
||||
}
|
||||
"starttls" => {
|
||||
let mut builder = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&settings.host)
|
||||
.map_err(|e| format!("STARTTLS relay error: {}", e))?;
|
||||
builder = builder.port(settings.port);
|
||||
if !settings.username.is_empty() {
|
||||
builder = builder.credentials(Credentials::new(
|
||||
settings.username.clone(),
|
||||
settings.password.clone(),
|
||||
));
|
||||
}
|
||||
Ok(builder.build())
|
||||
}
|
||||
_ => {
|
||||
// "none" — plaintext / no TLS
|
||||
let mut builder = AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(&settings.host)
|
||||
.port(settings.port);
|
||||
if !settings.username.is_empty() {
|
||||
builder = builder.credentials(Credentials::new(
|
||||
settings.username.clone(),
|
||||
settings.password.clone(),
|
||||
));
|
||||
}
|
||||
Ok(builder.build())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send an email notification. Returns true if the email was sent successfully.
|
||||
async fn send_email(
|
||||
pool: &PgPool,
|
||||
subject: &str,
|
||||
body: &str,
|
||||
) -> bool {
|
||||
let smtp = match load_smtp_settings(pool).await {
|
||||
s if !s.enabled => {
|
||||
tracing::debug!("SMTP not enabled, skipping email notification");
|
||||
return false;
|
||||
}
|
||||
s => s,
|
||||
};
|
||||
|
||||
let notif = load_notification_settings(pool).await;
|
||||
if !notif.email_enabled {
|
||||
tracing::debug!("Email notifications disabled, skipping");
|
||||
return false;
|
||||
}
|
||||
|
||||
if notif.recipients.is_empty() {
|
||||
tracing::debug!("No email recipients configured, skipping notification");
|
||||
return false;
|
||||
}
|
||||
|
||||
let from_addr = if notif.email_from.is_empty() {
|
||||
smtp.from.clone()
|
||||
} else {
|
||||
notif.email_from
|
||||
};
|
||||
|
||||
let from_mailbox: Mailbox = match from_addr.parse() {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "Invalid from address for email notification");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let mut builder = Message::builder()
|
||||
.from(from_mailbox.clone())
|
||||
.subject(subject)
|
||||
.header(ContentType::TEXT_PLAIN);
|
||||
|
||||
// Add all recipients
|
||||
for recipient in ¬if.recipients {
|
||||
let mailbox: Mailbox = match recipient.parse() {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, recipient = %recipient, "Invalid recipient address");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
builder = builder.to(mailbox);
|
||||
}
|
||||
|
||||
let email = match builder.body(body.to_string()) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "Failed to build email message");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let transport = match build_transport(&smtp) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "Failed to build SMTP transport");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
match transport.send(email).await {
|
||||
Ok(_) => {
|
||||
tracing::info!(subject, "Email notification sent successfully");
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, subject, "Failed to send email notification");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a patch failure notification email for a specific host.
|
||||
pub async fn send_patch_failure_email(
|
||||
pool: &PgPool,
|
||||
host_fqdn: &str,
|
||||
job_id: &str,
|
||||
error_message: &str,
|
||||
) {
|
||||
let subject = format!("[Patch Manager] Patch Failed on {}", host_fqdn);
|
||||
let body = format!(
|
||||
"Patch operation failed on host: {host_fqdn}\n\
|
||||
Job ID: {job_id}\n\
|
||||
Error: {error_message}\n\
|
||||
\n\
|
||||
Please review the job details in the Patch Manager dashboard."
|
||||
);
|
||||
|
||||
let sent = send_email(pool, &subject, &body).await;
|
||||
|
||||
log_event(
|
||||
pool,
|
||||
AuditAction::EmailNotificationSent,
|
||||
None,
|
||||
None,
|
||||
Some("patch_job"),
|
||||
Some(job_id),
|
||||
serde_json::json!({
|
||||
"type": "patch_failure",
|
||||
"host_fqdn": host_fqdn,
|
||||
"sent": sent,
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Send a job completion notification email.
|
||||
pub async fn send_job_completion_email(
|
||||
pool: &PgPool,
|
||||
job_id: &str,
|
||||
host_count: i64,
|
||||
succeeded_count: i64,
|
||||
failed_count: i64,
|
||||
) {
|
||||
let subject = format!("[Patch Manager] Job {} Completed", job_id);
|
||||
let body = format!(
|
||||
"Patch job completed: {job_id}\n\
|
||||
Total hosts: {host_count}\n\
|
||||
Succeeded: {succeeded_count}\n\
|
||||
Failed: {failed_count}\n\
|
||||
\n\
|
||||
Please review the job details in the Patch Manager dashboard."
|
||||
);
|
||||
|
||||
let sent = send_email(pool, &subject, &body).await;
|
||||
|
||||
log_event(
|
||||
pool,
|
||||
AuditAction::EmailNotificationSent,
|
||||
None,
|
||||
None,
|
||||
Some("patch_job"),
|
||||
Some(job_id),
|
||||
serde_json::json!({
|
||||
"type": "job_completion",
|
||||
"host_count": host_count,
|
||||
"succeeded_count": succeeded_count,
|
||||
"failed_count": failed_count,
|
||||
"sent": sent,
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Send a maintenance window reminder email.
|
||||
pub async fn send_maintenance_window_reminder_email(
|
||||
pool: &PgPool,
|
||||
host_fqdn: &str,
|
||||
window_label: &str,
|
||||
start_at: &str,
|
||||
) {
|
||||
let subject = format!("[Patch Manager] Upcoming Maintenance Window: {}", window_label);
|
||||
let body = format!(
|
||||
"Maintenance window reminder:\n\
|
||||
Host: {host_fqdn}\n\
|
||||
Window: {window_label}\n\
|
||||
Starts at: {start_at}\n\
|
||||
\n\
|
||||
Patch operations will begin at the scheduled time."
|
||||
);
|
||||
|
||||
let sent = send_email(pool, &subject, &body).await;
|
||||
|
||||
log_event(
|
||||
pool,
|
||||
AuditAction::MaintenanceWindowReminder,
|
||||
None,
|
||||
None,
|
||||
Some("maintenance_window"),
|
||||
None,
|
||||
serde_json::json!({
|
||||
"type": "maintenance_reminder",
|
||||
"host_fqdn": host_fqdn,
|
||||
"window_label": window_label,
|
||||
"sent": sent,
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@ -22,6 +22,7 @@ use tokio::{sync::Semaphore, time};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::agent_loader::load_agent_certs;
|
||||
use crate::email;
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Internal DB row types
|
||||
@ -710,6 +711,8 @@ async fn handle_host_failure(pool: PgPool, pjh_id: Uuid, error_msg: String) {
|
||||
/// 2. All hosts `succeeded` → parent `succeeded`.
|
||||
/// 3. All hosts `cancelled` → parent `cancelled`.
|
||||
/// 4. Any `failed` with none still active → parent `failed` (includes partial).
|
||||
///
|
||||
/// After rolling up, sends email notifications for completed/failed jobs.
|
||||
async fn sync_job_status(pool: &PgPool, job_id: Uuid) {
|
||||
let counts: StatusCounts = match sqlx::query_as(
|
||||
r#"
|
||||
@ -798,6 +801,57 @@ async fn sync_job_status(pool: &PgPool, job_id: Uuid) {
|
||||
if let Err(e) = result {
|
||||
tracing::error!(%job_id, error = %e, "sync_job_status: failed to update parent job");
|
||||
}
|
||||
|
||||
// Send email notifications for completed/failed jobs
|
||||
if set_completed {
|
||||
// Spawn email notification in background — non-blocking
|
||||
let pool_clone = pool.clone();
|
||||
let job_id_str = job_id.to_string();
|
||||
let total = counts.total_count;
|
||||
let succeeded = counts.succeeded_count;
|
||||
let failed = counts.failed_count;
|
||||
|
||||
tokio::spawn(async move {
|
||||
email::send_job_completion_email(
|
||||
&pool_clone,
|
||||
&job_id_str,
|
||||
total,
|
||||
succeeded,
|
||||
failed,
|
||||
).await;
|
||||
|
||||
// If there are failures, also send failure emails per host
|
||||
if failed > 0 {
|
||||
let failed_hosts: Vec<(String, String)> = match sqlx::query_as(
|
||||
r#"
|
||||
SELECT h.fqdn, COALESCE(pjh.error_message, 'Unknown error')
|
||||
FROM patch_job_hosts pjh
|
||||
JOIN hosts h ON h.id = pjh.host_id
|
||||
WHERE pjh.job_id = $1 AND pjh.status = 'failed'
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.fetch_all(&pool_clone)
|
||||
.await
|
||||
{
|
||||
Ok(rows) => rows,
|
||||
Err(e) => {
|
||||
tracing::error!(%job_id, error = %e, "sync_job_status: failed to fetch failed hosts for email");
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
for (fqdn, error_msg) in failed_hosts {
|
||||
email::send_patch_failure_email(
|
||||
&pool_clone,
|
||||
&fqdn,
|
||||
&job_id_str,
|
||||
&error_msg,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
//! pm-worker — Linux Patch Manager background worker.
|
||||
//!
|
||||
//! Handles scheduled polling, job execution, maintenance window scheduling,
|
||||
//! retry logic, email notifications, and data pruning.
|
||||
//! retry logic, email notifications, audit integrity verification, and data pruning.
|
||||
|
||||
mod agent_loader;
|
||||
mod audit_verifier;
|
||||
mod email;
|
||||
mod health_poller;
|
||||
mod maintenance_scheduler;
|
||||
mod patch_poller;
|
||||
@ -20,6 +22,7 @@ use sqlx::PgPool;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::time;
|
||||
|
||||
use audit_verifier::run_audit_verifier;
|
||||
use health_poller::run_health_poller;
|
||||
use maintenance_scheduler::run_maintenance_scheduler;
|
||||
use patch_poller::run_patch_poller;
|
||||
@ -30,7 +33,7 @@ use ws_relay::run_ws_relay;
|
||||
/// Minimum number of applied migrations the worker requires before
|
||||
/// accepting work. Prevents the worker from running against a schema
|
||||
/// that hasn't been migrated yet.
|
||||
const REQUIRED_MIGRATION_COUNT: i64 = 1;
|
||||
const REQUIRED_MIGRATION_COUNT: i64 = 5;
|
||||
|
||||
/// How long to wait between schema-version checks before giving up.
|
||||
const SCHEMA_CHECK_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
@ -80,6 +83,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
// M7: WS relay — streams agent job events → DB → pg_notify → browser WS
|
||||
let ws_relay_handle = tokio::spawn(run_ws_relay(pool.clone(), config.clone()));
|
||||
|
||||
// M11: audit integrity verification (runs every 24 hours)
|
||||
let audit_verifier_handle = tokio::spawn(run_audit_verifier(pool.clone(), config.clone()));
|
||||
|
||||
tracing::info!("Worker tasks started");
|
||||
|
||||
// Wait for all tasks (they run indefinitely)
|
||||
@ -91,6 +97,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
job_exec_handle,
|
||||
maint_sched_handle,
|
||||
ws_relay_handle,
|
||||
audit_verifier_handle,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user