Private
Public Access
1
0

feat(M1): Project scaffolding, DB schema, core infrastructure

- Initialize Rust workspace with 7 crates (pm-web, pm-worker, pm-core,
  pm-agent-client, pm-auth, pm-ca, pm-reports)
- React + TypeScript + Vite + MUI frontend scaffold
- Full PostgreSQL schema: all 17 tables with indexes and constraints
- pm-core: config (TOML+env), db (SQLx pool + migrations), error
  (unified AppError + JSON envelope), request_id (ULID middleware),
  logging (tracing JSON/pretty)
- pm-web: Axum skeleton, /status/health endpoint, static file serving
- pm-worker: Tokio skeleton, heartbeat writer, schema version check
- Embedded sqlx migrations with advisory lock (single-writer)
- systemd unit files, setup.sh, build-frontend.sh
- config.example.toml with all configuration keys
- docs/runbooks/restore.md
- cargo check passes with zero warnings

Closes M1.
This commit is contained in:
2026-04-23 15:55:53 +00:00
parent 3eb7fd9f95
commit da5a94d838
50 changed files with 6139 additions and 3 deletions

View File

@ -0,0 +1,19 @@
[package]
name = "pm-agent-client"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
reqwest = { workspace = true }
rustls = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }

View File

@ -0,0 +1,10 @@
//! Agent HTTP client stub.
//! Full mTLS Rustls-based implementation arrives in M4.
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AgentClientError {
#[error("Not yet implemented")]
NotImplemented,
}

View File

@ -0,0 +1,4 @@
//! pm-agent-client — mTLS HTTP client for Linux Patch API agent communication.
//!
//! M1: Stub. Full implementation in M4.
pub mod client;

19
crates/pm-auth/Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "pm-auth"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
axum = { workspace = true }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }

View File

@ -0,0 +1 @@
//! jwt — stub for M2.

10
crates/pm-auth/src/lib.rs Normal file
View File

@ -0,0 +1,10 @@
//! pm-auth — Authentication and authorization.
//!
//! Modules: password (Argon2id), jwt (EdDSA), refresh tokens,
//! mfa_totp, mfa_webauthn, rbac, session.
//!
//! M1: Stub. Full implementation in M2.
pub mod password;
pub mod jwt;
pub mod rbac;
pub mod session;

View File

@ -0,0 +1 @@
//! password — stub for M2.

View File

@ -0,0 +1 @@
//! rbac — stub for M2.

View File

@ -0,0 +1 @@
//! session — stub for M2.

16
crates/pm-ca/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "pm-ca"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }

1
crates/pm-ca/src/ca.rs Normal file
View File

@ -0,0 +1 @@
//! Internal CA stub for M8.

7
crates/pm-ca/src/lib.rs Normal file
View File

@ -0,0 +1,7 @@
//! pm-ca — Internal Certificate Authority.
//!
//! Issues and renews mTLS client certificates for agent communication.
//! Uses rcgen + rustls. CA key stored at /etc/patch-manager/ca/ca.key.
//!
//! M1: Stub. Full implementation in M8.
pub mod ca;

22
crates/pm-core/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "pm-core"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
tokio = { workspace = true }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
ulid = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
axum = { workspace = true }

View File

