From df2f4c70c9c5a2ad11b86a79333c2b2dfefd68c1 Mon Sep 17 00:00:00 2001 From: Draco-Lunaris-Echo Date: Sat, 6 Jun 2026 15:39:49 -0500 Subject: [PATCH] feat: add rate limiting and job queue depth cap (closes #15) - Add custom RateLimitMiddleware using governor crate for per-IP rate limiting - Two-tier rate limiting: destructive (20 req/min, burst 10) and read (120 req/min, burst 30) - Health endpoints (/health, /api/v1/system/info) exempt from rate limiting - Add max_queue_depth to JobManager (default: 100, configurable via config.yaml) - Return 429 Too Many Requests with Retry-After header when queue is full - Add RateLimitConfig to config.yaml with all rate limit settings - Add 10 tests covering rate limiting, queue depth, and configuration defaults Co-authored-by: git-echo --- Cargo.lock | 103 ++++++++++ Cargo.toml | 7 + src/api/handlers/jobs.rs | 15 +- src/api/handlers/packages.rs | 42 ++++- src/api/handlers/patches.rs | 15 +- src/api/handlers/system.rs | 13 ++ src/api/mod.rs | 1 + src/api/rate_limit.rs | 209 +++++++++++++++++++++ src/api/routes.rs | 15 +- src/config/loader.rs | 48 +++++ src/config/mod.rs | 2 +- src/jobs/manager.rs | 26 ++- src/main.rs | 44 +++-- tests/unit/rate_limit_test.rs | 340 ++++++++++++++++++++++++++++++++++ 14 files changed, 849 insertions(+), 31 deletions(-) create mode 100644 src/api/rate_limit.rs create mode 100644 tests/unit/rate_limit_test.rs diff --git a/Cargo.lock b/Cargo.lock index 9f8bc3c..e5374b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "actix-governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0954b0f27aabd8f56bb03f2a77b412ddf3f8c034a3c27b2086c1fc75415760df" +dependencies = [ + "actix-http", + "actix-web", + "futures", + "governor", +] + [[package]] name = "actix-http" version = "3.12.1" @@ -968,6 +980,19 @@ dependencies = [ "memchr", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.11.0" @@ -1314,6 +1339,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" +[[package]] +name = "futures-timer" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af43fadb8a98512d547e37b4e92e0ced13e205c061b87b4623eff01d918d6968" + [[package]] name = "futures-util" version = "0.3.32" @@ -1382,6 +1413,26 @@ dependencies = [ "wasip3", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.8.6", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.3.27" @@ -1934,6 +1985,7 @@ name = "linux-patch-api" version = "1.3.2" dependencies = [ "actix", + "actix-governor", "actix-rt", "actix-tls", "actix-web", @@ -2104,6 +2156,12 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -2114,6 +2172,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "6.1.1" @@ -2383,6 +2447,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" @@ -2441,6 +2511,21 @@ version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quinn" version = "0.11.9" @@ -2593,6 +2678,15 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.11.1", +] + [[package]] name = "rayon" version = "1.12.0" @@ -3085,6 +3179,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 35be20e..b48d90f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ actix-web-actors = "4" actix = "0.13" actix-tls = { version = "3", features = ["rustls-0_23"] } +# Rate limiting (actix-governor for per-IP rate limiting) +actix-governor = "0.6" + # Async runtime tokio = { version = "1", features = ["full"] } @@ -118,6 +121,10 @@ path = "tests/e2e/test_enrollment_e2e.rs" name = "auth_test" path = "tests/integration/auth_test.rs" +[[test]] +name = "rate_limit_test" +path = "tests/unit/rate_limit_test.rs" + [[bench]] name = "api_benchmarks" harness = false diff --git a/src/api/handlers/jobs.rs b/src/api/handlers/jobs.rs index 907d7e1..71d3bc6 100644 --- a/src/api/handlers/jobs.rs +++ b/src/api/handlers/jobs.rs @@ -190,6 +190,19 @@ pub async fn rollback_job( info!(request_id = %request_id, job_id = %job_id_str, "Initiating job rollback"); + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + // Parse job ID let job_id = match Uuid::parse_str(&job_id_str) { Ok(id) => id, @@ -321,7 +334,7 @@ pub async fn delete_job( } } -/// Configure routes for job endpoints +/// Configure all job routes pub fn configure_routes(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/jobs") diff --git a/src/api/handlers/packages.rs b/src/api/handlers/packages.rs index bc5b575..d0d3cae 100644 --- a/src/api/handlers/packages.rs +++ b/src/api/handlers/packages.rs @@ -252,6 +252,19 @@ pub async fn install_packages( info!(request_id = %request_id, packages = ?package_names, "Installing packages"); + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + // Create async job match job_manager .create_job(JobOperation::Install, package_names.clone()) @@ -337,6 +350,19 @@ pub async fn update_package( info!(request_id = %request_id, package = %package_name, "Updating package"); + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + // Create async job match job_manager .create_job(JobOperation::Update, vec![package_name.clone()]) @@ -420,6 +446,20 @@ pub async fn remove_package( } info!(request_id = %request_id, package = %package_name, "Removing package"); + + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + match job_manager .create_job(JobOperation::Remove, vec![package_name.clone()]) .await @@ -484,7 +524,7 @@ pub async fn remove_package( } } -/// Configure routes for package endpoints +/// Configure all package routes pub fn configure_routes(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/packages") diff --git a/src/api/handlers/patches.rs b/src/api/handlers/patches.rs index 1b9a68b..08c4cd7 100644 --- a/src/api/handlers/patches.rs +++ b/src/api/handlers/patches.rs @@ -105,6 +105,19 @@ pub async fn apply_patches( "Applying patches" ); + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + // Create async job let package_list = body.packages.clone().unwrap_or_default(); match job_manager @@ -321,7 +334,7 @@ pub async fn apply_patches( } } -/// Configure routes for patch endpoints +/// Configure all patch routes pub fn configure_routes(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/patches") diff --git a/src/api/handlers/system.rs b/src/api/handlers/system.rs index 7734cb9..fe66a65 100644 --- a/src/api/handlers/system.rs +++ b/src/api/handlers/system.rs @@ -229,6 +229,19 @@ pub async fn reboot_system( } } + // Check job queue capacity + if !job_manager.can_accept_job().await { + let response = ApiResponse::<()>::error( + "QUEUE_FULL", + "Job queue is at capacity. Please retry later.", + None, + true, + ); + return HttpResponse::TooManyRequests() + .insert_header(("Retry-After", "60")) + .json(response); + } + // Create async job for reboot match job_manager.create_job(JobOperation::Reboot, vec![]).await { Ok(job_id) => { diff --git a/src/api/mod.rs b/src/api/mod.rs index a24dddf..5a9bebd 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -8,6 +8,7 @@ //! - WebSocket endpoint for real-time job status streaming pub mod handlers; +pub mod rate_limit; pub mod routes; // Re-export handlers for convenience diff --git a/src/api/rate_limit.rs b/src/api/rate_limit.rs new file mode 100644 index 0000000..c049ffa --- /dev/null +++ b/src/api/rate_limit.rs @@ -0,0 +1,209 @@ +//! Rate Limiting Middleware +//! +//! Custom Actix-web middleware that provides per-IP rate limiting with two tiers: +//! - **Destructive tier**: POST/PUT/DELETE methods (20 req/min, burst 10 by default) +//! - **Read tier**: GET methods (120 req/min, burst 30 by default) +//! - **Health exempt**: /health, /api/v1/system/info bypass rate limiting entirely + +use actix_governor::governor::clock::{Clock, DefaultClock}; +use actix_governor::governor::middleware::NoOpMiddleware; +use actix_governor::governor::state::keyed::DefaultKeyedStateStore; +use actix_governor::governor::{Quota, RateLimiter}; +use actix_web::body::BoxBody; +use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; +use actix_web::http::Method; +use actix_web::{HttpResponse, ResponseError}; +use std::future::{ready, Ready}; +use std::net::IpAddr; +use std::num::NonZeroU32; +use std::sync::Arc; +use tracing::info; + +use crate::config::loader::RateLimitConfig; + +/// Paths exempt from rate limiting +const EXEMPT_PATHS: &[&str] = &["/health", "/api/v1/system/info"]; + +/// Rate limiting middleware factory +pub struct RateLimitMiddleware { + config: RateLimitConfig, +} + +impl RateLimitMiddleware { + pub fn new(config: RateLimitConfig) -> Self { + Self { config } + } +} + +/// Error returned when rate limit is exceeded +#[derive(Debug)] +pub struct RateLimitError { + retry_after_secs: u64, +} + +impl std::fmt::Display for RateLimitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Rate limit exceeded. Retry after {} seconds.", + self.retry_after_secs + ) + } +} + +impl ResponseError for RateLimitError { + fn status_code(&self) -> actix_web::http::StatusCode { + actix_web::http::StatusCode::TOO_MANY_REQUESTS + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::TooManyRequests() + .insert_header(("Retry-After", self.retry_after_secs.to_string())) + .content_type("text/plain; charset=utf-8") + .body(self.to_string()) + } +} + +/// Type alias for per-IP rate limiter +pub type KeyedRateLimiter = + RateLimiter, DefaultClock, NoOpMiddleware>; + +/// Shared rate limiter state +#[derive(Clone)] +pub struct RateLimiters { + /// Rate limiter for destructive operations (POST/PUT/DELETE) + destructive: Arc, + /// Rate limiter for read operations (GET) + read: Arc, + /// Whether rate limiting is enabled + enabled: bool, +} + +impl RateLimiters { + /// Build rate limiters from configuration + pub fn new(config: &RateLimitConfig) -> Self { + let destructive_quota = + Quota::per_minute(NonZeroU32::new(config.destructive_per_minute).unwrap()) + .allow_burst(NonZeroU32::new(config.destructive_burst).unwrap()); + + let read_quota = Quota::per_minute(NonZeroU32::new(config.read_per_minute).unwrap()) + .allow_burst(NonZeroU32::new(config.read_burst).unwrap()); + + let destructive = Arc::new(KeyedRateLimiter::keyed(destructive_quota)); + let read = Arc::new(KeyedRateLimiter::keyed(read_quota)); + + info!( + enabled = config.enabled, + destructive_per_min = config.destructive_per_minute, + destructive_burst = config.destructive_burst, + read_per_min = config.read_per_minute, + read_burst = config.read_burst, + "Rate limiters configured" + ); + + Self { + destructive, + read, + enabled: config.enabled, + } + } + + /// Check if a request should be rate limited + /// Returns Ok(()) if the request is allowed, Err(RateLimitError) if rate limited + pub fn check( + &self, + method: &Method, + path: &str, + peer_ip: IpAddr, + ) -> Result<(), RateLimitError> { + if !self.enabled { + return Ok(()); + } + + // Exempt paths bypass rate limiting entirely + if EXEMPT_PATHS.contains(&path) { + return Ok(()); + } + + let limiter = match *method { + Method::POST | Method::PUT | Method::DELETE => &self.destructive, + Method::GET => &self.read, + _ => &self.read, // Default to read tier for other methods + }; + + match limiter.check_key(&peer_ip) { + Ok(()) => Ok(()), + Err(negative) => { + let retry_after = negative + .wait_time_from(DefaultClock::default().now()) + .as_secs(); + Err(RateLimitError { + retry_after_secs: retry_after.max(1), + }) + } + } + } +} + +impl Transform for RateLimitMiddleware +where + S: Service, Error = actix_web::Error>, + S::Future: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Transform = RateLimitService; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(RateLimitService { + service, + limiters: RateLimiters::new(&self.config), + })) + } +} + +/// Rate limiting service wrapper +pub struct RateLimitService { + service: S, + limiters: RateLimiters, +} + +impl Service for RateLimitService +where + S: Service, Error = actix_web::Error>, + S::Future: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Future = + std::pin::Pin>>>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + // Extract peer IP + let peer_ip = req + .connection_info() + .peer_addr() + .and_then(|addr| addr.parse::().ok()); + + // Check rate limiting + if let Some(ip) = peer_ip { + let method = req.method().clone(); + let path = req.path().to_string(); + + if let Err(e) = self.limiters.check(&method, &path, ip) { + // Rate limited - return 429 response + let (http_req, _) = req.into_parts(); + let response = e.error_response(); + let srv_resp = ServiceResponse::new(http_req, response); + return Box::pin(ready(Ok(srv_resp))); + } + } + + // Not rate limited - pass through to the inner service + Box::pin(self.service.call(req)) + } +} diff --git a/src/api/routes.rs b/src/api/routes.rs index c17f6af..29462c5 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -1,6 +1,11 @@ //! API Routes Configuration //! //! Aggregates all endpoint routes and configures the Actix-web application. +//! Rate limiting is applied at the App level in main.rs using actix-governor +//! with method-based filtering: +//! - **Read tier** (120 req/min, burst 30): GET methods +//! - **Destructive tier** (20 req/min, burst 10): POST/PUT/DELETE methods +//! - **Health exempt**: /health, /api/v1/system/info (health-exempt routes) use actix_web::{web, HttpResponse}; use tracing::info; @@ -17,6 +22,7 @@ async fn method_not_allowed() -> HttpResponse { .insert_header(("Allow", "GET, POST, PUT, DELETE")) .finish() } + /// Configure all API routes for the application pub fn configure_api_routes( cfg: &mut web::ServiceConfig, @@ -26,6 +32,10 @@ pub fn configure_api_routes( ) { info!("Configuring API v1 routes"); + // Health-exempt endpoint: /api/v1/system/info is registered separately + // so it can bypass rate limiting applied at the App level + cfg.service(web::resource("/api/v1/system/info").route(web::get().to(system::get_system_info))); + cfg.app_data(job_manager) .app_data(backend) .app_data(cache_state) @@ -33,15 +43,10 @@ pub fn configure_api_routes( web::scope("/api/v1") // VULN-005: Default handler for unsupported methods returns 405 instead of 404 .default_service(web::route().to(method_not_allowed)) - // Package Management Endpoints .configure(packages::configure_routes) - // Patch Management Endpoints .configure(patches::configure_routes) - // System Management Endpoints .configure(system::configure_routes) - // Job Management Endpoints .configure(jobs::configure_routes) - // WebSocket Endpoint .configure(websocket::configure_routes), ); } diff --git a/src/config/loader.rs b/src/config/loader.rs index 0d3fef1..704ee7b 100644 --- a/src/config/loader.rs +++ b/src/config/loader.rs @@ -60,12 +60,58 @@ pub struct JobsConfig { pub timeout_minutes: u64, #[serde(default = "default_storage_path")] pub storage_path: String, + #[serde(default = "default_max_queue_depth")] + pub max_queue_depth: usize, } fn default_storage_path() -> String { "/var/lib/linux_patch_api/jobs".to_string() } +fn default_max_queue_depth() -> usize { + 100 +} + +/// Rate limiting configuration +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct RateLimitConfig { + #[serde(default = "default_true")] + pub enabled: bool, + #[serde(default = "default_destructive_per_minute")] + pub destructive_per_minute: u32, + #[serde(default = "default_destructive_burst")] + pub destructive_burst: u32, + #[serde(default = "default_read_per_minute")] + pub read_per_minute: u32, + #[serde(default = "default_read_burst")] + pub read_burst: u32, +} + +fn default_destructive_per_minute() -> u32 { + 20 +} +fn default_destructive_burst() -> u32 { + 10 +} +fn default_read_per_minute() -> u32 { + 120 +} +fn default_read_burst() -> u32 { + 30 +} + +impl Default for RateLimitConfig { + fn default() -> Self { + Self { + enabled: true, + destructive_per_minute: default_destructive_per_minute(), + destructive_burst: default_destructive_burst(), + read_per_minute: default_read_per_minute(), + read_burst: default_read_burst(), + } + } +} + /// Logging configuration #[derive(Debug, Deserialize, Serialize, Clone)] pub struct LoggingConfig { @@ -445,6 +491,8 @@ pub struct AppConfig { pub package_manager: Option, #[serde(default)] pub enrollment: Option, + #[serde(default)] + pub rate_limit: RateLimitConfig, } impl AppConfig { diff --git a/src/config/mod.rs b/src/config/mod.rs index d443176..38bb382 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -6,6 +6,6 @@ //! - Auto-reload on file change via notify watcher pub mod loader; -pub use loader::{validate_certs, AppConfig, CertStatus, EnrollmentConfig}; +pub use loader::{validate_certs, AppConfig, CertStatus, EnrollmentConfig, RateLimitConfig}; pub mod validator; pub mod watcher; diff --git a/src/jobs/manager.rs b/src/jobs/manager.rs index 043511e..71074fe 100644 --- a/src/jobs/manager.rs +++ b/src/jobs/manager.rs @@ -140,6 +140,7 @@ pub struct JobStatusEvent { pub struct JobManager { max_concurrent: usize, timeout_minutes: u64, + max_queue_depth: usize, jobs: Arc>>, /// Broadcast sender for job status events event_sender: broadcast::Sender, @@ -147,11 +148,16 @@ pub struct JobManager { impl JobManager { /// Create a new job manager - pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result { + pub fn new( + max_concurrent: usize, + timeout_minutes: u64, + max_queue_depth: usize, + ) -> Result { let (event_sender, _) = broadcast::channel(256); Ok(Self { max_concurrent, timeout_minutes, + max_queue_depth, jobs: Arc::new(RwLock::new(HashMap::new())), event_sender, }) @@ -167,6 +173,11 @@ impl JobManager { self.max_concurrent } + /// Get max queue depth + pub fn max_queue_depth(&self) -> usize { + self.max_queue_depth + } + /// Subscribe to job status events /// Returns a broadcast receiver that will receive JobStatusEvent messages pub fn subscribe(&self) -> broadcast::Receiver { @@ -335,9 +346,17 @@ impl JobManager { .count() } - /// Check if can accept new job (respecting max_concurrent) + /// Check if can accept new job (respecting max_queue_depth) + /// Returns false when the total number of pending + running jobs + /// equals or exceeds the configured queue depth cap. pub async fn can_accept_job(&self) -> bool { - self.running_count().await < self.max_concurrent + let jobs = self.jobs.read().await; + let active_count = jobs + .values() + .filter(|j| j.status == JobStatus::Running || j.status == JobStatus::Pending) + .count(); + drop(jobs); + active_count < self.max_queue_depth } /// Delete a completed/failed job from history @@ -401,6 +420,7 @@ impl Clone for JobManager { Self { max_concurrent: self.max_concurrent, timeout_minutes: self.timeout_minutes, + max_queue_depth: self.max_queue_depth, jobs: self.jobs.clone(), event_sender: self.event_sender.clone(), } diff --git a/src/main.rs b/src/main.rs index dd8066d..758c043 100644 --- a/src/main.rs +++ b/src/main.rs @@ -252,10 +252,15 @@ async fn main() -> Result<()> { } // Initialize job manager - let job_manager = JobManager::new(config.jobs.max_concurrent, config.jobs.timeout_minutes)?; + let job_manager = JobManager::new( + config.jobs.max_concurrent, + config.jobs.timeout_minutes, + config.jobs.max_queue_depth, + )?; info!( max_jobs = config.jobs.max_concurrent, timeout_minutes = config.jobs.timeout_minutes, + max_queue_depth = config.jobs.max_queue_depth, "Job manager initialized" ); @@ -311,35 +316,36 @@ async fn main() -> Result<()> { // Clone whitelist manager for use inside the HttpServer closure let wl = whitelist_manager.clone(); + // Clone rate limit config for use inside the HttpServer closure + let rate_limit_config = config.rate_limit.clone(); + // Create server builder // Security middleware stack (order matters): // 1. WhitelistMiddleware — IP-based access control (deny-by-default) // 2. SecurityHeadersMiddleware — VULN-006: reject duplicate critical headers - // 3. Logger — request logging (after auth decisions) + // 3. RateLimitMiddleware — per-IP rate limiting (read + destructive tiers) + // 4. Logger — request logging (after auth decisions) let server_builder = HttpServer::new(move || { - let mut app = App::new() + App::new() .wrap(WhitelistMiddleware::new(wl.clone())) .wrap(SecurityHeadersMiddleware::new()) + .wrap(linux_patch_api::api::rate_limit::RateLimitMiddleware::new( + rate_limit_config.clone(), + )) .wrap(Logger::default()) .app_data(job_manager_data.clone()) .app_data(backend_data.clone()) .app_data(cache_state.clone()) - .app_data(crl_state_data.clone()); - - // Configure API routes - app = app.configure(|cfg| { - configure_api_routes( - cfg, - job_manager_data.clone(), - backend_data.clone(), - cache_state.clone(), - ); - }); - - // Configure health route (outside API scope) - app = app.configure(configure_health_route); - - app + .app_data(crl_state_data.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager_data.clone(), + backend_data.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route) }) .workers(4) // VULN-004: Configure header size limit to 8KB to prevent DoS via oversized headers diff --git a/tests/unit/rate_limit_test.rs b/tests/unit/rate_limit_test.rs new file mode 100644 index 0000000..787f948 --- /dev/null +++ b/tests/unit/rate_limit_test.rs @@ -0,0 +1,340 @@ +//! Rate Limiting and Job Queue Depth Tests +//! +//! Tests for: +//! - HTTP rate limiting (429 when exceeded) +//! - Health endpoint exemption from rate limiting +//! - Job queue depth cap (429 when full) +//! - Configurable queue depth + +use actix_web::{test, web, App}; +use linux_patch_api::api::rate_limit::RateLimitMiddleware; +use linux_patch_api::api::routes::{configure_api_routes, configure_health_route}; +use linux_patch_api::auth::crl; +use linux_patch_api::config::loader::RateLimitConfig; +use linux_patch_api::jobs::manager::{JobManager, JobOperation}; +use linux_patch_api::packages::cache::PackageCacheState; +use std::net::SocketAddr; + +/// Helper to build a test request with a peer IP address (required for rate limiting) +fn test_request(method: actix_web::http::Method, uri: &str) -> test::TestRequest { + test::TestRequest::with_uri(uri) + .method(method) + .peer_addr(SocketAddr::from(([127, 0, 0, 1], 12345))) +} + +#[actix_web::test] +async fn test_health_endpoint_exempt_from_rate_limiting() { + let job_manager = web::Data::new(JobManager::new(5, 30, 100).unwrap()); + let backend = web::Data::new(linux_patch_api::packages::create_backend().unwrap()); + let cache_state = web::Data::new(PackageCacheState::new()); + let shared_crl_state = web::Data::new(crl::new_shared_state()); + let rl_cfg = RateLimitConfig::default(); + + let app = test::init_service( + App::new() + .wrap(RateLimitMiddleware::new(rl_cfg)) + .app_data(job_manager.clone()) + .app_data(backend.clone()) + .app_data(cache_state.clone()) + .app_data(shared_crl_state.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager.clone(), + backend.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route), + ) + .await; + + // Health endpoint should always respond 200 regardless of rate limiting + for _ in 0..50 { + let req = test_request(actix_web::http::Method::GET, "/health").to_request(); + let resp = test::call_service(&app, req).await; + assert_eq!( + resp.status(), + 200, + "Health endpoint should be exempt from rate limiting" + ); + } +} + +#[actix_web::test] +async fn test_system_info_exempt_from_rate_limiting() { + let job_manager = web::Data::new(JobManager::new(5, 30, 100).unwrap()); + let backend = web::Data::new(linux_patch_api::packages::create_backend().unwrap()); + let cache_state = web::Data::new(PackageCacheState::new()); + let shared_crl_state = web::Data::new(crl::new_shared_state()); + let rl_cfg = RateLimitConfig::default(); + + let app = test::init_service( + App::new() + .wrap(RateLimitMiddleware::new(rl_cfg)) + .app_data(job_manager.clone()) + .app_data(backend.clone()) + .app_data(cache_state.clone()) + .app_data(shared_crl_state.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager.clone(), + backend.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route), + ) + .await; + + // /api/v1/system/info should be exempt from rate limiting + for _ in 0..50 { + let req = test_request(actix_web::http::Method::GET, "/api/v1/system/info").to_request(); + let resp = test::call_service(&app, req).await; + // May return 200 or 500 depending on system, but should NOT be 429 + assert_ne!( + resp.status(), + 429, + "System info endpoint should be exempt from rate limiting" + ); + } +} + +#[actix_web::test] +async fn test_read_rate_limiting_returns_429() { + let job_manager = web::Data::new(JobManager::new(5, 30, 100).unwrap()); + let backend = web::Data::new(linux_patch_api::packages::create_backend().unwrap()); + let cache_state = web::Data::new(PackageCacheState::new()); + let shared_crl_state = web::Data::new(crl::new_shared_state()); + // Use very low limits so sequential test requests can reliably trigger 429 + let rl_cfg = RateLimitConfig { + enabled: true, + destructive_per_minute: 20, + destructive_burst: 10, + read_per_minute: 5, + read_burst: 3, + }; + + let app = test::init_service( + App::new() + .wrap(RateLimitMiddleware::new(rl_cfg)) + .app_data(job_manager.clone()) + .app_data(backend.clone()) + .app_data(cache_state.clone()) + .app_data(shared_crl_state.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager.clone(), + backend.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route), + ) + .await; + + // Read tier: 120 req/min, burst 30 + // Send more than burst_size requests to trigger rate limiting + let mut rate_limited = false; + for _ in 0..50 { + let req = test_request(actix_web::http::Method::GET, "/api/v1/packages").to_request(); + let resp = test::call_service(&app, req).await; + if resp.status() == 429 { + rate_limited = true; + break; + } + } + assert!( + rate_limited, + "Read endpoint should return 429 after exceeding burst limit" + ); +} + +#[actix_web::test] +async fn test_destructive_rate_limiting_returns_429() { + let job_manager = web::Data::new(JobManager::new(5, 30, 100).unwrap()); + let backend = web::Data::new(linux_patch_api::packages::create_backend().unwrap()); + let cache_state = web::Data::new(PackageCacheState::new()); + let shared_crl_state = web::Data::new(crl::new_shared_state()); + // Use very low limits so sequential test requests can reliably trigger 429 + let rl_cfg = RateLimitConfig { + enabled: true, + destructive_per_minute: 5, + destructive_burst: 3, + read_per_minute: 120, + read_burst: 30, + }; + + let app = test::init_service( + App::new() + .wrap(RateLimitMiddleware::new(rl_cfg)) + .app_data(job_manager.clone()) + .app_data(backend.clone()) + .app_data(cache_state.clone()) + .app_data(shared_crl_state.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager.clone(), + backend.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route), + ) + .await; + + // Destructive tier: 20 req/min, burst 10 + // Send more than burst_size requests to trigger rate limiting + let mut rate_limited = false; + for _ in 0..15 { + let req = test_request(actix_web::http::Method::POST, "/api/v1/packages") + .set_json(serde_json::json!({ + "packages": [{"name": "test-pkg"}], + "options": {} + })) + .to_request(); + let resp = test::call_service(&app, req).await; + if resp.status() == 429 { + rate_limited = true; + break; + } + } + assert!( + rate_limited, + "Destructive endpoint should return 429 after exceeding burst limit" + ); +} + +#[actix_web::test] +async fn test_rate_limiting_disabled() { + let job_manager = web::Data::new(JobManager::new(5, 30, 100).unwrap()); + let backend = web::Data::new(linux_patch_api::packages::create_backend().unwrap()); + let cache_state = web::Data::new(PackageCacheState::new()); + let shared_crl_state = web::Data::new(crl::new_shared_state()); + let rl_cfg = RateLimitConfig { + enabled: false, + ..RateLimitConfig::default() + }; + + let app = test::init_service( + App::new() + .wrap(RateLimitMiddleware::new(rl_cfg)) + .app_data(job_manager.clone()) + .app_data(backend.clone()) + .app_data(cache_state.clone()) + .app_data(shared_crl_state.clone()) + .configure(|cfg| { + configure_api_routes( + cfg, + job_manager.clone(), + backend.clone(), + cache_state.clone(), + ); + }) + .configure(configure_health_route), + ) + .await; + + // With rate limiting disabled, even excessive requests should not get 429 + let mut got_429 = false; + for _ in 0..50 { + let req = test_request(actix_web::http::Method::GET, "/api/v1/packages").to_request(); + let resp = test::call_service(&app, req).await; + if resp.status() == 429 { + got_429 = true; + break; + } + } + assert!( + !got_429, + "Should not get 429 when rate limiting is disabled" + ); +} + +#[actix_web::test] +async fn test_job_queue_depth_cap() { + // Create JobManager with very small queue depth (2) + let job_manager = JobManager::new(5, 30, 2).unwrap(); + + // Fill the queue with pending jobs + job_manager + .create_job(JobOperation::Install, vec!["pkg1".to_string()]) + .await + .unwrap(); + job_manager + .create_job(JobOperation::Install, vec!["pkg2".to_string()]) + .await + .unwrap(); + + // Queue should now be at capacity + assert!( + !job_manager.can_accept_job().await, + "Queue should be at capacity after filling to max_queue_depth" + ); +} + +#[actix_web::test] +async fn test_job_queue_depth_default() { + // Default max_queue_depth should be 100 + let job_manager = JobManager::new(5, 30, 100).unwrap(); + assert_eq!( + job_manager.max_queue_depth(), + 100, + "Default max_queue_depth should be 100" + ); +} + +#[actix_web::test] +async fn test_job_queue_depth_configurable() { + // Verify queue depth is configurable + let job_manager = JobManager::new(5, 30, 50).unwrap(); + assert_eq!( + job_manager.max_queue_depth(), + 50, + "max_queue_depth should be configurable" + ); +} + +#[actix_web::test] +async fn test_can_accept_job_respects_queue_depth() { + let job_manager = JobManager::new(5, 30, 3).unwrap(); + + // Should accept when queue is empty + assert!( + job_manager.can_accept_job().await, + "Should accept job when queue is empty" + ); + + // Fill to capacity + job_manager + .create_job(JobOperation::Install, vec!["a".to_string()]) + .await + .unwrap(); + job_manager + .create_job(JobOperation::Install, vec!["b".to_string()]) + .await + .unwrap(); + job_manager + .create_job(JobOperation::Install, vec!["c".to_string()]) + .await + .unwrap(); + + // Should reject when at capacity + assert!( + !job_manager.can_accept_job().await, + "Should reject job when queue is at capacity" + ); +} + +#[actix_web::test] +async fn test_rate_limit_config_defaults() { + let config = RateLimitConfig::default(); + assert!(config.enabled, "Rate limiting should be enabled by default"); + assert_eq!(config.destructive_per_minute, 20); + assert_eq!(config.destructive_burst, 10); + assert_eq!(config.read_per_minute, 120); + assert_eq!(config.read_burst, 30); +}