feat: implement proper WebSocket handler with actix-web-actors
Some checks failed
CI/CD Pipeline / Code Format (push) Failing after 1s
CI/CD Pipeline / Clippy Lints (push) Successful in 38s
CI/CD Pipeline / Unit Tests (push) Successful in 54s
CI/CD Pipeline / Build Debian Package (push) Has been skipped
CI/CD Pipeline / Build Debian Package (Ubuntu 22.04) (push) Has been skipped
CI/CD Pipeline / Build RPM Package (push) Has been skipped
CI/CD Pipeline / Build Alpine Package (push) Has been skipped
CI/CD Pipeline / Build Arch Package (push) Has been skipped
CI/CD Pipeline / Security Audit (push) Successful in 47s
Some checks failed
CI/CD Pipeline / Code Format (push) Failing after 1s
CI/CD Pipeline / Clippy Lints (push) Successful in 38s
CI/CD Pipeline / Unit Tests (push) Successful in 54s
CI/CD Pipeline / Build Debian Package (push) Has been skipped
CI/CD Pipeline / Build Debian Package (Ubuntu 22.04) (push) Has been skipped
CI/CD Pipeline / Build RPM Package (push) Has been skipped
CI/CD Pipeline / Build Alpine Package (push) Has been skipped
CI/CD Pipeline / Build Arch Package (push) Has been skipped
CI/CD Pipeline / Security Audit (push) Successful in 47s
- Replace stub websocket_handler with proper actix_web_actors::ws::start() - Add WsJobActor that subscribes to JobManager broadcast channel - Add broadcast::Sender/Receiver to JobManager for real-time status updates - Emit JobStatusEvent on job state changes (create, update, complete, fail) - Handle subscribe/unsubscribe client messages for per-job filtering - Add 5-second heartbeat ping/pong for connection keepalive - Properly compute Sec-WebSocket-Accept header per RFC 6455
This commit is contained in:
@ -1,13 +1,14 @@
|
||||
//! Job Manager - Async job queue management
|
||||
//!
|
||||
//! Manages async job execution with concurrency limits and timeout enforcement.
|
||||
//! Broadcasts job status events via tokio broadcast channel for WebSocket streaming.
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{RwLock, broadcast};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Job status
|
||||
@ -21,6 +22,20 @@ pub enum JobStatus {
|
||||
TimedOut,
|
||||
}
|
||||
|
||||
/// Convert JobStatus to lowercase string for WebSocket events
|
||||
impl JobStatus {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
JobStatus::Pending => "pending",
|
||||
JobStatus::Running => "running",
|
||||
JobStatus::Completed => "completed",
|
||||
JobStatus::Failed => "failed",
|
||||
JobStatus::Cancelled => "cancelled",
|
||||
JobStatus::TimedOut => "timed_out",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Job operation type
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub enum JobOperation {
|
||||
@ -110,20 +125,35 @@ impl Job {
|
||||
}
|
||||
}
|
||||
|
||||
/// Job Manager - handles async job queue with limits
|
||||
/// Job status event broadcast to WebSocket clients
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct JobStatusEvent {
|
||||
pub event: String,
|
||||
pub job_id: Uuid,
|
||||
pub status: String,
|
||||
pub progress: u8,
|
||||
pub message: String,
|
||||
pub timestamp: String,
|
||||
}
|
||||
|
||||
/// Job Manager - handles async job queue with limits and WebSocket broadcast
|
||||
pub struct JobManager {
|
||||
max_concurrent: usize,
|
||||
timeout_minutes: u64,
|
||||
jobs: Arc<RwLock<HashMap<Uuid, Job>>>,
|
||||
/// Broadcast sender for job status events
|
||||
event_sender: broadcast::Sender<JobStatusEvent>,
|
||||
}
|
||||
|
||||
impl JobManager {
|
||||
/// Create a new job manager
|
||||
pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result<Self> {
|
||||
let (event_sender, _) = broadcast::channel(256);
|
||||
Ok(Self {
|
||||
max_concurrent,
|
||||
timeout_minutes,
|
||||
jobs: Arc::new(RwLock::new(HashMap::new())),
|
||||
event_sender,
|
||||
})
|
||||
}
|
||||
|
||||
@ -137,13 +167,46 @@ impl JobManager {
|
||||
self.max_concurrent
|
||||
}
|
||||
|
||||
/// Subscribe to job status events
|
||||
/// Returns a broadcast receiver that will receive JobStatusEvent messages
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<JobStatusEvent> {
|
||||
self.event_sender.subscribe()
|
||||
}
|
||||
|
||||
/// Emit a job status event to all subscribers
|
||||
fn emit_event(
|
||||
&self,
|
||||
event_type: &str,
|
||||
job_id: &Uuid,
|
||||
status: &JobStatus,
|
||||
progress: u8,
|
||||
message: &str,
|
||||
) {
|
||||
let event = JobStatusEvent {
|
||||
event: event_type.to_string(),
|
||||
job_id: *job_id,
|
||||
status: status.as_str().to_string(),
|
||||
progress,
|
||||
message: message.to_string(),
|
||||
timestamp: Utc::now().to_rfc3339(),
|
||||
};
|
||||
// Ignore send errors (no receivers is fine)
|
||||
let _ = self.event_sender.send(event);
|
||||
}
|
||||
|
||||
/// Create a new job and return its ID
|
||||
pub async fn create_job(&self, operation: JobOperation, packages: Vec<String>) -> Result<Uuid> {
|
||||
let job = Job::new(operation, packages);
|
||||
let job_id = job.id;
|
||||
let status = job.status.clone();
|
||||
let progress = job.progress;
|
||||
let message = job.message.clone();
|
||||
|
||||
let mut jobs = self.jobs.write().await;
|
||||
jobs.insert(job_id, job);
|
||||
drop(jobs); // Release lock before emitting event
|
||||
|
||||
self.emit_event("job_status", &job_id, &status, progress, &message);
|
||||
|
||||
Ok(job_id)
|
||||
}
|
||||
@ -162,17 +225,28 @@ impl JobManager {
|
||||
progress: Option<u8>,
|
||||
message: Option<String>,
|
||||
) -> Result<()> {
|
||||
let mut jobs = self.jobs.write().await;
|
||||
let event_data;
|
||||
{
|
||||
let mut jobs = self.jobs.write().await;
|
||||
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.status = status;
|
||||
if let Some(p) = progress {
|
||||
job.progress = p;
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.status = status;
|
||||
if let Some(p) = progress {
|
||||
job.progress = p;
|
||||
}
|
||||
if let Some(m) = message {
|
||||
job.message = m;
|
||||
}
|
||||
job.updated_at = Utc::now();
|
||||
|
||||
event_data = Some((job.status.clone(), job.progress, job.message.clone()));
|
||||
} else {
|
||||
event_data = None;
|
||||
}
|
||||
if let Some(m) = message {
|
||||
job.message = m;
|
||||
}
|
||||
job.updated_at = Utc::now();
|
||||
} // Write lock dropped here
|
||||
|
||||
if let Some((status, progress, message)) = event_data {
|
||||
self.emit_event("job_status", job_id, &status, progress, &message);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -191,10 +265,24 @@ impl JobManager {
|
||||
|
||||
/// Mark a job as completed
|
||||
pub async fn complete_job(&self, job_id: &Uuid) -> Result<()> {
|
||||
let mut jobs = self.jobs.write().await;
|
||||
let event_data;
|
||||
{
|
||||
let mut jobs = self.jobs.write().await;
|
||||
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.complete();
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.complete();
|
||||
event_data = Some((
|
||||
job.status.clone(),
|
||||
job.progress,
|
||||
job.message.clone(),
|
||||
));
|
||||
} else {
|
||||
event_data = None;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((status, progress, message)) = event_data {
|
||||
self.emit_event("job_status", job_id, &status, progress, &message);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -202,10 +290,24 @@ impl JobManager {
|
||||
|
||||
/// Mark a job as failed
|
||||
pub async fn fail_job(&self, job_id: &Uuid, error: String) -> Result<()> {
|
||||
let mut jobs = self.jobs.write().await;
|
||||
let event_data;
|
||||
{
|
||||
let mut jobs = self.jobs.write().await;
|
||||
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.fail(error);
|
||||
if let Some(job) = jobs.get_mut(job_id) {
|
||||
job.fail(error);
|
||||
event_data = Some((
|
||||
job.status.clone(),
|
||||
job.progress,
|
||||
job.message.clone(),
|
||||
));
|
||||
} else {
|
||||
event_data = None;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((status, progress, message)) = event_data {
|
||||
self.emit_event("job_status", job_id, &status, progress, &message);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -308,6 +410,7 @@ impl Clone for JobManager {
|
||||
max_concurrent: self.max_concurrent,
|
||||
timeout_minutes: self.timeout_minutes,
|
||||
jobs: self.jobs.clone(),
|
||||
event_sender: self.event_sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user