@ -0,0 +1,137 @@
use serde::{Deserialize, Serialize};
use config::{Config, ConfigError, Environment, File};
/// Top-level application configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AppConfig {
pub server: ServerConfig,
pub database: DatabaseConfig,
pub worker: WorkerConfig,
pub logging: LoggingConfig,
pub security: SecurityConfig,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServerConfig {
/// Bind address for the web server
pub host: String,
/// HTTPS port
pub port: u16,
/// Path to static frontend assets
pub static_dir: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatabaseConfig {
/// Full PostgreSQL connection URL
pub url: String,
/// Maximum pool connections
pub max_connections: u32,
/// Minimum pool connections
pub min_connections: u32,
/// Connection acquire timeout in seconds
pub acquire_timeout_secs: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WorkerConfig {
/// Health poll interval in seconds (default: 300 = 5 min)
pub health_poll_interval_secs: u64,
/// Patch data poll interval in seconds (default: 1800 = 30 min)
pub patch_poll_interval_secs: u64,
/// Maximum concurrent agent calls
pub max_concurrent_agent_calls: usize,
/// Worker heartbeat interval in seconds
pub heartbeat_interval_secs: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoggingConfig {
/// Log level filter: trace, debug, info, warn, error
pub level: String,
/// Output format: json or pretty
pub format: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SecurityConfig {
/// IP whitelist (CIDR or individual IPs); empty = allow all (not recommended)
pub ip_whitelist: Vec<String>,
/// JWT signing key path (Ed25519 PEM)
pub jwt_signing_key_path: String,
/// JWT verification key path (Ed25519 public PEM)
pub jwt_verify_key_path: String,
/// JWT access token TTL in seconds (default: 900 = 15 min)
pub jwt_access_ttl_secs: u64,
/// Agent mTLS client cert path
pub agent_client_cert_path: String,
/// Agent mTLS client key path
pub agent_client_key_path: String,
/// Internal CA cert path
pub ca_cert_path: String,
/// Internal CA key path
pub ca_key_path: String,
/// Web UI TLS cert path
pub web_tls_cert_path: String,
/// Web UI TLS key path
pub web_tls_key_path: String,
}
impl AppConfig {
/// Load configuration from a TOML file and environment variable overrides.
///
/// Environment variables follow the pattern: `PATCH_MANAGER__SECTION__KEY`
/// e.g. `PATCH_MANAGER__DATABASE__URL=postgres://...`
pub fn load(config_path: &str) -> Result<Self, ConfigError> {
let cfg = Config::builder()
.add_source(File::with_name(config_path).required(false))
.add_source(
Environment::with_prefix("PATCH_MANAGER")
.separator("__")
.try_parsing(true),
)
.build()?;
cfg.try_deserialize()
}
}
impl Default for AppConfig {
fn default() -> Self {
Self {
server: ServerConfig {
host: "0.0.0.0".to_string(),
port: 443,
static_dir: "/usr/share/patch-manager/frontend".to_string(),
},
database: DatabaseConfig {
url: "postgres://patch_manager:changeme@localhost/patch_manager".to_string(),
max_connections: 20,
min_connections: 2,
acquire_timeout_secs: 30,
},
worker: WorkerConfig {
health_poll_interval_secs: 300,
patch_poll_interval_secs: 1800,
max_concurrent_agent_calls: 64,
heartbeat_interval_secs: 30,
},
logging: LoggingConfig {
level: "info".to_string(),
format: "json".to_string(),
},
security: SecurityConfig {
ip_whitelist: vec![],
jwt_signing_key_path: "/etc/patch-manager/jwt/signing.pem".to_string(),
jwt_verify_key_path: "/etc/patch-manager/jwt/verify.pem".to_string(),
jwt_access_ttl_secs: 900,
agent_client_cert_path: "/etc/patch-manager/certs/client.crt".to_string(),
agent_client_key_path: "/etc/patch-manager/certs/client.key".to_string(),
ca_cert_path: "/etc/patch-manager/ca/ca.crt".to_string(),
ca_key_path: "/etc/patch-manager/ca/ca.key".to_string(),
web_tls_cert_path: "/etc/patch-manager/tls/web.crt".to_string(),
web_tls_key_path: "/etc/patch-manager/tls/web.key".to_string(),
},
}
}
}

69
crates/pm-core/src/db.rs Normal file
View File

@ -0,0 +1,69 @@
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration;
use crate::config::DatabaseConfig;
/// Initialize and return a PostgreSQL connection pool.
pub async fn init_pool(cfg: &DatabaseConfig) -> Result<PgPool, sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(cfg.max_connections)
.min_connections(cfg.min_connections)
.acquire_timeout(Duration::from_secs(cfg.acquire_timeout_secs))
.connect(&cfg.url)
.await?;
tracing::info!(
max_connections = cfg.max_connections,
"PostgreSQL connection pool initialized"
);
Ok(pool)
}
/// Run embedded SQLx migrations.
/// Uses a PostgreSQL advisory lock to ensure only one writer runs migrations.
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> {
tracing::info!("Acquiring advisory lock for migrations");
// Advisory lock key — consistent hash of the application name
const LOCK_KEY: i64 = 0x7061_7463_686d_6772; // "patchmgr" bytes
// Acquire advisory lock; blocks until granted
sqlx::query("SELECT pg_advisory_lock($1)")
.bind(LOCK_KEY)
.execute(pool)
.await
.map_err(|e| {
tracing::error!(error = %e, "Failed to acquire advisory lock");
e
})
.expect("Advisory lock must be acquired before running migrations");
tracing::info!("Running database migrations");
let result = sqlx::migrate!("../../migrations").run(pool).await;
// Always release the lock
sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(LOCK_KEY)
.execute(pool)
.await
.ok();
match &result {
Ok(_) => tracing::info!("Database migrations completed successfully"),
Err(e) => tracing::error!(error = %e, "Database migrations failed"),
}
result
}
/// Check that the database schema is at the expected version.
/// Used by the worker to wait until migrations have been applied.
pub async fn check_schema_version(pool: &PgPool) -> Result<i64, sqlx::Error> {
let row: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM _sqlx_migrations WHERE success = true",
)
.fetch_one(pool)
.await?;
Ok(row.0)
}

124
crates/pm-core/src/error.rs Normal file
View File

@ -0,0 +1,124 @@
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Unified application error type.
#[derive(Debug, Error)]
pub enum AppError {
#[error("Not found: {0}")]
NotFound(String),
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("Forbidden: {0}")]
Forbidden(String),
#[error("Bad request: {0}")]
BadRequest(String),
#[error("Conflict: {0}")]
Conflict(String),
#[error("Unprocessable entity: {0}")]
UnprocessableEntity(String),
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Internal error: {0}")]
Internal(#[from] anyhow::Error),
#[error("Configuration error: {0}")]
Config(String),
}
/// JSON error envelope returned to clients.
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub error: ErrorDetail,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorDetail {
/// Machine-readable error code (e.g. "not_found", "unauthorized")
pub code: String,
/// Human-readable message
pub message: String,
/// Request ID for correlation (set by middleware)
pub request_id: Option<String>,
/// Optional structured details
pub details: Option<serde_json::Value>,
}
impl ErrorResponse {
pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
Self {
error: ErrorDetail {
code: code.into(),
message: message.into(),
request_id: None,
details: None,
},
}
}
pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
self.error.request_id = Some(request_id.into());
self
}
pub fn with_details(mut self, details: serde_json::Value) -> Self {
self.error.details = Some(details);
self
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, code, message) = match &self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND, "not_found", msg.clone()),
AppError::Unauthorized(msg) => (StatusCode::UNAUTHORIZED, "unauthorized", msg.clone()),
AppError::Forbidden(msg) => (StatusCode::FORBIDDEN, "forbidden", msg.clone()),
AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, "bad_request", msg.clone()),
AppError::Conflict(msg) => (StatusCode::CONFLICT, "conflict", msg.clone()),
AppError::UnprocessableEntity(msg) => {
(StatusCode::UNPROCESSABLE_ENTITY, "unprocessable_entity", msg.clone())
}
AppError::Database(e) => {
tracing::error!(error = %e, "Database error");
(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_error",
"An internal error occurred".to_string(),
)
}
AppError::Internal(e) => {
tracing::error!(error = %e, "Internal error");
(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_error",
"An internal error occurred".to_string(),
)
}
AppError::Config(msg) => {
tracing::error!(error = %msg, "Configuration error");
(
StatusCode::INTERNAL_SERVER_ERROR,
"config_error",
"Server configuration error".to_string(),
)
}
};
let body = ErrorResponse::new(code, message);
(status, Json(body)).into_response()
}
}
/// Convenience alias for handler return types.
pub type ApiResult<T> = Result<T, AppError>;

