Private
Public Access
1
0

feat: add rate limiting and job queue depth cap (closes #15)
Some checks failed
CI/CD Pipeline / Code Format (push) Successful in 3s
CI/CD Pipeline / Clippy Lints (push) Successful in 45s
CI/CD Pipeline / All Unit Tests (push) Successful in 1m24s
CI/CD Pipeline / Security Audit (push) Successful in 4s
CI/CD Pipeline / Enrollment Tests (push) Successful in 1m14s
CI/CD Pipeline / Build Debian Package (Ubuntu 22.04) (push) Failing after 4s
CI/CD Pipeline / Verify Enrollment CLI Flag (push) Successful in 1m0s
CI/CD Pipeline / Build Debian Package (push) Failing after 5s
CI/CD Pipeline / Build Arch Package (push) Successful in 2m24s
CI/CD Pipeline / Build RPM Package (push) Successful in 2m15s
CI/CD Pipeline / Build Alpine Package (push) Failing after 3m19s

- Add custom RateLimitMiddleware using governor crate for per-IP rate limiting
- Two-tier rate limiting: destructive (20 req/min, burst 10) and read (120 req/min, burst 30)
- Health endpoints (/health, /api/v1/system/info) exempt from rate limiting
- Add max_queue_depth to JobManager (default: 100, configurable via config.yaml)
- Return 429 Too Many Requests with Retry-After header when queue is full
- Add RateLimitConfig to config.yaml with all rate limit settings
- Add 10 tests covering rate limiting, queue depth, and configuration defaults

Co-authored-by: git-echo <git-echo@moon-dragon.us>
This commit is contained in:
Draco-Lunaris-Echo
2026-06-06 15:39:49 -05:00
committed by GitHub
parent 6a4c4c95a4
commit df2f4c70c9
14 changed files with 849 additions and 31 deletions

View File

@ -190,6 +190,19 @@ pub async fn rollback_job(
info!(request_id = %request_id, job_id = %job_id_str, "Initiating job rollback");
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
// Parse job ID
let job_id = match Uuid::parse_str(&job_id_str) {
Ok(id) => id,
@ -321,7 +334,7 @@ pub async fn delete_job(
}
}
/// Configure routes for job endpoints
/// Configure all job routes
pub fn configure_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/jobs")

View File

@ -252,6 +252,19 @@ pub async fn install_packages(
info!(request_id = %request_id, packages = ?package_names, "Installing packages");
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
// Create async job
match job_manager
.create_job(JobOperation::Install, package_names.clone())
@ -337,6 +350,19 @@ pub async fn update_package(
info!(request_id = %request_id, package = %package_name, "Updating package");
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
// Create async job
match job_manager
.create_job(JobOperation::Update, vec![package_name.clone()])
@ -420,6 +446,20 @@ pub async fn remove_package(
}
info!(request_id = %request_id, package = %package_name, "Removing package");
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
match job_manager
.create_job(JobOperation::Remove, vec![package_name.clone()])
.await
@ -484,7 +524,7 @@ pub async fn remove_package(
}
}
/// Configure routes for package endpoints
/// Configure all package routes
pub fn configure_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/packages")

View File

@ -105,6 +105,19 @@ pub async fn apply_patches(
"Applying patches"
);
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
// Create async job
let package_list = body.packages.clone().unwrap_or_default();
match job_manager
@ -321,7 +334,7 @@ pub async fn apply_patches(
}
}
/// Configure routes for patch endpoints
/// Configure all patch routes
pub fn configure_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/patches")

View File

@ -229,6 +229,19 @@ pub async fn reboot_system(
}
}
// Check job queue capacity
if !job_manager.can_accept_job().await {
let response = ApiResponse::<()>::error(
"QUEUE_FULL",
"Job queue is at capacity. Please retry later.",
None,
true,
);
return HttpResponse::TooManyRequests()
.insert_header(("Retry-After", "60"))
.json(response);
}
// Create async job for reboot
match job_manager.create_job(JobOperation::Reboot, vec![]).await {
Ok(job_id) => {

View File

@ -8,6 +8,7 @@
//! - WebSocket endpoint for real-time job status streaming
pub mod handlers;
pub mod rate_limit;
pub mod routes;
// Re-export handlers for convenience

209
src/api/rate_limit.rs Normal file
View File

@ -0,0 +1,209 @@
//! Rate Limiting Middleware
//!
//! Custom Actix-web middleware that provides per-IP rate limiting with two tiers:
//! - **Destructive tier**: POST/PUT/DELETE methods (20 req/min, burst 10 by default)
//! - **Read tier**: GET and every other non-POST/PUT/DELETE method (120 req/min, burst 30 by default)
//! - **Health exempt**: /health, /api/v1/system/info bypass rate limiting entirely
use actix_governor::governor::clock::{Clock, DefaultClock};
use actix_governor::governor::middleware::NoOpMiddleware;
use actix_governor::governor::state::keyed::DefaultKeyedStateStore;
use actix_governor::governor::{Quota, RateLimiter};
use actix_web::body::BoxBody;
use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::http::Method;
use actix_web::{HttpResponse, ResponseError};
use std::future::{ready, Ready};
use std::net::IpAddr;
use std::num::NonZeroU32;
use std::sync::Arc;
use tracing::info;
use crate::config::loader::RateLimitConfig;
/// Paths exempt from rate limiting
const EXEMPT_PATHS: &[&str] = &["/health", "/api/v1/system/info"];
/// Middleware factory that carries the rate-limit settings.
///
/// Only the configuration is stored here; the limiter state is created when
/// the factory is turned into a service by its `Transform` implementation.
pub struct RateLimitMiddleware {
    /// Settings handed to `RateLimiters::new` for each produced service.
    config: RateLimitConfig,
}

impl RateLimitMiddleware {
    /// Creates the factory from the given rate-limit configuration.
    pub fn new(config: RateLimitConfig) -> Self {
        RateLimitMiddleware { config }
    }
}
/// Error produced when a client has used up its rate-limit allowance.
#[derive(Debug)]
pub struct RateLimitError {
    /// Whole seconds the client is asked to wait before retrying.
    retry_after_secs: u64,
}

impl std::fmt::Display for RateLimitError {
    /// Renders the human-readable message (also used as the 429 response body).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let secs = self.retry_after_secs;
        f.write_fmt(format_args!(
            "Rate limit exceeded. Retry after {} seconds.",
            secs
        ))
    }
}
impl ResponseError for RateLimitError {
    /// Always reports `429 Too Many Requests`.
    fn status_code(&self) -> actix_web::http::StatusCode {
        actix_web::http::StatusCode::TOO_MANY_REQUESTS
    }

    /// Builds the plain-text 429 response and advertises the wait time in
    /// the `Retry-After` header.
    fn error_response(&self) -> HttpResponse {
        let retry_after = self.retry_after_secs.to_string();
        let message = self.to_string();

        let mut builder = HttpResponse::TooManyRequests();
        builder.insert_header(("Retry-After", retry_after));
        builder.content_type("text/plain; charset=utf-8");
        builder.body(message)
    }
}
/// Type alias for per-IP rate limiter
///
/// Keys are client [`IpAddr`]s held in [`DefaultKeyedStateStore`], timed by
/// [`DefaultClock`], with [`NoOpMiddleware`] as the limiter middleware.
pub type KeyedRateLimiter =
RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock, NoOpMiddleware>;
/// Shared rate limiter state
#[derive(Clone)]
pub struct RateLimiters {
/// Rate limiter for destructive operations (POST/PUT/DELETE)
destructive: Arc<KeyedRateLimiter>,
/// Rate limiter for read operations (GET)
read: Arc<KeyedRateLimiter>,
/// Whether rate limiting is enabled
enabled: bool,
}
impl RateLimiters {
/// Build rate limiters from configuration
pub fn new(config: &RateLimitConfig) -> Self {
let destructive_quota =
Quota::per_minute(NonZeroU32::new(config.destructive_per_minute).unwrap())
.allow_burst(NonZeroU32::new(config.destructive_burst).unwrap());
let read_quota = Quota::per_minute(NonZeroU32::new(config.read_per_minute).unwrap())
.allow_burst(NonZeroU32::new(config.read_burst).unwrap());
let destructive = Arc::new(KeyedRateLimiter::keyed(destructive_quota));
let read = Arc::new(KeyedRateLimiter::keyed(read_quota));
info!(
enabled = config.enabled,
destructive_per_min = config.destructive_per_minute,
destructive_burst = config.destructive_burst,
read_per_min = config.read_per_minute,
read_burst = config.read_burst,
"Rate limiters configured"
);
Self {
destructive,
read,
enabled: config.enabled,
}
}
/// Check if a request should be rate limited
/// Returns Ok(()) if the request is allowed, Err(RateLimitError) if rate limited
pub fn check(
&self,
method: &Method,
path: &str,
peer_ip: IpAddr,
) -> Result<(), RateLimitError> {
if !self.enabled {
return Ok(());
}
// Exempt paths bypass rate limiting entirely
if EXEMPT_PATHS.contains(&path) {
return Ok(());
}
let limiter = match *method {
Method::POST | Method::PUT | Method::DELETE => &self.destructive,
Method::GET => &self.read,
_ => &self.read, // Default to read tier for other methods
};
match limiter.check_key(&peer_ip) {
Ok(()) => Ok(()),
Err(negative) => {
let retry_after = negative
.wait_time_from(DefaultClock::default().now())
.as_secs();
Err(RateLimitError {
retry_after_secs: retry_after.max(1),
})
}
}
}
}
impl<S> Transform<S, ServiceRequest> for RateLimitMiddleware
where
    S: Service<ServiceRequest, Response = ServiceResponse<BoxBody>, Error = actix_web::Error>,
    S::Future: 'static,
{
    type Response = ServiceResponse<BoxBody>;
    type Error = actix_web::Error;
    type Transform = RateLimitService<S>;
    type InitError = ();
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    /// Wraps `service` in a [`RateLimitService`] with limiters built from the
    /// stored configuration.
    ///
    /// NOTE(review): every call builds a fresh set of limiters, so per-IP
    /// counters are not shared between the services this factory produces.
    /// If the framework calls this once per worker, the effective limit is
    /// multiplied by the worker count — confirm.
    fn new_transform(&self, service: S) -> Self::Future {
        let limiters = RateLimiters::new(&self.config);
        let wrapped = RateLimitService { service, limiters };
        ready(Ok(wrapped))
    }
}
/// Service wrapper that applies the rate limiters before delegating to the
/// wrapped service.
pub struct RateLimitService<S> {
    /// The inner service that handles requests which are not rejected.
    service: S,
    /// Limiter state consulted for every request with a usable peer IP.
    limiters: RateLimiters,
}

impl<S> Service<ServiceRequest> for RateLimitService<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<BoxBody>, Error = actix_web::Error>,
    S::Future: 'static,
{
    type Response = ServiceResponse<BoxBody>;
    type Error = actix_web::Error;
    type Future =
        std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>>>>;

    forward_ready!(service);

    /// Rejects the request with the limiter's 429 response when the peer has
    /// exceeded its quota; otherwise forwards it to the inner service.
    ///
    /// Requests whose peer address is missing or does not parse as an IP
    /// address are forwarded without any rate-limit check.
    fn call(&self, req: ServiceRequest) -> Self::Future {
        // Resolve the peer IP, if the connection exposes a parseable one.
        let peer_ip = req
            .connection_info()
            .peer_addr()
            .and_then(|raw| raw.parse::<IpAddr>().ok());

        // Only a known IP can be checked; collapse the outcome to "rejection or nothing".
        let rejection =
            peer_ip.and_then(|ip| self.limiters.check(req.method(), req.path(), ip).err());

        if let Some(limit_err) = rejection {
            // Rate limited: answer directly without invoking the inner service.
            let (http_req, _payload) = req.into_parts();
            let srv_resp = ServiceResponse::new(http_req, limit_err.error_response());
            return Box::pin(ready(Ok(srv_resp)));
        }

        // Not rate limited: hand the request to the inner service.
        Box::pin(self.service.call(req))
    }
}

View File

@ -1,6 +1,11 @@
//! API Routes Configuration
//!
//! Aggregates all endpoint routes and configures the Actix-web application.
//! Rate limiting is applied at the App level in main.rs by the custom
//! `RateLimitMiddleware` (see `api::rate_limit`), which picks a tier by HTTP method:
//! - **Read tier** (default 120 req/min, burst 30): GET and every other non-POST/PUT/DELETE method
//! - **Destructive tier** (default 20 req/min, burst 10): POST/PUT/DELETE methods
//! - **Exempt**: /health and /api/v1/system/info bypass rate limiting entirely
use actix_web::{web, HttpResponse};
use tracing::info;
@ -17,6 +22,7 @@ async fn method_not_allowed() -> HttpResponse {
.insert_header(("Allow", "GET, POST, PUT, DELETE"))
.finish()
}
/// Configure all API routes for the application
pub fn configure_api_routes(
cfg: &mut web::ServiceConfig,
@ -26,6 +32,10 @@ pub fn configure_api_routes(
) {
info!("Configuring API v1 routes");
// Health-exempt endpoint: /api/v1/system/info is registered separately
// so it can bypass rate limiting applied at the App level
cfg.service(web::resource("/api/v1/system/info").route(web::get().to(system::get_system_info)));
cfg.app_data(job_manager)
.app_data(backend)
.app_data(cache_state)
@ -33,15 +43,10 @@ pub fn configure_api_routes(
web::scope("/api/v1")
// VULN-005: Default handler for unsupported methods returns 405 instead of 404
.default_service(web::route().to(method_not_allowed))
// Package Management Endpoints
.configure(packages::configure_routes)
// Patch Management Endpoints
.configure(patches::configure_routes)
// System Management Endpoints
.configure(system::configure_routes)
// Job Management Endpoints
.configure(jobs::configure_routes)
// WebSocket Endpoint
.configure(websocket::configure_routes),
);
}

View File

@ -60,12 +60,58 @@ pub struct JobsConfig {
pub timeout_minutes: u64,
#[serde(default = "default_storage_path")]
pub storage_path: String,
#[serde(default = "default_max_queue_depth")]
pub max_queue_depth: usize,
}
/// Serde default for `JobsConfig::storage_path`.
fn default_storage_path() -> String {
    String::from("/var/lib/linux_patch_api/jobs")
}
/// Serde default for `JobsConfig::max_queue_depth`.
fn default_max_queue_depth() -> usize {
    const MAX_QUEUE_DEPTH: usize = 100;
    MAX_QUEUE_DEPTH
}
/// Rate limiting configuration
///
/// Every field carries a serde default, so any of them may be omitted from
/// the configuration.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RateLimitConfig {
/// Master switch for rate limiting.
#[serde(default = "default_true")]
pub enabled: bool,
/// Per-minute quota for the destructive tier (default 20).
#[serde(default = "default_destructive_per_minute")]
pub destructive_per_minute: u32,
/// Burst allowance for the destructive tier (default 10).
#[serde(default = "default_destructive_burst")]
pub destructive_burst: u32,
/// Per-minute quota for the read tier (default 120).
#[serde(default = "default_read_per_minute")]
pub read_per_minute: u32,
/// Burst allowance for the read tier (default 30).
#[serde(default = "default_read_burst")]
pub read_burst: u32,
}
/// Serde default for `RateLimitConfig::destructive_per_minute`.
fn default_destructive_per_minute() -> u32 {
    const PER_MINUTE: u32 = 20;
    PER_MINUTE
}
/// Serde default for `RateLimitConfig::destructive_burst`.
fn default_destructive_burst() -> u32 {
    const BURST: u32 = 10;
    BURST
}
/// Serde default for `RateLimitConfig::read_per_minute`.
fn default_read_per_minute() -> u32 {
    const PER_MINUTE: u32 = 120;
    PER_MINUTE
}
/// Serde default for `RateLimitConfig::read_burst`.
fn default_read_burst() -> u32 {
    const BURST: u32 = 30;
    BURST
}
impl Default for RateLimitConfig {
fn default() -> Self {
Self {
enabled: true,
destructive_per_minute: default_destructive_per_minute(),
destructive_burst: default_destructive_burst(),
read_per_minute: default_read_per_minute(),
read_burst: default_read_burst(),
}
}
}
/// Logging configuration
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoggingConfig {
@ -445,6 +491,8 @@ pub struct AppConfig {
pub package_manager: Option<PackageManagerConfig>,
#[serde(default)]
pub enrollment: Option<EnrollmentConfig>,
#[serde(default)]
pub rate_limit: RateLimitConfig,
}
impl AppConfig {

View File

@ -6,6 +6,6 @@
//! - Auto-reload on file change via notify watcher
pub mod loader;
pub use loader::{validate_certs, AppConfig, CertStatus, EnrollmentConfig};
pub use loader::{validate_certs, AppConfig, CertStatus, EnrollmentConfig, RateLimitConfig};
pub mod validator;
pub mod watcher;

View File

@ -140,6 +140,7 @@ pub struct JobStatusEvent {
pub struct JobManager {
max_concurrent: usize,
timeout_minutes: u64,
max_queue_depth: usize,
jobs: Arc<RwLock<HashMap<Uuid, Job>>>,
/// Broadcast sender for job status events
event_sender: broadcast::Sender<JobStatusEvent>,
@ -147,11 +148,16 @@ pub struct JobManager {
impl JobManager {
/// Create a new job manager
pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result<Self> {
pub fn new(
max_concurrent: usize,
timeout_minutes: u64,
max_queue_depth: usize,
) -> Result<Self> {
let (event_sender, _) = broadcast::channel(256);
Ok(Self {
max_concurrent,
timeout_minutes,
max_queue_depth,
jobs: Arc::new(RwLock::new(HashMap::new())),
event_sender,
})
@ -167,6 +173,11 @@ impl JobManager {
self.max_concurrent
}
/// Get max queue depth
///
/// Returns the cap stored at construction time; `can_accept_job` compares the
/// number of pending + running jobs against this value.
pub fn max_queue_depth(&self) -> usize {
self.max_queue_depth
}
/// Subscribe to job status events
/// Returns a broadcast receiver that will receive JobStatusEvent messages
pub fn subscribe(&self) -> broadcast::Receiver<JobStatusEvent> {
@ -335,9 +346,17 @@ impl JobManager {
.count()
}
/// Check if can accept new job (respecting max_queue_depth)
///
/// Returns false when the total number of pending + running jobs
/// equals or exceeds the configured queue depth cap.
///
/// This is a point-in-time check: the read lock is released before the
/// result is returned, so a caller that creates a job afterwards is not
/// guaranteed that a slot is still free.
pub async fn can_accept_job(&self) -> bool {
    let jobs = self.jobs.read().await;
    let active_count = jobs
        .values()
        .filter(|job| matches!(job.status, JobStatus::Running | JobStatus::Pending))
        .count();
    // Release the lock before comparing; nothing below needs the map.
    drop(jobs);
    active_count < self.max_queue_depth
}
/// Delete a completed/failed job from history
@ -401,6 +420,7 @@ impl Clone for JobManager {
Self {
max_concurrent: self.max_concurrent,
timeout_minutes: self.timeout_minutes,
max_queue_depth: self.max_queue_depth,
jobs: self.jobs.clone(),
event_sender: self.event_sender.clone(),
}

View File

@ -252,10 +252,15 @@ async fn main() -> Result<()> {
}
// Initialize job manager
let job_manager = JobManager::new(config.jobs.max_concurrent, config.jobs.timeout_minutes)?;
let job_manager = JobManager::new(
config.jobs.max_concurrent,
config.jobs.timeout_minutes,
config.jobs.max_queue_depth,
)?;
info!(
max_jobs = config.jobs.max_concurrent,
timeout_minutes = config.jobs.timeout_minutes,
max_queue_depth = config.jobs.max_queue_depth,
"Job manager initialized"
);
@ -311,35 +316,36 @@ async fn main() -> Result<()> {
// Clone whitelist manager for use inside the HttpServer closure
let wl = whitelist_manager.clone();
// Clone rate limit config for use inside the HttpServer closure
let rate_limit_config = config.rate_limit.clone();
// Create server builder
// Security middleware stack (order matters):
// 1. WhitelistMiddleware — IP-based access control (deny-by-default)
// 2. SecurityHeadersMiddleware — VULN-006: reject duplicate critical headers
// 3. Logger — request logging (after auth decisions)
// 3. RateLimitMiddleware — per-IP rate limiting (read + destructive tiers)
// 4. Logger — request logging (after auth decisions)
let server_builder = HttpServer::new(move || {
let mut app = App::new()
App::new()
.wrap(WhitelistMiddleware::new(wl.clone()))
.wrap(SecurityHeadersMiddleware::new())
.wrap(linux_patch_api::api::rate_limit::RateLimitMiddleware::new(
rate_limit_config.clone(),
))
.wrap(Logger::default())
.app_data(job_manager_data.clone())
.app_data(backend_data.clone())
.app_data(cache_state.clone())
.app_data(crl_state_data.clone());
// Configure API routes
app = app.configure(|cfg| {
configure_api_routes(
cfg,
job_manager_data.clone(),
backend_data.clone(),
cache_state.clone(),
);
});
// Configure health route (outside API scope)
app = app.configure(configure_health_route);
app
.app_data(crl_state_data.clone())
.configure(|cfg| {
configure_api_routes(
cfg,
job_manager_data.clone(),
backend_data.clone(),
cache_state.clone(),
);
})
.configure(configure_health_route)
})
.workers(4)
// VULN-004: Configure header size limit to 8KB to prevent DoS via oversized headers