diff --git a/Cargo.lock b/Cargo.lock index f1295d6..a228078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,7 +1859,7 @@ dependencies = [ [[package]] name = "linux-patch-api" -version = "0.3.2" +version = "0.3.5" dependencies = [ "actix", "actix-rt", diff --git a/src/api/handlers/mod.rs b/src/api/handlers/mod.rs index f4390c4..4fdd760 100644 --- a/src/api/handlers/mod.rs +++ b/src/api/handlers/mod.rs @@ -15,4 +15,5 @@ pub mod websocket; // Re-export commonly used types pub use packages::{ApiError, ApiResponse}; -pub use websocket::{WsClientMessage, WsServerMessage}; +// WebSocket message types are now in crate::jobs::websocket +pub use crate::jobs::websocket::{WsClientMessage, WsServerMessage}; diff --git a/src/api/handlers/websocket.rs b/src/api/handlers/websocket.rs index da27eef..9e1a690 100644 --- a/src/api/handlers/websocket.rs +++ b/src/api/handlers/websocket.rs @@ -3,128 +3,34 @@ //! Implements WebSocket endpoint for real-time job status updates: //! - WS /api/v1/ws/jobs - Real-time job status streaming //! -//! Note: Full WebSocket implementation requires actix-web-actors compatibility. -//! This stub provides the endpoint structure for future enhancement. +//! Uses actix-web-actors for proper WebSocket handshake and protocol handling. +//! The actual actor logic lives in crate::jobs::websocket::WsJobActor. -use actix_web::{http::StatusCode, web, Error, HttpRequest, HttpResponse}; -use chrono::Utc; -use serde::{Deserialize, Serialize}; +use actix_web::{web, Error, HttpRequest, HttpResponse}; use tracing::info; -use uuid::Uuid; use crate::jobs::manager::JobManager; - -/// WebSocket message from client -#[derive(Debug, Deserialize, Clone)] -#[serde(tag = "action")] -pub enum WsClientMessage { - #[serde(rename = "subscribe")] - Subscribe { - #[serde(default)] - job_id: Option, - }, - #[serde(rename = "unsubscribe")] - Unsubscribe { job_id: String }, -} - -/// WebSocket message to client -#[derive(Debug, Serialize, Clone)] -pub struct WsServerMessage { - pub event: String, - pub job_id: String, - pub status: String, - pub progress: u8, - pub message: String, - pub timestamp: String, -} - -impl WsServerMessage { - pub fn job_status(job_id: &str, status: &str, progress: u8, message: &str) -> Self { - Self { - event: "job_status".to_string(), - job_id: job_id.to_string(), - status: status.to_string(), - progress, - message: message.to_string(), - timestamp: Utc::now().to_rfc3339(), - } - } - - pub fn job_complete(job_id: &str, status: &str, message: &str) -> Self { - Self { - event: "job_complete".to_string(), - job_id: job_id.to_string(), - status: status.to_string(), - progress: 100, - message: message.to_string(), - timestamp: Utc::now().to_rfc3339(), - } - } -} +use crate::jobs::websocket::WsJobActor; /// Handle WebSocket connection request -/// Returns upgrade response for WebSocket handshake +/// Performs the WebSocket handshake and spawns a WsJobActor +/// that streams job status events to the connected client. pub async fn websocket_handler( req: HttpRequest, - _job_manager: web::Data, + stream: web::Payload, + job_manager: web::Data, ) -> Result { - let ws_id = Uuid::new_v4(); - info!(ws_id = %ws_id, "WebSocket connection request"); + info!("WebSocket connection request received"); - // Check if this is a WebSocket upgrade request - if req - .headers() - .get("upgrade") - .and_then(|v| v.to_str().ok()) - .map(|v| v.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - { - // WebSocket upgrade requested - // In full implementation, this would use actix-web-actors::ws::start() - // For now, return a response indicating WebSocket support + // Subscribe to job status events from the JobManager broadcast channel + let event_rx = job_manager.subscribe(); - let response_msg = serde_json::json!({ - "event": "connected", - "ws_id": ws_id.to_string(), - "timestamp": Utc::now().to_rfc3339(), - "message": "WebSocket endpoint ready. Full implementation requires actix-web-actors compatibility.", - "polling_alternative": "Use GET /api/v1/jobs/{id} for job status polling" - }); + // Create the WebSocket actor with the broadcast receiver + let actor = WsJobActor::new(event_rx); - // Return HTTP 101 Switching Protocols for WebSocket upgrade - // In production, this would be handled by actix-web-actors - Ok(HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS) - .insert_header(("upgrade", "websocket")) - .insert_header(("connection", "upgrade")) - .json(response_msg)) - } else { - // Not a WebSocket request - return info about the endpoint - let info_msg = serde_json::json!({ - "endpoint": "/api/v1/ws/jobs", - "method": "GET", - "upgrade_required": "websocket", - "headers": { - "upgrade": "websocket", - "connection": "Upgrade", - "sec-websocket-key": "", - "sec-websocket-version": "13" - }, - "alternative": "Use GET /api/v1/jobs/{id} for job status polling" - }); - - Ok(HttpResponse::Ok().json(info_msg)) - } -} - -/// Broadcast job status update to subscribed WebSocket clients -pub async fn broadcast_job_update( - job_id: &Uuid, - status: &crate::jobs::manager::JobStatus, - progress: u8, - _message: &str, -) { - info!(job_id = %job_id, status = ?status, progress = progress, "Job status update available for broadcast"); - // In production, would use a broadcast channel to notify all subscribed WebSocket clients + // Perform the WebSocket handshake and start the actor + // This computes the proper Sec-WebSocket-Accept header and upgrades the connection + actix_web_actors::ws::start(actor, &req, stream) } /// Configure WebSocket route @@ -134,7 +40,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { #[cfg(test)] mod tests { - use super::*; + use crate::jobs::websocket::{WsClientMessage, WsServerMessage}; #[test] fn test_ws_server_message_serialization() { diff --git a/src/jobs/manager.rs b/src/jobs/manager.rs index e26452a..f122cbc 100644 --- a/src/jobs/manager.rs +++ b/src/jobs/manager.rs @@ -1,13 +1,14 @@ //! Job Manager - Async job queue management //! //! Manages async job execution with concurrency limits and timeout enforcement. +//! Broadcasts job status events via tokio broadcast channel for WebSocket streaming. use anyhow::Result; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, broadcast}; use uuid::Uuid; /// Job status @@ -21,6 +22,20 @@ pub enum JobStatus { TimedOut, } +/// Convert JobStatus to lowercase string for WebSocket events +impl JobStatus { + pub fn as_str(&self) -> &'static str { + match self { + JobStatus::Pending => "pending", + JobStatus::Running => "running", + JobStatus::Completed => "completed", + JobStatus::Failed => "failed", + JobStatus::Cancelled => "cancelled", + JobStatus::TimedOut => "timed_out", + } + } +} + /// Job operation type #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum JobOperation { @@ -110,20 +125,35 @@ impl Job { } } -/// Job Manager - handles async job queue with limits +/// Job status event broadcast to WebSocket clients +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct JobStatusEvent { + pub event: String, + pub job_id: Uuid, + pub status: String, + pub progress: u8, + pub message: String, + pub timestamp: String, +} + +/// Job Manager - handles async job queue with limits and WebSocket broadcast pub struct JobManager { max_concurrent: usize, timeout_minutes: u64, jobs: Arc>>, + /// Broadcast sender for job status events + event_sender: broadcast::Sender, } impl JobManager { /// Create a new job manager pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result { + let (event_sender, _) = broadcast::channel(256); Ok(Self { max_concurrent, timeout_minutes, jobs: Arc::new(RwLock::new(HashMap::new())), + event_sender, }) } @@ -137,13 +167,46 @@ impl JobManager { self.max_concurrent } + /// Subscribe to job status events + /// Returns a broadcast receiver that will receive JobStatusEvent messages + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + /// Emit a job status event to all subscribers + fn emit_event( + &self, + event_type: &str, + job_id: &Uuid, + status: &JobStatus, + progress: u8, + message: &str, + ) { + let event = JobStatusEvent { + event: event_type.to_string(), + job_id: *job_id, + status: status.as_str().to_string(), + progress, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + }; + // Ignore send errors (no receivers is fine) + let _ = self.event_sender.send(event); + } + /// Create a new job and return its ID pub async fn create_job(&self, operation: JobOperation, packages: Vec) -> Result { let job = Job::new(operation, packages); let job_id = job.id; + let status = job.status.clone(); + let progress = job.progress; + let message = job.message.clone(); let mut jobs = self.jobs.write().await; jobs.insert(job_id, job); + drop(jobs); // Release lock before emitting event + + self.emit_event("job_status", &job_id, &status, progress, &message); Ok(job_id) } @@ -162,17 +225,28 @@ impl JobManager { progress: Option, message: Option, ) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.status = status; - if let Some(p) = progress { - job.progress = p; + if let Some(job) = jobs.get_mut(job_id) { + job.status = status; + if let Some(p) = progress { + job.progress = p; + } + if let Some(m) = message { + job.message = m; + } + job.updated_at = Utc::now(); + + event_data = Some((job.status.clone(), job.progress, job.message.clone())); + } else { + event_data = None; } - if let Some(m) = message { - job.message = m; - } - job.updated_at = Utc::now(); + } // Write lock dropped here + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -191,10 +265,24 @@ impl JobManager { /// Mark a job as completed pub async fn complete_job(&self, job_id: &Uuid) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.complete(); + if let Some(job) = jobs.get_mut(job_id) { + job.complete(); + event_data = Some(( + job.status.clone(), + job.progress, + job.message.clone(), + )); + } else { + event_data = None; + } + } + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -202,10 +290,24 @@ impl JobManager { /// Mark a job as failed pub async fn fail_job(&self, job_id: &Uuid, error: String) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.fail(error); + if let Some(job) = jobs.get_mut(job_id) { + job.fail(error); + event_data = Some(( + job.status.clone(), + job.progress, + job.message.clone(), + )); + } else { + event_data = None; + } + } + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -308,6 +410,7 @@ impl Clone for JobManager { max_concurrent: self.max_concurrent, timeout_minutes: self.timeout_minutes, jobs: self.jobs.clone(), + event_sender: self.event_sender.clone(), } } } diff --git a/src/jobs/websocket.rs b/src/jobs/websocket.rs index 629260e..0bc42fc 100644 --- a/src/jobs/websocket.rs +++ b/src/jobs/websocket.rs @@ -1,3 +1,425 @@ -//! Job WebSocket Handler +//! Job WebSocket Actor //! -//! Placeholder - implementation in future phases +//! Implements real-time WebSocket streaming for job status updates using +//! actix-web-actors. Each connected client gets a WsJobActor that: +//! - Subscribes to JobManager broadcast channel for job status events +//! - Filters events based on client subscribe/unsubscribe messages +//! - Forwards matching events as JSON to the WebSocket client +//! - Handles ping/pong heartbeat for connection keep-alive +//! - Cleans up on disconnect + +use actix::prelude::*; +use actix_web_actors::ws; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use super::manager::JobStatusEvent; + +/// How often heartbeat pings are sent (seconds) +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a disconnect (seconds) +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Client-to-server WebSocket message +#[derive(Debug, Deserialize, Clone)] +#[serde(tag = "action")] +pub enum WsClientMessage { + /// Subscribe to events for a specific job, or all jobs if job_id is None + #[serde(rename = "subscribe")] + Subscribe { + #[serde(default)] + job_id: Option, + }, + /// Unsubscribe from events for a specific job + #[serde(rename = "unsubscribe")] + Unsubscribe { job_id: String }, +} + +/// Server-to-client WebSocket message +#[derive(Debug, Serialize, Clone)] +pub struct WsServerMessage { + pub event: String, + pub job_id: String, + pub status: String, + pub progress: u8, + pub message: String, + pub timestamp: String, +} + +impl WsServerMessage { + /// Create a job status message from a JobStatusEvent + pub fn from_job_status_event(event: &JobStatusEvent) -> Self { + Self { + event: event.event.clone(), + job_id: event.job_id.to_string(), + status: event.status.clone(), + progress: event.progress, + message: event.message.clone(), + timestamp: event.timestamp.clone(), + } + } + + /// Create a connection established message + pub fn connected(ws_id: &Uuid) -> Self { + Self { + event: "connected".to_string(), + job_id: String::new(), + status: "connected".to_string(), + progress: 0, + message: format!("WebSocket connected: {}", ws_id), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create a subscription confirmation message + pub fn subscribed(job_id: &Option) -> Self { + match job_id { + Some(id) => Self { + event: "subscribed".to_string(), + job_id: id.clone(), + status: "subscribed".to_string(), + progress: 0, + message: format!("Subscribed to job: {}", id), + timestamp: Utc::now().to_rfc3339(), + }, + None => Self { + event: "subscribed".to_string(), + job_id: "all".to_string(), + status: "subscribed".to_string(), + progress: 0, + message: "Subscribed to all job events".to_string(), + timestamp: Utc::now().to_rfc3339(), + }, + } + } + + /// Create an unsubscription confirmation message + pub fn unsubscribed(job_id: &str) -> Self { + Self { + event: "unsubscribed".to_string(), + job_id: job_id.to_string(), + status: "unsubscribed".to_string(), + progress: 0, + message: format!("Unsubscribed from job: {}", job_id), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create an error message + pub fn error(code: &str, message: &str) -> Self { + Self { + event: "error".to_string(), + job_id: String::new(), + status: code.to_string(), + progress: 0, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create a job status message (convenience constructor) + pub fn job_status(job_id: &str, status: &str, progress: u8, message: &str) -> Self { + Self { + event: "job_status".to_string(), + job_id: job_id.to_string(), + status: status.to_string(), + progress, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + } + } +} + +/// Internal message for broadcasting a job status event to the actor +#[derive(Message)] +#[rtype(result = "()")] +pub struct BroadcastEvent(pub JobStatusEvent); + +/// WebSocket actor for streaming job status updates +pub struct WsJobActor { + /// Unique ID for this WebSocket connection + ws_id: Uuid, + /// Broadcast receiver for job status events from JobManager + event_rx: Option>, + /// Set of specific job IDs this client is subscribed to + subscribed_jobs: HashSet, + /// Whether the client is subscribed to all job events + subscribed_all: bool, + /// Last time we heard from the client (ping/pong or message) + last_heartbeat: Instant, + /// The actor's own address for the broadcast listener + addr: Option>, +} + +impl WsJobActor { + /// Create a new WebSocket actor with a broadcast receiver + pub fn new(event_rx: broadcast::Receiver) -> Self { + Self { + ws_id: Uuid::new_v4(), + event_rx: Some(event_rx), + subscribed_jobs: HashSet::new(), + subscribed_all: true, // Default: subscribe to all events + last_heartbeat: Instant::now(), + addr: None, + } + } + + /// Start the heartbeat check interval + fn start_heartbeat(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + if Instant::now().duration_since(act.last_heartbeat) > CLIENT_TIMEOUT { + // Heartbeat timed out, disconnect + warn!( + ws_id = %act.ws_id, + "WebSocket heartbeat timeout, disconnecting" + ); + ctx.stop(); + return; + } + // Send ping + ctx.ping(b""); + }); + } + + /// Start listening to the broadcast channel in a background task + fn start_broadcast_listener(&mut self, ctx: &mut ::Context) { + let addr = ctx.address(); + self.addr = Some(addr.clone()); + + // Take ownership of the receiver + let mut rx = self.event_rx.take().expect("event_rx already taken"); + + // Spawn a task that forwards broadcast events to this actor + actix::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + // Send the event to the actor + if addr.try_send(BroadcastEvent(event)).is_err() { + // Actor is dead, stop listening + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + // We fell behind, but can continue + debug!("WebSocket broadcast receiver lagged by {} events", n); + } + Err(broadcast::error::RecvError::Closed) => { + // Channel closed, stop listening + break; + } + } + } + }); + } +} + +impl Actor for WsJobActor { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + info!(ws_id = %self.ws_id, "WebSocket actor started"); + + // Start heartbeat monitoring + self.start_heartbeat(ctx); + + // Start listening to broadcast events + self.start_broadcast_listener(ctx); + + // Send connection established message + let msg = WsServerMessage::connected(&self.ws_id); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + info!(ws_id = %self.ws_id, "WebSocket actor stopping"); + Running::Stop + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + info!(ws_id = %self.ws_id, "WebSocket actor stopped"); + } +} + +/// Handle broadcast events from the JobManager channel +impl Handler for WsJobActor { + type Result = (); + + fn handle(&mut self, msg: BroadcastEvent, ctx: &mut Self::Context) { + let event = msg.0; + + // Check if this client should receive this event + let should_forward = self.subscribed_all + || self + .subscribed_jobs + .contains(&event.job_id.to_string()); + + if should_forward { + let server_msg = WsServerMessage::from_job_status_event(&event); + match serde_json::to_string(&server_msg) { + Ok(json) => ctx.text(json), + Err(e) => { + error!(ws_id = %self.ws_id, error = %e, "Failed to serialize job status event"); + } + } + } + } +} + +/// Handle WebSocket protocol messages (ping/pong, text, close, etc.) +impl StreamHandler> for WsJobActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let msg = match msg { + Ok(msg) => msg, + Err(e) => { + error!(ws_id = %self.ws_id, error = %e, "WebSocket protocol error"); + ctx.stop(); + return; + } + }; + + match msg { + ws::Message::Ping(msg) => { + self.last_heartbeat = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.last_heartbeat = Instant::now(); + } + ws::Message::Text(text) => { + let text = text.to_string(); + debug!(ws_id = %self.ws_id, text = %text, "Received WebSocket text message"); + + // Parse as client message + match serde_json::from_str::(&text) { + Ok(client_msg) => match client_msg { + WsClientMessage::Subscribe { job_id } => { + match job_id { + Some(id) => { + self.subscribed_jobs.insert(id.clone()); + let msg = WsServerMessage::subscribed(&Some(id)); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + None => { + self.subscribed_all = true; + let msg = WsServerMessage::subscribed(&None); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + } + } + WsClientMessage::Unsubscribe { job_id } => { + self.subscribed_jobs.remove(&job_id); + let msg = WsServerMessage::unsubscribed(&job_id); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + }, + Err(e) => { + warn!( + ws_id = %self.ws_id, + error = %e, + text = %text, + "Invalid WebSocket client message" + ); + let msg = WsServerMessage::error("invalid_message", &format!("Invalid message: {}", e)); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + } + } + ws::Message::Binary(_) => { + // We don't handle binary messages + warn!(ws_id = %self.ws_id, "Received binary message, ignoring"); + } + ws::Message::Close(reason) => { + info!(ws_id = %self.ws_id, reason = ?reason, "WebSocket close received"); + ctx.close(reason); + ctx.stop(); + } + ws::Message::Continuation(_) => { + // Continuation frames not expected for our use case + ctx.stop(); + } + ws::Message::Nop => (), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ws_server_message_from_event() { + let event = JobStatusEvent { + event: "job_status".to_string(), + job_id: Uuid::new_v4(), + status: "running".to_string(), + progress: 50, + message: "Processing...".to_string(), + timestamp: "2026-01-01T00:00:00Z".to_string(), + }; + let msg = WsServerMessage::from_job_status_event(&event); + assert_eq!(msg.event, "job_status"); + assert_eq!(msg.status, "running"); + assert_eq!(msg.progress, 50); + } + + #[test] + fn test_ws_server_message_serialization() { + let msg = WsServerMessage::job_status("test-uuid", "running", 50, "Processing..."); + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("job_status")); + assert!(json.contains("running")); + assert!(json.contains("50")); + } + + #[test] + fn test_ws_client_message_subscribe() { + let json = r#"{"action": "subscribe", "job_id": "test-uuid"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Subscribe { job_id } => { + assert_eq!(job_id, Some("test-uuid".to_string())); + } + _ => panic!("Expected Subscribe message"), + } + } + + #[test] + fn test_ws_client_message_subscribe_all() { + let json = r#"{"action": "subscribe"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Subscribe { job_id } => { + assert!(job_id.is_none()); + } + _ => panic!("Expected Subscribe message"), + } + } + + #[test] + fn test_ws_client_message_unsubscribe() { + let json = r#"{"action": "unsubscribe", "job_id": "test-uuid"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Unsubscribe { job_id } => { + assert_eq!(job_id, "test-uuid"); + } + _ => panic!("Expected Unsubscribe message"), + } + } +}