View File

@ -0,0 +1,9 @@
pub mod config;
pub mod db;
pub mod error;
pub mod logging;
pub mod request_id;
// Re-export commonly used types
pub use error::{AppError, ErrorResponse};
pub use config::AppConfig;

View File

@ -0,0 +1,32 @@
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use crate::config::LoggingConfig;
/// Initialize the global tracing subscriber.
///
/// Format is controlled by `cfg.format`:
/// - `"json"` — machine-readable JSON (production default)
/// - anything else — human-readable pretty output (development)
///
/// Log level is controlled by `cfg.level` (e.g. `"info"`, `"debug"`).
/// The `RUST_LOG` environment variable overrides `cfg.level`.
pub fn init(cfg: &LoggingConfig) {
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(&cfg.level));
match cfg.format.as_str() {
"json" => {
tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().json().with_current_span(true))
.init();
}
_ => {
tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().pretty())
.init();
}
}
tracing::info!(format = %cfg.format, level = %cfg.level, "Logging initialized");
}

View File

@ -0,0 +1,44 @@
use axum::{
extract::Request,
http::HeaderValue,
middleware::Next,
response::Response,
};
use ulid::Ulid;
/// HTTP header name for request correlation IDs.
pub const REQUEST_ID_HEADER: &str = "x-request-id";
/// Axum middleware that generates a ULID request ID and attaches it
/// to both the request extensions and the response header.
pub async fn request_id_middleware(mut req: Request, next: Next) -> Response {
// Use existing X-Request-Id if provided by upstream proxy, else generate
let id = req
.headers()
.get(REQUEST_ID_HEADER)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| Ulid::new().to_string());
// Insert as extension so handlers can access it
req.extensions_mut().insert(RequestId(id.clone()));
let mut response = next.run(req).await;
// Echo the ID back in the response
if let Ok(value) = HeaderValue::from_str(&id) {
response.headers_mut().insert(REQUEST_ID_HEADER, value);
}
response
}
/// Extractor type for retrieving the request ID inside handlers.
#[derive(Debug, Clone)]
pub struct RequestId(pub String);
impl std::fmt::Display for RequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

