Private
Public Access
1
0

feat: implement proper WebSocket handler with actix-web-actors

- Replace stub websocket_handler with proper actix_web_actors::ws::start()
- Add WsJobActor that subscribes to JobManager broadcast channel
- Add broadcast::Sender/Receiver to JobManager for real-time status updates
- Emit JobStatusEvent on job state changes (create, update, complete, fail)
- Handle subscribe/unsubscribe client messages for per-job filtering
- Add 5-second heartbeat ping/pong for connection keepalive
- Properly compute Sec-WebSocket-Accept header per RFC 6455
This commit is contained in:
2026-05-04 15:19:44 +00:00
parent e8d568eb19
commit 385c675736
5 changed files with 564 additions and 132 deletions

View File

@ -1,13 +1,14 @@
//! Job Manager - Async job queue management
//!
//! Manages async job execution with concurrency limits and timeout enforcement.
//! Broadcasts job status events via tokio broadcast channel for WebSocket streaming.
use anyhow::Result;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
/// Job status
@ -21,6 +22,20 @@ pub enum JobStatus {
TimedOut,
}
/// Convert JobStatus to lowercase string for WebSocket events
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Pending => "pending",
JobStatus::Running => "running",
JobStatus::Completed => "completed",
JobStatus::Failed => "failed",
JobStatus::Cancelled => "cancelled",
JobStatus::TimedOut => "timed_out",
}
}
}
/// Job operation type
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum JobOperation {
@ -110,20 +125,35 @@ impl Job {
}
}
/// Job Manager - handles async job queue with limits
/// Job status event broadcast to WebSocket clients
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct JobStatusEvent {
pub event: String,
pub job_id: Uuid,
pub status: String,
pub progress: u8,
pub message: String,
pub timestamp: String,
}
/// Job Manager - handles async job queue with limits and WebSocket broadcast
pub struct JobManager {
max_concurrent: usize,
timeout_minutes: u64,
jobs: Arc<RwLock<HashMap<Uuid, Job>>>,
/// Broadcast sender for job status events
event_sender: broadcast::Sender<JobStatusEvent>,
}
impl JobManager {
/// Create a new job manager
pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result<Self> {
let (event_sender, _) = broadcast::channel(256);
Ok(Self {
max_concurrent,
timeout_minutes,
jobs: Arc::new(RwLock::new(HashMap::new())),
event_sender,
})
}
@ -137,13 +167,46 @@ impl JobManager {
self.max_concurrent
}
/// Subscribe to job status events
/// Returns a broadcast receiver that will receive JobStatusEvent messages
pub fn subscribe(&self) -> broadcast::Receiver<JobStatusEvent> {
self.event_sender.subscribe()
}
/// Emit a job status event to all subscribers
fn emit_event(
&self,
event_type: &str,
job_id: &Uuid,
status: &JobStatus,
progress: u8,
message: &str,
) {
let event = JobStatusEvent {
event: event_type.to_string(),
job_id: *job_id,
status: status.as_str().to_string(),
progress,
message: message.to_string(),
timestamp: Utc::now().to_rfc3339(),
};
// Ignore send errors (no receivers is fine)
let _ = self.event_sender.send(event);
}
/// Create a new job and return its ID
pub async fn create_job(&self, operation: JobOperation, packages: Vec<String>) -> Result<Uuid> {
let job = Job::new(operation, packages);
let job_id = job.id;
let status = job.status.clone();
let progress = job.progress;
let message = job.message.clone();
let mut jobs = self.jobs.write().await;
jobs.insert(job_id, job);
drop(jobs); // Release lock before emitting event
self.emit_event("job_status", &job_id, &status, progress, &message);
Ok(job_id)
}
@ -162,17 +225,28 @@ impl JobManager {
progress: Option<u8>,
message: Option<String>,
) -> Result<()> {
let mut jobs = self.jobs.write().await;
let event_data;
{
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(job_id) {
job.status = status;
if let Some(p) = progress {
job.progress = p;
if let Some(job) = jobs.get_mut(job_id) {
job.status = status;
if let Some(p) = progress {
job.progress = p;
}
if let Some(m) = message {
job.message = m;
}
job.updated_at = Utc::now();
event_data = Some((job.status.clone(), job.progress, job.message.clone()));
} else {
event_data = None;
}
if let Some(m) = message {
job.message = m;
}
job.updated_at = Utc::now();
} // Write lock dropped here
if let Some((status, progress, message)) = event_data {
self.emit_event("job_status", job_id, &status, progress, &message);
}
Ok(())
@ -191,10 +265,24 @@ impl JobManager {
/// Mark a job as completed
pub async fn complete_job(&self, job_id: &Uuid) -> Result<()> {
let mut jobs = self.jobs.write().await;
let event_data;
{
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(job_id) {
job.complete();
if let Some(job) = jobs.get_mut(job_id) {
job.complete();
event_data = Some((
job.status.clone(),
job.progress,
job.message.clone(),
));
} else {
event_data = None;
}
}
if let Some((status, progress, message)) = event_data {
self.emit_event("job_status", job_id, &status, progress, &message);
}
Ok(())
@ -202,10 +290,24 @@ impl JobManager {
/// Mark a job as failed
pub async fn fail_job(&self, job_id: &Uuid, error: String) -> Result<()> {
let mut jobs = self.jobs.write().await;
let event_data;
{
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(job_id) {
job.fail(error);
if let Some(job) = jobs.get_mut(job_id) {
job.fail(error);
event_data = Some((
job.status.clone(),
job.progress,
job.message.clone(),
));
} else {
event_data = None;
}
}
if let Some((status, progress, message)) = event_data {
self.emit_event("job_status", job_id, &status, progress, &message);
}
Ok(())
@ -308,6 +410,7 @@ impl Clone for JobManager {
max_concurrent: self.max_concurrent,
timeout_minutes: self.timeout_minutes,
jobs: self.jobs.clone(),
event_sender: self.event_sender.clone(),
}
}
}