From c44045db387139e55b74d24dc156ee1cd75118dd Mon Sep 17 00:00:00 2001 From: Echo Date: Mon, 4 May 2026 15:19:44 +0000 Subject: [PATCH] feat: implement proper WebSocket handler with actix-web-actors - Replace stub websocket_handler with proper actix_web_actors::ws::start() - Add WsJobActor that subscribes to JobManager broadcast channel - Add broadcast::Sender/Receiver to JobManager for real-time status updates - Emit JobStatusEvent on job state changes (create, update, complete, fail) - Handle subscribe/unsubscribe client messages for per-job filtering - Add 5-second heartbeat ping/pong for connection keepalive - Properly compute Sec-WebSocket-Accept header per RFC 6455 --- .a0proj/audit.db | Bin 1699840 -> 1830912 bytes Cargo.lock | 2 +- src/api/handlers/mod.rs | 3 +- src/api/handlers/websocket.rs | 128 ++-------- src/jobs/manager.rs | 137 +++++++++-- src/jobs/websocket.rs | 426 +++++++++++++++++++++++++++++++++- 6 files changed, 564 insertions(+), 132 deletions(-) diff --git a/.a0proj/audit.db b/.a0proj/audit.db index de1374a3e7ae44a853ac299138079d88d85e28a8..b2ee6209134d9d037d74eb75f49dbd5b84ca6a0f 100644 GIT binary patch delta 47971 zcmeHw34B}CnQtub&XsptQq~)X;3bi*U9yvqIF6GTlGuq8wm7wA>Dm^tB_qiRW*HIo ztw2c0X_?YMx0x=qFm#&I4e0i|&;h2~v@>m?v$S-&K;OJFzt?v9{@-`*m9At5LYJ9& z^Q*y*bnjWe^X=z5-?{#U$?IQ8+`GJVddZTaCr_wbil!eMdv{I6K=sKb^yH*cdqZvY zN2?#J{$TZARKK(O!RohGpM3m8?W@-Fi#|I!;c6{CxjE2O^6D+S<0l`c z4>$kQjQnuZPp9RF8^3!*|8VFE{Xm9mv9BUpajfEw$1Apew*2ISyJ|AjX?@A+lA^xBJwvYVs9MtM041x$3H_BPT0{K3QM&LeMvD)viieV#rv^>w^nA9;*~{vLkAAJ_QWrZo4?GMzf7CI95H{n!u%y>{xU^h zlKoc<^oNr}ljbiIxi7`ZA??PWUViIOFE7Hc1;54kEx~Uoe#`J%j^7IWR^qn`zf19} z;Ma=ZYW&vVw-_^rop1AZIv+l1d{{MztqKl9Ve9gY3%#qK4+C0|}rB#Pq2-za*j z_&;ivF8$V$|83n>@^3YVtA9{lS@crnXDa@W<)fnIA;|^n}CX?m!?qG42jcQbBjH+ZFO_VW-DG9(4MAfo^BWI-2uPH>DJsnr_bf`J3}tN=8S~AsPFZ<10LVBc=J0Q?OfbF5%PP+ z1LIC_aNO_o1tT8kxH}wmjt3%WX*?VZj0b1VeE*$}b=D1m^2?W04Ap$A=0i0%*Nl{3 zUgItQU`+0?r!Hqw;!W)YXRqYDByGYJ-%^Ix6keJdEIfE&P3GZ^=b?t zZwN3AxSZXg?jT_5_4z#!f6yE79-)H4Ks4&^c14`whz}F-`vcCfFWBvL1%knF(A%x~ z+@UL|pca`Jzi=Ytayea5Ulc%fxt-nJA-6N!t)VZUI~a0%rl@4tYT3U;)Q4dUMYE+wu#bfE%QB9qQU8S}qwVA}x zSbS3K+h}Pn+90mqXldNGK@G>F>V7R8y(Ae+YeV7mRH|J)mPj7aqMa=ps#N`_Wxtl1 zn@*>C%+H+Y+L8U|nA#PCclxlp0o3+H5!(PHXs7`;M(K9iCK=x;nj`esv;|RQm?H z`nHRjPdE21Hvy^N)32trR2nVvRE2un+FZM1;L`r>Dh3jso~8vjIs21cB`=@8`*u;+ zS<|pg&Az9Z$t7Ad5+8}L70<6%8XJ2j(ppm8o?D6?v8%*!VRKgvg(FA8lbSj>EAIG9 z>q?;>wXP7gElVquYkAUb9$%-+CqDg4dvl8pxU6tduN{eBtIl~mE}z=w@ld%3j#^hP zUr@lu1-Kz9P?J|+BrfV+wX2&5qiDrbG)>bqIyx4^qhsL|AS6*q9TUI(j-#n$Yy&EFYuB)QMzbMomv?BK#&xle3c{w05m}$y7M4UV;_@7A;Co33HQ)L|To6 z=Te%A-o(oH+Z^Wuan_uz0$+ zvQZ3u(^lTDCgeY2X)Ue> zJZC=DY^$-9uT}RB9_a58TYqd@nbEGoyA*Meyb*LVZcHSmrxV98;*q$=t*#}~Nhf0w zUJ-RFj7sAgS}vS35Mf?|P>m##;7@aLt`nY)rRTAd9uL}5LE}S7@uQxa`syBa8?Akc z$Eu00@6ds0!N~y6p_~7(3YJ2tCdlP%LV?)W<(b%*$sim@w(4x_zW9`vM zEHXt1rN%WanqvMiN?%jJ1(cB+R@-p@0d%`T#lJi0-@aXgm+al3YUxO4JH{B$8ziP) z)1wZDXADLO0wRWAhC)Gg?fpabe(}Ap*&NI8J%ma`t$Pj*AHYJX;qhq=d{iW!s%l=A z1FT$k=doB+1L1nQ#hRO}4b5H`Rws58y!XJ&?B;ZGP8*4P-Qx5OR-4`H!Mj9Si=_K# z*^IYd@&0ADRYgAWrDe92MSgLj-@2?QAhsT{E-wm-Rm*MmqL6s8->RzJ5J?i)U?kJ& z*o>yP;d7}Yai3f4-C^yr_&g%B!@Abu^NRnw+_u{8qjv<_{`k>YG7+EA;%Phy2=@xx zdFbKd6*dogz_&*6;0l|yu^ScqE>zf^n4DxR_PfQWSJ)gDzef!1wAw9xuefp*D*MDe zJFO0k`O#Ij#uWjo2WZkL6LZOk24OT4OQ-NOBwko$YijFOchi%-L;DB&Hv3%NfsuHC zh8}RMhXy2~9>zD1Sh?CZ-w;W}L1gKc9<}ArLC9>p68CJgh7<_TcTQ?@9%8x8cE0%4 zHtWjLOTzJV>QtZAzF{POXjn@gjYTxnIWRw~ZBE5V3mA#-O{8NJ^S#V2HZxs_H}zTf zb`xO3$@Jz#+&K}BP0uA!R)03EMK=2}i35OlVlF+LPR!CHuM5vGqT|1*X|&*9>vn64 zxYur5PG2r~)YkmFF^oSLw=h@*Biyj~Sx=4f#u$cd;19zsWLpOIN>(8`MGrHuJdtw_ ztb*HP3f}bx;1ja7voeWd=q*_eVZ)cRvWJalV-Gsr?@DAzb^(_V58YsGZdimthz~5Y ztq@;cW?OF8-(I98QBJj_f3q$EVie`P6h` za<_I=o8BCYPeAxn9SkF%4~Q36*&0?Y-~k}l9QBJ;t1;cjgTICx)bC;4`#R9LC^}(n5|4bnwzJspSH)0TSs{KNQmVw2?^0HZd)}p--|P=% zOZ&Uk>vVP_o_&|HP1N0IU0ELR68rIqr@Ix0*ms|Gov?nhcC8H@=3UTwq3b}@1%gy2 zgg@f3cPj@hL6R0)bdH}2ikbs>486FJq zfZ~uuJctMKd$Y-kb=h8Lb91f$387Z1PPy5He( zld?c-rS8!6E-a$Q9pu-mJ#JcmkDFv83=A?O*(@5JB1t?YI%3ND{T^t0^fS^z|3Rr# zyX0%=Y!_=u2p(1c9+}c2M=&57S4w|4uUo90vO4wS5zn7emQ~A2BQ3Nt9 zt{KRCL<;ZX4aud^Q2}G##S5nnLebPc1>!fiEB4F=RUgu39To(U*29Nd^~Zq8237W1 zSR#KUD43FSAQs{AGSYc{In=vx@iH|7uEwV@#70Ka{ ze29tSp@Bk!!)P}R5>4gM#~UE@StD*7U0PR!2G?0viP_tfQXO}^(D@j5yxojD4W{u( zT=TgYfh@qXSN>XQBn)cIczlwmqUvULB^(-QAqb7MK-GfpCx5x75~(!tx{(%Im*@tp z{vLQL$e7E$qI#0wWS2Ps$StTr4@WX6EO}41@DUb=i&QXPPzRD9W$T2Kktuj~A|SxU z2ABVu>IZpZNiD;y1o$&7El*m7p6q0R5rl!)P%oP~zayIV&LyGIwc zSs3eem`aPbk~09@qf%!xe@4gW5)Mpnqx8`VBmZc0!tL?bK9a zZkpT#@O{w~xuJktJbOav++viH51Hy%zIy^Ds*{j}fPc4cB9BhQrs0m#q7teBraXdb zh4}c6rFG)_b=D?_^h)VK&4DqXlNi9rrR!_h#}klLaImn#$j~?b+aC_n;^+bh=;Wh57dw>)&T7oX@E1M%pSyx z7b|Nmo}g)tuCAK$P>#9iz3beB4gauu1kSLyq1)zWA2iQ?>^y5pnVhwF>`7=bCRPLO zMzWfqoOH$$l9MKH)go*pB$;&X1vfvAk}K6XNLF-6>v3O5=f3yFrw@`8cYFKlX3>tGGK-U^^A8^P4c&# zs2b9pqLRZySu&51zexsTNT>~J>lQE+wT%kOSF-t5{f)hy(I!L|yC+2IttS*Bv=ME) zmaY&VUtwKify`fhP-$5jPsFvWKxq(9;>0rR>INO=IW*{YWqClW8HKy^OdW-S*V zJXu>M_S~whY>azdGHB}()*jl z-KVYCTBulBYw`NUl@C>TN?|Q^yT!wQS$qpy2AKc<6hMqr5+L3y9{ZNkDz3Rr@t^5Z ztj`yV=jyGFGb?MYU#t{m%Zg2asmI(ms4dg6xHg(jz<~sxSQbWJ(ff;vePZi-D*7cT zZ@O7&w!jek`w}Qfr~8$55&nj9p7;q+g8KanairAh6-C#S24wkn-=ch>It~A`ZbqBHqmJKRrFSe|S^?LE`nhU{P?lO7 zN@$ft`%>%v^B01cc3upjCY|wN>S4RCNBb( z7ZhtDSca|EE(w+kPCWq8`be3z-UO94anYShkN9`1b-yV7K-1O|aDtGii!|RVW^aai z_Kth4b^k-yfdF5uC#w8QQCDFdJo9%JYjSA;TdXUjey^B)sKUz(vRk}(SJ8ynadYLx6|_fb5VawDK-Nr7SrT?KABs(=*Z_t`BzXs* z_&%6zQZJHRWuyhpn;exd6<@D}pN|Mnyt&NkG2vX3_|*N1lE(nh>+3Lp-<`(M_8*qU z|1-e4xyw zz?bv#UrUEx6F!ekD~*-xf&oFlUp!TTAVP0;ZdJmVM}0Yu1nMFW9Nr;0aBq+X?lwzS zbQKaCtE|J-Uh+hEd4Ca6K173?FsARVfLd(`*(N7>wh$=AtCxpUXO*x~!Y^C+#6En;u}0(03@l-4==|aMnbVxNEa7Yz61N21XJnlWavR8 z*O&7w;TU1)!7oD41E4dSf>t^`=GEOrq86XSdLY=BqsG9tGN1XIAvZKg6!{zQ&WF5 zoFe=(S;b=Z^W}jO%+)83uWichZQ1G#5%(wYmM`A?qyT?F9JgET;`D7w9Z9|ax#BK= zCsbKr+j|j=0{@4^pxQ_?Fv!rJ#QrBL?3o~ZTZO99e~xOySibbNoj>pb1;tpItyj`> zQE`dB2hW0+%8tEwzjf3^Z9%t z@q{u2iTBhK%C`Cf8R~-!eRPk~gE${(Bn~n4aiz@y*?FaE-6NjfrK}RqJqb@h@h9L3 z0RAm2MszW`^x7#v=;^)KrF{P<6u*_!4#XDo-+<`ar))yhk$OQ5_MECE3H)5^XI-o!^hxOn2z%7G%6sD4rztTXS~qGphHMjFJ}I;$Jv9a_Um znr1oC+y}+Co`g2`{dLv`z=UVm^OO>_a%b4ur|LoRXHO~HX+s}Vd{a@k$Q)EwTM)?r zZNw&qzOF147u~+JQfxh_EG=j6D0CC{j$+mx=^91oG{Lb0oxt(AMr{6!vO;#kE?ZwH zD0{wnR(l~9ETKdWp!U-~f(x$f(h-bxc6rpi9j zwR^D)a7(txZu2uShYtq-Pt5l?E7yDpr01+J2(@G39+=LDpAD(>_Nb#l5DN6z2 z7ry|-=()YH6rb&YizeTHX#x8EpkirX38UP zoWj5L*wCjR11x^1^$-3HoPWwFoKNe&q}10)xTTX6f$7J-?ChwbV!9{rk9hN!usA>X zinYm(9UlP8kBCF#2b*esY_!ylK~sPEOG+m>)YUI5aq@rP`(=17oQ9|O^VZ|!CpHV^;UQ#9f*ruI-M!5*}KL3m|P=L06csN&oRT%`v zCcdg%iUEG{tIDBrLQFr{LC4>KC_&8+y-C?x!F!B;&@pDZj;LT%@mK)RRbNt8)#~vz z`g}phnD1%})Lvu4s)cmlQOxSWXO;5W{0T$(#T?tkcb|oi`$J15K9{1 zB?WPR>3OB!f}OS6zg2?ufr2GPxXxvNtF(d+j{mLF2ga#;d zC$t+&@$j$f+r_=T<%fyyL)Xz|l&^4FC=1(b}NKGEkVlz|G~4f2CqJu8%!5-KW13|y~W zV90Dyzi$#eEZ}m9$KI-h@WCzWrW9{U0Q)^+<=Y^-Fr-73ki=Nv0JQMm{GD=X9g$N2 zp-uER$opkme*i+b=G#gOnqX#Z7qc;?6%`3H0&bUhD5h*$fEfW&(sP&rY|ml_Vtwx_ zx|NWkStL5k?JI-e`Gx9>e=&;4O|Eoyz_r}(&XGgfdfd&6K5FhBqGF_sk1vmyFzE)zU z$G}XlftfxXGnpZG2N*N4xlc%HcqzIpkf^J-h+u|&D zzW;$Hk~<5{W%)|@mXoph6sAI zf6siyI7 z6OpNKd=i2Bh}zc=pW{3QoBp~5p}Og-S-z`noWQ~9Zdd2%04>fcZ&c@!lUh77zZr+n zVaJgN4LKN8wF3w8kfA`ho^f=!5%HZnN6km+_51?4)7Gd)UNNJZnzS|}aJt|?=mt9F zXF32rupKRP9)LwC7cEkZhu)%ws^$zEsEC998lB`1C+GR(0^)0pGt+$DLUbmr6&W-` zZpRMfM?gBIC0D6Nl8PR67H7t_BobiInK#PZpj*y*(4-MBp39JcwkR5d+A_D3)UKS1 zA%aq`Za7Gy6c8_tBO$<^>DYu8nUBz+VwJok9jXjXL`n`md5rWTI5*CRkK=skSan_? zzZz$eah{v1^+uzrxITf{)9|B~rs|P6Jl*(%Z3#qeiMsx}Wf>LHKD|MOGipzGUd8#o z06zD){XK9HI9+}h++pkjhX;I7{P4X7e(*6kw_Fy2LEiv}luZD62%I?585eE0IUHKs z(GHrz(T-012k$l<>IU5M>deh@ytB;15!J>i2T2Oi^zd3tYhh%sXp7@8lYE_;o0y1Q zMUBZt*e@Pd?JJAgMBRsNjkWY#f|e0i6TNrXySK5+{*6N$hxQhQHqPuJatdC46SQ|E zV}Q0N2W^iBpf*7p`X?&+7KgU?HK5ILK)g#JF^=h&eBe_HL=@!WHSqv( z@7Hb3Yh)r6GvT0=L@#~i-A~(^S1(#j!Y4~aD;V8OB#4$IM1b64>keC^ox-AL(l{xX z1h^33ltqiNYib-jsQyei7U#DJe4@95bm}m6H5k^h-8iqV&ZQ_v4VpnXWvXi?9LLwB ztU$YoS?GJPX-4dqj>Q?!U$Z#$y*cy}PrE9&9mooU0tIo=74g5uihw`ST4Y&&=ee=| zY#rkJiQk*=8_k^?(-&)!)+XZfs76fb_hax28hI`5ZaB%i=ER+!YPRk)^LaG%hM7F- zWRa6Od<7kj&!WSTbKvbG@kJN?wYYi?bSB;`)Y}|uMhp440b^6a9vpD}9hf-dnVExI z!NIf0C)xKV`u>LoA_U|@j#j|3&jQ5PV%9kzF2bf0NaJHHi`exOo0a)>DNRo@>$Y=X z)>K+@>Pm2G6IvWzY(@n<`YbbgE$*x@2d|N%kS}Lfikb4N#kg&@h$e_5uQ*$oEXF$) zvSVt);KfaOyqMl_O=84b^B6I`I(IHirFF1^1)IQn4$SwlV6ksCTx$Wh`?|xm#p|{V zKKnj(_2r@1FTc1p_RDA1l~@iPe*8r3o}x_AUyUac$R{Avb4i7Ul1s_+VTjSD^Cd}* z3egT5!~_nmLZOW!!4gtQpM^2oOqs2tD;JsHA5ZIjyn;<`UrkyB?pQ8k~Q z^Hg?Y=DT5rAPs@KTMHxGO7uLVF1-hmqF*16ep;5pxZRzUMC=x^^;dPZXCC;BZKIW9 zh8{oj$~DbDww(Fw-`XCj5NAHMROa$12~kdroZp=)iG5F0EED(qw7mXI+qZ4MDzYIe zffI>fAxP95yc-S#%&Eo0ZFy_>D7-baddbeL8$Z>p{}GgGls209`H~ zz1v<Or1GY)63 z3e18fh5cl8NI%Ca6{Z=o-S86V&$bq_j8R6n1!gE^b)$%j&TrZ}mKWxNLq3xP6VqIZ z>{`18<{2gO>0qpbu^~-SOhcx|)AN^^Ak}&QY<;8n#MhB~D=0o~sc+8UZ1PAPN01Ak zp2M?=FoKX&sNS4Ml5@=EnVG2VQS*SGMbRv8$g4oa5>}7GyA@RvRq{+}*3#_A8LJzh znWi|`U*SZff2`m3nTS|(QjlauM@fUH0rXV^KDfK83f4V>4Hqm@uTlqEZmX)LmJ#gQL5miMb|hOOVbyHSthza$vG_g;45674a5{c!UmI zV)v2gp_j5qW{XhjKz()~D9Y84t)hlGKjvIKicBeK6r%Pi^_@(LSaXlPGP5xIAv)@I znHKSqDdZTW457L^0?ok+k5e|lc`QhLVj3w4kvoucr7~2qFhhTov67~S6*Q09m{=QT zrXrCLk`2c)qfJIWx@4EW;G9e8aY?d2yyF65qFl|ESzh_+PS} zA2&DwRE5{$dA~EmE8utQa@^!K_#FYPp{+mxI45p*A<*Ry$?BHB2CEaVSnJpA2Ck{+ z#?kU%_@Bbgy51l=V?_PqbF$n5{ZZK%xY)Y?LN=zD*_e3v?q-KbeZJi53_>~VhWWA7 z^4l$Yx8MDqf#*wLdPHKs{OTo3{_W6VD-9=mGhaTl&{T)mbY(1{J#hUYqR%do5HSlui>8UkH0+^7QhSMzD zrO9&za{NJ5sPNU#&cRC^ORL+l3s7L)$n4wF!l5IpDl+*TOR24Mvv7mY#Nw^GLP~-u zmYktfuWUSJ$a>NxCWC2*;{jl z*c9Y;&YRyxZ;&2c<`YA(%`^-n6p6Q{Rhb+w6IJK6bbDUbZ!&!JyyIcCPO~vm6ibr_ z!V^V2-Y&R^k(vQgmU5F*vT-&Bp~6uDX0m zjB<68wR~;XEEt?k;Tv+RE}z5p*E~M&hWR4XIA5y{9z{}XO4@vNV-u22)>y?0A75$} zn^!L_7mwQNO2ycd&E*QG2X9AJq*WJ#Tbpe|rsUrAP^HD=)!;I!*X;+gT?@TQE|*Nh z4tr@qGI4XOW3SVyr_%$|@2`KdUek$k@buy1A3dTIgQz|9#-rrC8z>(e^ z&_Z7OJ~*`brv=+{lv_Pl!;3o`0zs_spBXL-Zh)wt{2Q_%^#>2$MOp2emg^n)TnqaV zYyRG`VoLH8ok(aZyimaOHdqn)ZWZ01(#{SQnxNGB$+tqZTfltC7z7RA|1c^{Quce{ z+FM#(%2xx3nhu=I898OEtj`+(r_>k6KY;wg4~0s~Y@`c8eP^U*tyzv#6une(KwS5K z>&}9;RGd^GTI`}+HIS61JMkQsbmm%*!)?Ed}L0g5n z_sz{8dLzw0co(dKT3ydcHrq8h#2{T8r@JM7OUZNWx%u+twOx0pbxn$d<8=v$78UZmNJZdd}LwMUK%Dt zlCW^mvDDdI|Bxb_uztLZ^1Z|ThC2-!Dlu>H)4XdL8YJJ{Gk9_T_Uzlb^K9y%ygFfj zf8MKnN9g9x&d$Y(!Q-JFjV0z%>|y2+_uvu-8$79F1wkBiQw7#5FN|Xh@b=Jb^KC4e zQ=v}@UgNPmf7Tcnx6GKBv*obWO06B7%}@fTLKiHW%{nS5Zh|*qk@>_RlLBw-qAplu z>Zf>-f)ycO1|W3knwB&Y!6S=-_|UfIOU|sm&hbPMoUq7oit{$@Vr;Z&)0t7PZB21~ zl9!itF}*Djf5zbuT|cvdlm5ifTuY;h#5(6^rqz*=wKy~Q%6ScqipO(U4dC`4S13F# z>b~CO5Sizin#HrbnoDt_vdC#~yujPz3G~p0{0Mxxh$oDHlmRP8#4Wf>hT{Y zX9|Y`&>j=1gy%F2WF~p>drcTdpl}%XuC%+UYkv;~JD9q5yJvjnuDj2*Ya|XgL%@xb zE8$-?*(%J_N4~!8O-2k{ zT8yr?FBjWycGzs5U=JNvG}m&^Am_?gcFS7p#hY$)G+Gb_{oxxNYsEv`?9F29*PE6t zL&S#1*Mqo@yt=L#uLs!$;nR6^PeTQDyI!1klcVv}-i8wK*bTT>XwOEttL|OaP%nno z*emR4#oa?mP)sA}p24}!QyXm^;+dNqAq$RIK6DE%DEj$M%=XD!9i=q#fSB22TTKPd z6Q8;nBlP1W*e#B=r&csHi}Z!K%IAOG=xC}3Z$BgN5s-_I^On84npettciiG=6i?lb z2H$myLlqAVHMH@d#20UL;1p(1?7Q7@xk%jVKyK%K82{sUID#~Q{i5wo$B3A@4KK#N z-BcnTyb~|3J>T9c?prv1)t!!2@R<+Yj?Qkr6JyUX&X;>CnqPyTjHc zZnzf<`s}66OU3(7I4%=aCs5@2%K-d$ZMUxxzg^d01CzI(C$2l;Xc2c+HCKrz-{L42 zA3ouj5+6Rl0ZU2c-mwd#dhdmHA5F(j(`gbPy$>};E^k(-TQ^D?J37<>|QNdB3|%dm`|<5 zsGqCGbhwG1tMb^8?HAsw^MiP#RqFp;)^@&<6>+NcxCsWW7fGDCf|w~{=*(y zB^B8#{`IZE&+Tt_M6*Sj#dinnYOxy`9iHB4KPYO?vp0&`%dj$g9|TB@mYT)O7h;Y6 z;X%hGV)dUpn#I6790%mvht>hVzZtY&FZP^v++Pg#F5Lf8-XYY(O&i6}5802MbiJiy zO_wgsTX1BX&VHw3X@v1XG$1!Q8^t0#+29oqeXPDz3>7=xBc2*BZ92L2cx4s#HucKb zgKO#{`QgpUk21aXEAMRnC3RG;mp#*az2kV1NL*~QHie+Ka2Q-ll_c*D9ahCB#_YSA z{rVHL92tifCTzFUiTw9AA1FuApSK6BWS<7eT>swY_u)nFrM7lEb9Xj zwC1bfsONyPJTVpF7}gW@L0w`3T)PJNN2a|eF6$n>08_6vJ`26`vk z=eSf$*H{PSJw(!A$Qe;>L9cHn#WsjhQbsW?D2Tf;U|Qvslm^3sl5Tef8kj6PEcYd) zQ6ozZ_wDN6esDKDF>L|ZDeRo&W@+i*v6LPGMm$-8rbmidM~6f*c?n)SaLDZ2GP@x& z3;d7X-{$k~m99H5b`U!QQmPHBZh^!?jCO0tIx^T*$6` z-TUyc5er*PjBK*nZP?L-N4TRy*O`q?RjR&&w3%hw7D zXTw0&qXu2GM&4+eO5SB_`IQig4ufHh?30303vE-BIQTkY{WJNk5C-4B zuij_r=%8EKR@wAR=qNlyE-O0&Q{tY#s5oj7-h*$^JXMwAiv` z@A%WFg8zIWgv{0WePW9)XUbwr%3`G-hMYNEw((@+lq2ct}8~wVZp({{d?*>${ zroO15!npkRh)_!qKhQtfEhQLG!C*4OX z#gRr4OSHux_g%=U#Qhg=0gdJdhXm-W`#Dj0QIQFu?HZ_MxGV+g7B`FT! z!e!qoJvoN61qTP<@=eWR+lFET4972DOoI6*ljL*8RWqH#jm?*uCV^)3i&Ev7^F~bV zW=Bnd45k(8oJrt{xM(swnTU7JrFw8BE>4ucx2UN!sm&tx=*W7@ip_9d;8s9{J>kM1 zd6Qxa7w#apcSMIdtpQJpAx)T&n~&Icu(5GpG!~wWQ*3_(+;304s4VixC9r2&Ng1 zN5t_9OZvt1D|Jf7*m0v7nAe?w#u!HwfEXYi@BqS^FbU?B9UVIp2@U~FQGnWkvtmYW zs6av;PuVX549?7?WPl-~yhLuCosqbmP9WL^JCY>tDTsz$jyv%#tPT(F0`%rFcMUh3 zIwxakCvKah7(w}-kW+t$Gd|(?+$_Btnx0ERyF$(s>_Fm-RdPNvOB{x~lcJZVkpt8` z&Fr$oTq_uWzDBP_(~DqZM@Jq$c68+Eh{l*Pk&cSPI!VwLEj%QZrf+)~WW+#c2j(S` za(DtvAh?tU5}2IN7_O6-BCl7(ozcE1xuY<-i3zMIxD+l8PGtaLO@boVQY1|K1-k!m zTM`?+09Oj4o^T1siBoDb+z}v?RL2Vn%)}Oezp_KWYU7#%g&x zGZ6I$z*+MtEN>Rz5ugww))jR<26_+)5#BN<0t>=cH-cONIk}Yk=138tkS7X!1+&It zvJ1EutBJ3)qO4NzE^Zzzn<+sjC-ik_DxIE9^>lSHaY&xO7)LqWc8&$xx8RTjytI87 zJPjxfK22MHG^vgby2B&|Q0Sp}B!DDc5Sb8?P%$3%7PW1V0=bBJ%eY2>Nk>K^tS!(x z;lABP7{L8Nf<;xFRGj4bll&mr!h4fooe6GP&U=f5WgqS9gDFNhz%b-rr?%-lE{5GB ziE^|IeC6^692k;3JeH#EJe`x{@@+=VtSE@361CIpP0n>mF}Qy5uOF>{02U_hX-62v zT1)4a9Rs$9%jUG$t2ewe_UesIDZ|2?SyDbz_8cUCxV+~?ZLC3Bn8!Dsc4SS=bGV^? zq2L!a{-V+#A*~KZNpsLd7*YeYh078ZCn9-K=nY9Qx4W}bJip7Lo_cSQb#n@ODN8xY z*|?HkfzHuUaa>GJA$%S)bw~+LqNF*KicQ8z_TY9@Gh+hG3EnKz4TiZP+Z#!GzgYvw z7s&dc3K+;tEHy*v%9%0f)E*hhO5yzG5O;yN4|x}u#LD%UgHA-OnaGQQuw|5r@dd#1 z2J0Y@$`R-@)NLVWHY!%v65D8_9~cz{|Kc-RglVKq5@SL9dj#qetPk_R_Tpw|fGI0| z5os|C*~Pa`K}ym#o(wV}`jJ$gk-8h%$Y66I3aG{)hJ4_p%$*FO+$5A|#LZ9~n%-flep@c2LNI>H>s~~Rw)oe4;1a{XI+D0gTPGjKO#%df>^xg zB~^AEzz`krD4jIu5x?GCv#+4#z*QZkr*{dtDVI<4sNA8waHM~9Xo>H zc5q-^%uafYzDL+k%0W&d;@)RIj)C5sYDNd}6pW|@796r{gIOD<=X5Xv&vh9(Y*15{ zn7EmYXNZ=-V#6ebHV2YEYO*`v|Jfh~`gYoU)d`aXxrs0( zpOmo$aQx7iOAAm+Y*8 z$nC;!Z@nRQZ&^})OPLC>do>)p$F4BMZqeyAhS&`TqEUCZE8+}Ce10eL2M3&CU$EQh z3Iv1UptoDYbp&dCQDdIDy$V;GP6J(8|1cN|=m2)hES6n7{Ev0WE__~5*~!uOm5IdL zit4*i>L_F#vw%GLIIg9SX|TzO6%ap{!2dlZ4h&x$mYJO578emeM1Cgg0h*6~-601! zLc=1R;pjya3I~lglE$sKJ%t#H8%&7~a3mbN?@*=Y;}OndfX~2w!^L1wJzK9^H51%$ zE#_35?~#H&%x9p;0-~e}hOb~IEWCE z8U)rnZAjX)&s=1w&*Wy$gNw^kbi_AETRt1Mux$lbA}PqeJhUI5)9KbT}P@UQ{rSEVFCF zITpN!X@z?vm3#CMn{xXup0RB({ z>Je;9T^Jj7&-Sd_0^(G<3fT%at<6PX+E569ukdGJI&FL4dqMaqi*_d>gx3LO3;k9}L7u{!^Di#?Yi$ELVkue??SL%oH2>;)Z8q5YA~TX zI-oIw6NRTyjd*cK$6#_Y9FJXXC{;(PS{5Kg@w#Ubsx*6BJ32_Kl*P~put)c2uu=`- zs?$Wq;1kjd0VBas|5(q=jYPJEriiEJ+z%ZceTnI5=n)_eQU>!uOan74F5Nk0;1KJw zel^2uMJK3utOg`Gc4`=$ysM9VE9x*542Ge5Aq@2vP1q>?F|2EnrlJFpbX(qgVQJk^ z9N)6G)}L>JdL!9Q+@>xrn#>($16 z&hZIv$mt6ssJ=VY9dri9anFw@f=t^1j%m&l*W#&LDoZo876yMc^IW(sNR?+eICTd( zy9dZrK0v-3+UcQL%*?B7G6ClFjy7G$F-{Gq!t=VP0}tq7Rx2PpqSb&FY?u%_A2mcI z8#3I8G|Z#}%s?2~li}T`Vz_RX?_6dRDccXrnv->3JRhz)NPgILxr-X%28X2w7cCBA z(}j)X>y1ExnE@xrHUPHTistp5f>zy)9;So2$B^4<(kwkCR|gveYvxsF^pzfT2Voys zZgXG-@J`dUk$g_ORb1(@G-O)QJuwNA3Nh%aIv9}pWAK3VG^0mkS7Jgm-kLiffQcBn)s*&9aBw267X6Bng}rv&wQ?K*|I_f3^B<%j}b+oGTYAC1ab7M zq2U>d9DCcdK?PIrC1^OsE>D?A{Rui2ayzm11oa{X^pl$e;*%dQs@{}!r7T2ILLgnVSK>O&2p$l|;&gH@9hid7G&VMN1yFV*j{A2h+DBdoO~F2dJtIaC5XE3m z%OD^o2OsiH$8jG3s#9n1GNU__U;!Zk=9K9GxClN(D?Uaw8)KthJ-UIv*16_hoFN* zDBA>c2>_5lN023UoWE{Z|Kn1!&3?4eX-SYI7-I0I%Mr-vr^oH-oUcISu!=b|zKM0jD znP`slE#xCD7hf{kzXzH5Ky{eC6gzwej*6gYyrc*g9KG0%+p-g>PK_&@>Kdz>dpChlh8g^&NaVT3riI=JW*2xJmF@73>b# zVQ_06(n+Yui?)`Cny(F~N>FI%$VW=ppRcW3VU+b5-rrQG9zvf&R*|IQSV3*b3o>>8 zu(`xzrg)G?Bp=}p!-{HwPhj;a&!|X-!xTTv9n&}tc!-kofj*WNfqnnxuN7`_8 zEf12G$?)yV0n)isV`KyZA7$uMaVaJJD6Sd8RGh~6%XMJuwgESIf z(fvYd-c?vTAv-Z^vuV!LAr*2@Zy$eVy08qtkK=jwkXrkMoqhT#I8X1qYMPjqyYiVql zSMenwg{bfLp`xudK0_76LO3(mf6eL2YC&M^9s{a~sn)JZ@? z)9{z$%bvmQ{iCRY#Xm~G@ypwY^jXctXTwo_l%4wEO@q$h%q=y-2_QyO(4M#nCLO-K zQ}uWAymjL|J8$+dzR_vxR~*SpEc*4xfyJjCki>KD)5ax9^rKH()&uk84G55Pv@JVg z^Ng`Eh$+*Y157J?ahi&Y(}*P)YfWc@;*)K9V_I|+4|)japlM1J%r3Z&J7#L4!qmQ8 z7CCog#lsnOTTG98a%S77rqLm!wi=Lh1HU=kGB_{UxhZyZbUfg@;Mz@*wEltmq(@|g zyA~FpRwTX1zUjN94qSzuAqki5m?wt!LCW|je{^_qRlJa^f{J-{Iw5-Dq9DHvxRVFS zOE+7oLAfN<&@8>7aQ2lGCGu?lq*P;=_eVE zLFK$SJUyrNCzFXJz(*`N&l^nUG~!QM7*2A*xvJ!LI&NpO1Wlul9nE4ihAzQ9(lVEx zaE4?%$O=V28uCHrFJ(RkxC{(H0x9EXaSX_KdKO>+@o~|qt`~#*Un_Z5hN>v48nhLwOc1|qC2RGbBOGZ zCT7~)0m30DrwX)<&bjLLVdsohB7Mai=$6*d3O+XNxo zPHy&_%ba=e3q>nP3+)b(&nuNO*OA4ElXgq`l?$~UndGnvCNrEIhZmcC#_S-+h>gv6 z-G`iAXg5-3kHj}^+C*t}j~ty9T~A{Z5Qm+or&H!FT}lZ87^k}Q#L6%SMx|}oIXf@H zPZzBccPn+iwveaW?G9ddcuVn{w(RTuhraQHW3YaH8hiETk%aC&ExLY5(e?119xdAV z$+>Agw)mzsr`x|C(PN9f6BA)~z>mzGn%n2}xsc~Cb>JJvF9y3Ew>u!Xz!Mm>#nIH zLTsUs=vCSC@E7N!U;Np_rQ~hK*BYi%+WGExqkw(;)5tL;fnU|kE)XsU5W@V4SazYPW8pe&S>C^cj#U_5UUFAY zC&(F$X7dPuwiI*7R0!^qvp?Z@@QUWV__{10#Pe?}YQWwiM}Hd}36nxtz-#j_CU4$W zV4i|>QMj5`$4$f6P1t7gvzs=Dc$hFIq_YZcIF+5NaI#YfGCe0kA2UyM%s^S1a7RaO z%8QXP`_hQj0bbk}Bo@1jXZeT&y2U1PmN9H%7HX0sBk?TFFYfSK+*xMk4U5-1M@L!K zw@qvr-c`9x0O%TVZ2i&$$Iii{;baWAVi%WY)G=e&W6Z1QtI;E#02YWnhLVqyO0)%v zUEpE9oq#zZvUO1gBl1RNIi$h7*yDMC={ff@hr&94%IJGn)CP9i6Nqwkc64NY@usFxnf$MER}K4zdAmlr4IdCudJja6fl|O8xi_H>$FM~~ zJdj?Q-1U7qrq4Ne;X-G91?Q7ktcojdvNX#qJ*&J+74gvBl@(Pvk35Al$GmS)KY=5} zBd&bO(kOoZ)!KC#?oXH6daQu1jyd6y%xY9R!z|FCx^zZ|nRP-P!;b%+ImBs9$N&yN zEUu0pb3!!&@aM*jOlpX?>2~5y60oF1CLZN|!(7r`N)oJMCxQM32r6>>B&)3Fh3_B; zmUqNR8N^*JIE2COL*0AGJICY+;}{bugv_ouNWBb)08{M{YMVSwTsNmHnr2FcyHR)- zOiOb|g_b7D(QPe;5do~DCy0=kp}+-QDW`i3++O3EZYI++U-lVvYkB|~BP+@cD-~C) z=noQb zLG{!P`a*1Tn99)nm>rNjVI^Katd%j;RI}{ocwMS?8QMZP0-qD0OnQ@8B6@y++9u5( zbDD(;1;&Nf*iV<%&AwyUr}TFEIierHB2kDy!zA|c5xA0`n#aQG7L54dKh`x1*HmLw zW*o`E;PFC}Nj~`mu|_skd=i&nAt0m`k#)x29l@|LZXkc>E*qRpEFmwPf%aZ?E6EgP zladi#cZpdAG9?mq6@XbRBOKWiUe#vUmVpa$%bYb)Xy}8N?1g-RbB%Up7VK2(O1q9Ph2tR)f|Pe! zMhYJ@+5u7YJ?hTvR(PI5u&aOnS$bUL7*0XorkE1eE0Ek1oPa{9vou6l&N=gG{SAs? zt5QDzO6Mlonqb@}q+mC+v_Rx}dMH^)ObRzBZYn~2l%6!5Vj`g`8|pp*iIyG;m>y3; zo^JVHlMF&-2P6N?FqsUCp0yV?T0lko7LDmuer#LR4HAHy? zr*F&+4`JFV+BAhw!D$zbv3WX?m^B@Ui6$c0rDt{tYL^!-$mTeLg?KQ>HAb`+xCey) zw553~TQxct3G2s)aOGVVD8z3~Ffjlk2@RtrgQkM5{$rrAIRoC9M~mznVdBOJ zW|1XVi%!EtCs^c8>?V#8KLyvw8HG@O*dAeD$N)4j#FkSWmsA`Pr_eWz2r%f02LR>+ zOgN+uiuxC;nlh{zjdLHD=qw8TQjn?P$S7ol)S}-Io06@{Gg3qWIo)3OVW@2^s)16b6U)FC$^F_# zTDdRyBOpz6>Y@$~#);Vg({Xni$JdC=m+BmcDBPRyfteFUTuG6wS7h;so^o+i>BJV` n0A*l7yg3*EMDC%(@#4-cLw8R7S^R+?!K?aDP>XEdLx=w#7OXG= delta 486 zcmXZZPe>F|90%|>|KH5+dv7Lp*L|#OE}9E%pcZ)%v5;8nuq+E+))qttrAv7d3L;CQ zPSrBfqd;62I}{OX1CC&Ju#4oWK#VS$Bpzf>t{tP_LWj@c_vL%~{$}^@%Zm+S`dYW9G?(VlfwS06-vTW?zPYc`d<;mc9K(Erg? zUAzGKl`jTVzk7RG=D#2LEQJfBx!mBg@k~=)tB$-_{*(VB@0n13Iae<} zsTU<60|lr+13EAO1}3mT2!sIv6fj@|2b=*dAOfNw23kQps26$UT1pb=BKnG?+scme zmxV&R=!A@@4OXHDhG}l%Q*pk+yG5bGyDairZ_$#%p`8$ZCu57VdV`&Ouyx7kXhqD(}q5ej2?X+@~3*_Vcc~_GtneUJRlByMn%XG i(CZTS4tc`(KeAUoii_1lp2B0L=HO$cpy!Bx!~XyxVyoBy diff --git a/Cargo.lock b/Cargo.lock index f1295d6..a228078 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,7 +1859,7 @@ dependencies = [ [[package]] name = "linux-patch-api" -version = "0.3.2" +version = "0.3.5" dependencies = [ "actix", "actix-rt", diff --git a/src/api/handlers/mod.rs b/src/api/handlers/mod.rs index f4390c4..4fdd760 100644 --- a/src/api/handlers/mod.rs +++ b/src/api/handlers/mod.rs @@ -15,4 +15,5 @@ pub mod websocket; // Re-export commonly used types pub use packages::{ApiError, ApiResponse}; -pub use websocket::{WsClientMessage, WsServerMessage}; +// WebSocket message types are now in crate::jobs::websocket +pub use crate::jobs::websocket::{WsClientMessage, WsServerMessage}; diff --git a/src/api/handlers/websocket.rs b/src/api/handlers/websocket.rs index da27eef..9e1a690 100644 --- a/src/api/handlers/websocket.rs +++ b/src/api/handlers/websocket.rs @@ -3,128 +3,34 @@ //! Implements WebSocket endpoint for real-time job status updates: //! - WS /api/v1/ws/jobs - Real-time job status streaming //! -//! Note: Full WebSocket implementation requires actix-web-actors compatibility. -//! This stub provides the endpoint structure for future enhancement. +//! Uses actix-web-actors for proper WebSocket handshake and protocol handling. +//! The actual actor logic lives in crate::jobs::websocket::WsJobActor. -use actix_web::{http::StatusCode, web, Error, HttpRequest, HttpResponse}; -use chrono::Utc; -use serde::{Deserialize, Serialize}; +use actix_web::{web, Error, HttpRequest, HttpResponse}; use tracing::info; -use uuid::Uuid; use crate::jobs::manager::JobManager; - -/// WebSocket message from client -#[derive(Debug, Deserialize, Clone)] -#[serde(tag = "action")] -pub enum WsClientMessage { - #[serde(rename = "subscribe")] - Subscribe { - #[serde(default)] - job_id: Option, - }, - #[serde(rename = "unsubscribe")] - Unsubscribe { job_id: String }, -} - -/// WebSocket message to client -#[derive(Debug, Serialize, Clone)] -pub struct WsServerMessage { - pub event: String, - pub job_id: String, - pub status: String, - pub progress: u8, - pub message: String, - pub timestamp: String, -} - -impl WsServerMessage { - pub fn job_status(job_id: &str, status: &str, progress: u8, message: &str) -> Self { - Self { - event: "job_status".to_string(), - job_id: job_id.to_string(), - status: status.to_string(), - progress, - message: message.to_string(), - timestamp: Utc::now().to_rfc3339(), - } - } - - pub fn job_complete(job_id: &str, status: &str, message: &str) -> Self { - Self { - event: "job_complete".to_string(), - job_id: job_id.to_string(), - status: status.to_string(), - progress: 100, - message: message.to_string(), - timestamp: Utc::now().to_rfc3339(), - } - } -} +use crate::jobs::websocket::WsJobActor; /// Handle WebSocket connection request -/// Returns upgrade response for WebSocket handshake +/// Performs the WebSocket handshake and spawns a WsJobActor +/// that streams job status events to the connected client. pub async fn websocket_handler( req: HttpRequest, - _job_manager: web::Data, + stream: web::Payload, + job_manager: web::Data, ) -> Result { - let ws_id = Uuid::new_v4(); - info!(ws_id = %ws_id, "WebSocket connection request"); + info!("WebSocket connection request received"); - // Check if this is a WebSocket upgrade request - if req - .headers() - .get("upgrade") - .and_then(|v| v.to_str().ok()) - .map(|v| v.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - { - // WebSocket upgrade requested - // In full implementation, this would use actix-web-actors::ws::start() - // For now, return a response indicating WebSocket support + // Subscribe to job status events from the JobManager broadcast channel + let event_rx = job_manager.subscribe(); - let response_msg = serde_json::json!({ - "event": "connected", - "ws_id": ws_id.to_string(), - "timestamp": Utc::now().to_rfc3339(), - "message": "WebSocket endpoint ready. Full implementation requires actix-web-actors compatibility.", - "polling_alternative": "Use GET /api/v1/jobs/{id} for job status polling" - }); + // Create the WebSocket actor with the broadcast receiver + let actor = WsJobActor::new(event_rx); - // Return HTTP 101 Switching Protocols for WebSocket upgrade - // In production, this would be handled by actix-web-actors - Ok(HttpResponse::build(StatusCode::SWITCHING_PROTOCOLS) - .insert_header(("upgrade", "websocket")) - .insert_header(("connection", "upgrade")) - .json(response_msg)) - } else { - // Not a WebSocket request - return info about the endpoint - let info_msg = serde_json::json!({ - "endpoint": "/api/v1/ws/jobs", - "method": "GET", - "upgrade_required": "websocket", - "headers": { - "upgrade": "websocket", - "connection": "Upgrade", - "sec-websocket-key": "", - "sec-websocket-version": "13" - }, - "alternative": "Use GET /api/v1/jobs/{id} for job status polling" - }); - - Ok(HttpResponse::Ok().json(info_msg)) - } -} - -/// Broadcast job status update to subscribed WebSocket clients -pub async fn broadcast_job_update( - job_id: &Uuid, - status: &crate::jobs::manager::JobStatus, - progress: u8, - _message: &str, -) { - info!(job_id = %job_id, status = ?status, progress = progress, "Job status update available for broadcast"); - // In production, would use a broadcast channel to notify all subscribed WebSocket clients + // Perform the WebSocket handshake and start the actor + // This computes the proper Sec-WebSocket-Accept header and upgrades the connection + actix_web_actors::ws::start(actor, &req, stream) } /// Configure WebSocket route @@ -134,7 +40,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) { #[cfg(test)] mod tests { - use super::*; + use crate::jobs::websocket::{WsClientMessage, WsServerMessage}; #[test] fn test_ws_server_message_serialization() { diff --git a/src/jobs/manager.rs b/src/jobs/manager.rs index e26452a..f122cbc 100644 --- a/src/jobs/manager.rs +++ b/src/jobs/manager.rs @@ -1,13 +1,14 @@ //! Job Manager - Async job queue management //! //! Manages async job execution with concurrency limits and timeout enforcement. +//! Broadcasts job status events via tokio broadcast channel for WebSocket streaming. use anyhow::Result; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, broadcast}; use uuid::Uuid; /// Job status @@ -21,6 +22,20 @@ pub enum JobStatus { TimedOut, } +/// Convert JobStatus to lowercase string for WebSocket events +impl JobStatus { + pub fn as_str(&self) -> &'static str { + match self { + JobStatus::Pending => "pending", + JobStatus::Running => "running", + JobStatus::Completed => "completed", + JobStatus::Failed => "failed", + JobStatus::Cancelled => "cancelled", + JobStatus::TimedOut => "timed_out", + } + } +} + /// Job operation type #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum JobOperation { @@ -110,20 +125,35 @@ impl Job { } } -/// Job Manager - handles async job queue with limits +/// Job status event broadcast to WebSocket clients +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct JobStatusEvent { + pub event: String, + pub job_id: Uuid, + pub status: String, + pub progress: u8, + pub message: String, + pub timestamp: String, +} + +/// Job Manager - handles async job queue with limits and WebSocket broadcast pub struct JobManager { max_concurrent: usize, timeout_minutes: u64, jobs: Arc>>, + /// Broadcast sender for job status events + event_sender: broadcast::Sender, } impl JobManager { /// Create a new job manager pub fn new(max_concurrent: usize, timeout_minutes: u64) -> Result { + let (event_sender, _) = broadcast::channel(256); Ok(Self { max_concurrent, timeout_minutes, jobs: Arc::new(RwLock::new(HashMap::new())), + event_sender, }) } @@ -137,13 +167,46 @@ impl JobManager { self.max_concurrent } + /// Subscribe to job status events + /// Returns a broadcast receiver that will receive JobStatusEvent messages + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + /// Emit a job status event to all subscribers + fn emit_event( + &self, + event_type: &str, + job_id: &Uuid, + status: &JobStatus, + progress: u8, + message: &str, + ) { + let event = JobStatusEvent { + event: event_type.to_string(), + job_id: *job_id, + status: status.as_str().to_string(), + progress, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + }; + // Ignore send errors (no receivers is fine) + let _ = self.event_sender.send(event); + } + /// Create a new job and return its ID pub async fn create_job(&self, operation: JobOperation, packages: Vec) -> Result { let job = Job::new(operation, packages); let job_id = job.id; + let status = job.status.clone(); + let progress = job.progress; + let message = job.message.clone(); let mut jobs = self.jobs.write().await; jobs.insert(job_id, job); + drop(jobs); // Release lock before emitting event + + self.emit_event("job_status", &job_id, &status, progress, &message); Ok(job_id) } @@ -162,17 +225,28 @@ impl JobManager { progress: Option, message: Option, ) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.status = status; - if let Some(p) = progress { - job.progress = p; + if let Some(job) = jobs.get_mut(job_id) { + job.status = status; + if let Some(p) = progress { + job.progress = p; + } + if let Some(m) = message { + job.message = m; + } + job.updated_at = Utc::now(); + + event_data = Some((job.status.clone(), job.progress, job.message.clone())); + } else { + event_data = None; } - if let Some(m) = message { - job.message = m; - } - job.updated_at = Utc::now(); + } // Write lock dropped here + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -191,10 +265,24 @@ impl JobManager { /// Mark a job as completed pub async fn complete_job(&self, job_id: &Uuid) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.complete(); + if let Some(job) = jobs.get_mut(job_id) { + job.complete(); + event_data = Some(( + job.status.clone(), + job.progress, + job.message.clone(), + )); + } else { + event_data = None; + } + } + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -202,10 +290,24 @@ impl JobManager { /// Mark a job as failed pub async fn fail_job(&self, job_id: &Uuid, error: String) -> Result<()> { - let mut jobs = self.jobs.write().await; + let event_data; + { + let mut jobs = self.jobs.write().await; - if let Some(job) = jobs.get_mut(job_id) { - job.fail(error); + if let Some(job) = jobs.get_mut(job_id) { + job.fail(error); + event_data = Some(( + job.status.clone(), + job.progress, + job.message.clone(), + )); + } else { + event_data = None; + } + } + + if let Some((status, progress, message)) = event_data { + self.emit_event("job_status", job_id, &status, progress, &message); } Ok(()) @@ -308,6 +410,7 @@ impl Clone for JobManager { max_concurrent: self.max_concurrent, timeout_minutes: self.timeout_minutes, jobs: self.jobs.clone(), + event_sender: self.event_sender.clone(), } } } diff --git a/src/jobs/websocket.rs b/src/jobs/websocket.rs index 629260e..0bc42fc 100644 --- a/src/jobs/websocket.rs +++ b/src/jobs/websocket.rs @@ -1,3 +1,425 @@ -//! Job WebSocket Handler +//! Job WebSocket Actor //! -//! Placeholder - implementation in future phases +//! Implements real-time WebSocket streaming for job status updates using +//! actix-web-actors. Each connected client gets a WsJobActor that: +//! - Subscribes to JobManager broadcast channel for job status events +//! - Filters events based on client subscribe/unsubscribe messages +//! - Forwards matching events as JSON to the WebSocket client +//! - Handles ping/pong heartbeat for connection keep-alive +//! - Cleans up on disconnect + +use actix::prelude::*; +use actix_web_actors::ws; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use super::manager::JobStatusEvent; + +/// How often heartbeat pings are sent (seconds) +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a disconnect (seconds) +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Client-to-server WebSocket message +#[derive(Debug, Deserialize, Clone)] +#[serde(tag = "action")] +pub enum WsClientMessage { + /// Subscribe to events for a specific job, or all jobs if job_id is None + #[serde(rename = "subscribe")] + Subscribe { + #[serde(default)] + job_id: Option, + }, + /// Unsubscribe from events for a specific job + #[serde(rename = "unsubscribe")] + Unsubscribe { job_id: String }, +} + +/// Server-to-client WebSocket message +#[derive(Debug, Serialize, Clone)] +pub struct WsServerMessage { + pub event: String, + pub job_id: String, + pub status: String, + pub progress: u8, + pub message: String, + pub timestamp: String, +} + +impl WsServerMessage { + /// Create a job status message from a JobStatusEvent + pub fn from_job_status_event(event: &JobStatusEvent) -> Self { + Self { + event: event.event.clone(), + job_id: event.job_id.to_string(), + status: event.status.clone(), + progress: event.progress, + message: event.message.clone(), + timestamp: event.timestamp.clone(), + } + } + + /// Create a connection established message + pub fn connected(ws_id: &Uuid) -> Self { + Self { + event: "connected".to_string(), + job_id: String::new(), + status: "connected".to_string(), + progress: 0, + message: format!("WebSocket connected: {}", ws_id), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create a subscription confirmation message + pub fn subscribed(job_id: &Option) -> Self { + match job_id { + Some(id) => Self { + event: "subscribed".to_string(), + job_id: id.clone(), + status: "subscribed".to_string(), + progress: 0, + message: format!("Subscribed to job: {}", id), + timestamp: Utc::now().to_rfc3339(), + }, + None => Self { + event: "subscribed".to_string(), + job_id: "all".to_string(), + status: "subscribed".to_string(), + progress: 0, + message: "Subscribed to all job events".to_string(), + timestamp: Utc::now().to_rfc3339(), + }, + } + } + + /// Create an unsubscription confirmation message + pub fn unsubscribed(job_id: &str) -> Self { + Self { + event: "unsubscribed".to_string(), + job_id: job_id.to_string(), + status: "unsubscribed".to_string(), + progress: 0, + message: format!("Unsubscribed from job: {}", job_id), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create an error message + pub fn error(code: &str, message: &str) -> Self { + Self { + event: "error".to_string(), + job_id: String::new(), + status: code.to_string(), + progress: 0, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + } + } + + /// Create a job status message (convenience constructor) + pub fn job_status(job_id: &str, status: &str, progress: u8, message: &str) -> Self { + Self { + event: "job_status".to_string(), + job_id: job_id.to_string(), + status: status.to_string(), + progress, + message: message.to_string(), + timestamp: Utc::now().to_rfc3339(), + } + } +} + +/// Internal message for broadcasting a job status event to the actor +#[derive(Message)] +#[rtype(result = "()")] +pub struct BroadcastEvent(pub JobStatusEvent); + +/// WebSocket actor for streaming job status updates +pub struct WsJobActor { + /// Unique ID for this WebSocket connection + ws_id: Uuid, + /// Broadcast receiver for job status events from JobManager + event_rx: Option>, + /// Set of specific job IDs this client is subscribed to + subscribed_jobs: HashSet, + /// Whether the client is subscribed to all job events + subscribed_all: bool, + /// Last time we heard from the client (ping/pong or message) + last_heartbeat: Instant, + /// The actor's own address for the broadcast listener + addr: Option>, +} + +impl WsJobActor { + /// Create a new WebSocket actor with a broadcast receiver + pub fn new(event_rx: broadcast::Receiver) -> Self { + Self { + ws_id: Uuid::new_v4(), + event_rx: Some(event_rx), + subscribed_jobs: HashSet::new(), + subscribed_all: true, // Default: subscribe to all events + last_heartbeat: Instant::now(), + addr: None, + } + } + + /// Start the heartbeat check interval + fn start_heartbeat(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + if Instant::now().duration_since(act.last_heartbeat) > CLIENT_TIMEOUT { + // Heartbeat timed out, disconnect + warn!( + ws_id = %act.ws_id, + "WebSocket heartbeat timeout, disconnecting" + ); + ctx.stop(); + return; + } + // Send ping + ctx.ping(b""); + }); + } + + /// Start listening to the broadcast channel in a background task + fn start_broadcast_listener(&mut self, ctx: &mut ::Context) { + let addr = ctx.address(); + self.addr = Some(addr.clone()); + + // Take ownership of the receiver + let mut rx = self.event_rx.take().expect("event_rx already taken"); + + // Spawn a task that forwards broadcast events to this actor + actix::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + // Send the event to the actor + if addr.try_send(BroadcastEvent(event)).is_err() { + // Actor is dead, stop listening + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + // We fell behind, but can continue + debug!("WebSocket broadcast receiver lagged by {} events", n); + } + Err(broadcast::error::RecvError::Closed) => { + // Channel closed, stop listening + break; + } + } + } + }); + } +} + +impl Actor for WsJobActor { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + info!(ws_id = %self.ws_id, "WebSocket actor started"); + + // Start heartbeat monitoring + self.start_heartbeat(ctx); + + // Start listening to broadcast events + self.start_broadcast_listener(ctx); + + // Send connection established message + let msg = WsServerMessage::connected(&self.ws_id); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + info!(ws_id = %self.ws_id, "WebSocket actor stopping"); + Running::Stop + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + info!(ws_id = %self.ws_id, "WebSocket actor stopped"); + } +} + +/// Handle broadcast events from the JobManager channel +impl Handler for WsJobActor { + type Result = (); + + fn handle(&mut self, msg: BroadcastEvent, ctx: &mut Self::Context) { + let event = msg.0; + + // Check if this client should receive this event + let should_forward = self.subscribed_all + || self + .subscribed_jobs + .contains(&event.job_id.to_string()); + + if should_forward { + let server_msg = WsServerMessage::from_job_status_event(&event); + match serde_json::to_string(&server_msg) { + Ok(json) => ctx.text(json), + Err(e) => { + error!(ws_id = %self.ws_id, error = %e, "Failed to serialize job status event"); + } + } + } + } +} + +/// Handle WebSocket protocol messages (ping/pong, text, close, etc.) +impl StreamHandler> for WsJobActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let msg = match msg { + Ok(msg) => msg, + Err(e) => { + error!(ws_id = %self.ws_id, error = %e, "WebSocket protocol error"); + ctx.stop(); + return; + } + }; + + match msg { + ws::Message::Ping(msg) => { + self.last_heartbeat = Instant::now(); + ctx.pong(&msg); + } + ws::Message::Pong(_) => { + self.last_heartbeat = Instant::now(); + } + ws::Message::Text(text) => { + let text = text.to_string(); + debug!(ws_id = %self.ws_id, text = %text, "Received WebSocket text message"); + + // Parse as client message + match serde_json::from_str::(&text) { + Ok(client_msg) => match client_msg { + WsClientMessage::Subscribe { job_id } => { + match job_id { + Some(id) => { + self.subscribed_jobs.insert(id.clone()); + let msg = WsServerMessage::subscribed(&Some(id)); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + None => { + self.subscribed_all = true; + let msg = WsServerMessage::subscribed(&None); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + } + } + WsClientMessage::Unsubscribe { job_id } => { + self.subscribed_jobs.remove(&job_id); + let msg = WsServerMessage::unsubscribed(&job_id); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + }, + Err(e) => { + warn!( + ws_id = %self.ws_id, + error = %e, + text = %text, + "Invalid WebSocket client message" + ); + let msg = WsServerMessage::error("invalid_message", &format!("Invalid message: {}", e)); + if let Ok(json) = serde_json::to_string(&msg) { + ctx.text(json); + } + } + } + } + ws::Message::Binary(_) => { + // We don't handle binary messages + warn!(ws_id = %self.ws_id, "Received binary message, ignoring"); + } + ws::Message::Close(reason) => { + info!(ws_id = %self.ws_id, reason = ?reason, "WebSocket close received"); + ctx.close(reason); + ctx.stop(); + } + ws::Message::Continuation(_) => { + // Continuation frames not expected for our use case + ctx.stop(); + } + ws::Message::Nop => (), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ws_server_message_from_event() { + let event = JobStatusEvent { + event: "job_status".to_string(), + job_id: Uuid::new_v4(), + status: "running".to_string(), + progress: 50, + message: "Processing...".to_string(), + timestamp: "2026-01-01T00:00:00Z".to_string(), + }; + let msg = WsServerMessage::from_job_status_event(&event); + assert_eq!(msg.event, "job_status"); + assert_eq!(msg.status, "running"); + assert_eq!(msg.progress, 50); + } + + #[test] + fn test_ws_server_message_serialization() { + let msg = WsServerMessage::job_status("test-uuid", "running", 50, "Processing..."); + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("job_status")); + assert!(json.contains("running")); + assert!(json.contains("50")); + } + + #[test] + fn test_ws_client_message_subscribe() { + let json = r#"{"action": "subscribe", "job_id": "test-uuid"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Subscribe { job_id } => { + assert_eq!(job_id, Some("test-uuid".to_string())); + } + _ => panic!("Expected Subscribe message"), + } + } + + #[test] + fn test_ws_client_message_subscribe_all() { + let json = r#"{"action": "subscribe"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Subscribe { job_id } => { + assert!(job_id.is_none()); + } + _ => panic!("Expected Subscribe message"), + } + } + + #[test] + fn test_ws_client_message_unsubscribe() { + let json = r#"{"action": "unsubscribe", "job_id": "test-uuid"}"#; + let msg: WsClientMessage = serde_json::from_str(json).unwrap(); + match msg { + WsClientMessage::Unsubscribe { job_id } => { + assert_eq!(job_id, "test-uuid"); + } + _ => panic!("Expected Unsubscribe message"), + } + } +}