View File

@ -0,0 +1,16 @@
[package]
name = "pm-reports"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }

View File

@ -0,0 +1 @@
//! csv report generation stub for M9.

View File

@ -0,0 +1,7 @@
//! pm-reports — CSV and PDF report generation.
//!
//! Uses printpdf + plotters for in-process PDF with charts.
//!
//! M1: Stub. Full implementation in M9.
pub mod csv;
pub mod pdf;

View File

@ -0,0 +1 @@
//! pdf report generation stub for M9.

27
crates/pm-web/Cargo.toml Normal file
View File

@ -0,0 +1,27 @@
[package]
name = "pm-web"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[[bin]]
name = "pm-web"
path = "src/main.rs"
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
axum = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
ulid = { workspace = true }
chrono = { workspace = true }

137
crates/pm-web/src/main.rs Normal file
View File

@ -0,0 +1,137 @@
//! pm-web — Linux Patch Manager web server.
//!
//! Serves the React SPA, exposes the REST API, and handles WebSocket relay.
use axum::{
extract::State,
http::StatusCode,
middleware,
response::Json,
routing::get,
Router,
};
use pm_core::{
config::AppConfig,
db,
logging,
request_id::request_id_middleware,
};
use serde_json::{json, Value};
use std::{net::SocketAddr, sync::Arc};
use tower_http::{
services::ServeDir,
trace::TraceLayer,
};
/// Shared application state threaded through Axum.
#[derive(Clone)]
pub struct AppState {
pub db: sqlx::PgPool,
pub config: Arc<AppConfig>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Load configuration
let config_path = std::env::var("PATCH_MANAGER_CONFIG")
.unwrap_or_else(|_| "/etc/patch-manager/config.toml".to_string());
let config = AppConfig::load(&config_path)
.unwrap_or_else(|_| {
eprintln!("Config file not found or invalid, using defaults");
AppConfig::default()
});
// Initialize logging
logging::init(&config.logging);
tracing::info!(version = env!("CARGO_PKG_VERSION"), "patch-manager-web starting");
// Initialize database pool
let pool = db::init_pool(&config.database).await?;
// Run migrations (advisory lock guards single-writer)
db::run_migrations(&pool).await?;
let state = AppState {
db: pool,
config: Arc::new(config.clone()),
};
// Build the application router
let app = build_router(state);
// Bind address
let addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port)
.parse()
.expect("Invalid bind address");
tracing::info!(%addr, "Listening");
// TODO M8: wrap with TLS (rustls). For M1 we bind plain HTTP for local dev.
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
/// Construct the full Axum router.
pub fn build_router(state: AppState) -> Router {
let static_dir = state.config.server.static_dir.clone();
Router::new()
// Health / status (unauthenticated)
.route("/status/health", get(health_handler))
// API v1 routes (stub — expanded in later milestones)
.nest("/api/v1", api_v1_router())
// Serve React SPA static files; fallback to index.html for client-side routing
.fallback_service(
ServeDir::new(&static_dir)
.append_index_html_on_directories(true),
)
// Middleware stack
.layer(middleware::from_fn(request_id_middleware))
.layer(TraceLayer::new_for_http())
.with_state(state)
}
/// API v1 sub-router — routes added per milestone.
fn api_v1_router() -> Router<AppState> {
Router::new()
// M2+: auth routes will be nested here
// M3+: host/group/user routes
// M4+: fleet status, agent polling
// M5+: jobs
// M6+: maintenance windows
// M7+: websocket relay
// M8+: certificates
// M9+: reports
// M10+: settings
}
/// GET /status/health — liveness probe.
///
/// Returns 200 OK with a JSON payload including service name, version,
/// and basic database reachability.
async fn health_handler(State(state): State<AppState>) -> Result<Json<Value>, StatusCode> {
// Quick DB ping
let db_ok = sqlx::query("SELECT 1")
.execute(&state.db)
.await
.is_ok();
let status = if db_ok { "healthy" } else { "degraded" };
let body = json!({
"service": "patch-manager-web",
"version": env!("CARGO_PKG_VERSION"),
"status": status,
"database": if db_ok { "ok" } else { "error" },
});
if db_ok {
Ok(Json(body))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}

View File

@ -0,0 +1,24 @@
[package]
name = "pm-worker"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[[bin]]
name = "pm-worker"
path = "src/main.rs"
[dependencies]
pm-core = { path = "../pm-core" }
tokio = { workspace = true }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }

View File

@ -0,0 +1,128 @@
//! pm-worker — Linux Patch Manager background worker.
//!
//! Handles scheduled polling, job execution, maintenance window scheduling,
//! retry logic, email notifications, and data pruning.
use pm_core::{
config::AppConfig,
db,
logging,
};
use sqlx::PgPool;
use std::{sync::Arc, time::Duration};
use tokio::time;
/// Minimum number of applied migrations the worker requires before
/// accepting work. Prevents the worker from running against a schema
/// that hasn't been migrated yet.
const REQUIRED_MIGRATION_COUNT: i64 = 1;
/// How long to wait between schema-version checks before giving up.
const SCHEMA_CHECK_TIMEOUT: Duration = Duration::from_secs(120);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Load configuration
let config_path = std::env::var("PATCH_MANAGER_CONFIG")
.unwrap_or_else(|_| "/etc/patch-manager/config.toml".to_string());
let config = AppConfig::load(&config_path)
.unwrap_or_else(|_| {
eprintln!("Config file not found or invalid, using defaults");
AppConfig::default()
});
// Initialize logging
logging::init(&config.logging);
tracing::info!(version = env!("CARGO_PKG_VERSION"), "patch-manager-worker starting");
// Initialize database pool
let pool = db::init_pool(&config.database).await?;
// Wait for schema to be at the expected version (web process runs migrations)
wait_for_schema(&pool).await?;
let config = Arc::new(config);
// Spawn worker tasks
let heartbeat_handle = tokio::spawn(run_heartbeat(
pool.clone(),
config.worker.heartbeat_interval_secs,
));
// TODO M4: spawn health_poller, patch_data_poller
// TODO M5: spawn job_executor
// TODO M6: spawn job_scheduler
tracing::info!("Worker tasks started");
// Wait for all tasks (they run indefinitely)
let _ = tokio::join!(heartbeat_handle);
Ok(())
}
/// Wait until the database schema has at least `REQUIRED_MIGRATION_COUNT`
/// successful migrations applied. Retries every 5 seconds up to
/// `SCHEMA_CHECK_TIMEOUT`.
async fn wait_for_schema(pool: &PgPool) -> anyhow::Result<()> {
let deadline = tokio::time::Instant::now() + SCHEMA_CHECK_TIMEOUT;
loop {
match db::check_schema_version(pool).await {
Ok(count) if count >= REQUIRED_MIGRATION_COUNT => {
tracing::info!(migration_count = count, "Schema version check passed");
return Ok(());
}
Ok(count) => {
tracing::warn!(
migration_count = count,
required = REQUIRED_MIGRATION_COUNT,
"Schema not ready, waiting..."
);
}
Err(e) => {
tracing::warn!(error = %e, "Schema version check failed, retrying...");
}
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!(
"Schema not ready after {}s — is the web process running migrations?",
SCHEMA_CHECK_TIMEOUT.as_secs()
);
}
time::sleep(Duration::from_secs(5)).await;
}
}
/// Writes a heartbeat row to `worker_heartbeat` every `interval_secs`.
/// The web process can query this to confirm the worker is alive.
async fn run_heartbeat(pool: PgPool, interval_secs: u64) {
let interval = Duration::from_secs(interval_secs);
let mut ticker = time::interval(interval);
loop {
ticker.tick().await;
let result = sqlx::query(
r#"
INSERT INTO worker_heartbeat (id, last_seen, worker_version)
VALUES (1, NOW(), $1)
ON CONFLICT (id) DO UPDATE
SET last_seen = EXCLUDED.last_seen,
worker_version = EXCLUDED.worker_version
"#,
)
.bind(env!("CARGO_PKG_VERSION"))
.execute(&pool)
.await;
match result {
Ok(_) => tracing::debug!("Worker heartbeat written"),
Err(e) => tracing::error!(error = %e, "Worker heartbeat failed"),
}
}
}