Appearance
Fase 2: Real-Time con SSE
← Fase 1 MVP | Fase 3 Production →
Objetivo de la Fase
Reemplazar el polling del MVP con Server-Sent Events (SSE) para actualizaciones en tiempo real. El frontend recibirá notificaciones instantáneas cuando cambia el estado de un job, usando PostgreSQL NOTIFY/LISTEN.
Entregables principales:
- Endpoint SSE con fallback a polling
- PostgreSQL NOTIFY en worker cuando cambia estado
- Hook React
useJobStreamcon EventSource - Progress tracking granular (0-100%)
- UI con progress bar en tiempo real
Subfases
2.1 SSE Endpoint con PostgreSQL NOTIFY
Tareas: 3 tarjetas Estimación: 13 horas Objetivo: Implementar SSE endpoint y NOTIFY en worker
Tareas principales:
- JobStreamController con SSE (5h)
- Modificar worker para emitir NOTIFY (4h)
- Hook useJobStream frontend (4h)
Dependencias: Requiere completar Fase 1 completa
2.2 Progress Tracking
Tareas: 3 tarjetas Estimación: 8 horas Objetivo: Agregar columna progress y actualizar UI en tiempo real
Tareas principales:
- Migración columna progress (2h)
- JobRunner actualiza progress (3h)
- Componente JobProgressBar (3h)
Dependencias: Requiere completar Fase 2.1
Dependencias entre Subfases
Fase 1 MVP completa
↓
2.1 SSE Endpoint con PostgreSQL NOTIFY
↓
2.2 Progress TrackingSecuencia recomendada:
- Validar que Fase 1 está completa y tests pasan
- Implementar 2.1 (SSE + NOTIFY)
- Implementar 2.2 (progress)
- Testing integral con real-time updates
Estimación Total de la Fase
Total de tarjetas: 6 tarjetas Total de horas: ~21 horas Duración estimada (1 dev full-time): 2.5-3 días laborables Duración estimada (1 dev part-time 50%): 5-6 días laborables
Criterios de Completitud
La Fase 2 se considera completa cuando:
- [ ] SSE endpoint funcional con Content-Type: text/event-stream
- [ ] PostgreSQL NOTIFY emitido al cambiar status
- [ ] Canales específicos por schema (suc0001_background_jobs)
- [ ] Hook useJobStream con auto-reconnect
- [ ] Fallback a polling si SSE no disponible
- [ ] Columna progress agregada y migrada
- [ ] JobRunner actualiza progress durante ejecución
- [ ] UI muestra progress bar 0-100% en tiempo real
- [ ] Tests con mock EventSource
- [ ] Tests fallback mode
- [ ] Backward compatible con polling (Fase 1)
Notas Técnicas
Arquitectura SSE
┌──────────────┐
│ Frontend │ ──── EventSource /jobs/:id/stream ───┐
│ │ (persistent connection) │
└──────────────┘ ▼
▲ ┌──────────────┐
│ │ JobStream │
└──── SSE events ◄──────────────────────│ Controller │
(status_changed, progress_update) └──────────────┘
▲
│
┌───────┴──────┐
│ PostgreSQL │
│ LISTEN │
│ channel │
└───────▲──────┘
│
┌──────────────┐ │
│ bin/worker │ ────► UPDATE status ──────► NOTIFY ───┘
│ CLI │ background_jobs background_jobs_channel
└──────────────┘ SET progress = 50 { jobId, status, progress }PostgreSQL NOTIFY
sql
-- En JobRunner al cambiar status
NOTIFY suc0001_background_jobs, '{"jobId": "uuid-123", "status": "completed", "progress": 100}';php
// JobStreamController
$pdo->exec("LISTEN suc0001_background_jobs");
while (true) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000);
if ($notification) {
echo "data: " . json_encode($notification['payload']) . "\n\n";
ob_flush();
flush();
}
}EventSource (Frontend)
typescript
// useJobStream.ts
const eventSource = new EventSource(`/jobs/${jobId}/stream`);
eventSource.addEventListener('status_changed', (e) => {
const data = JSON.parse(e.data);
setStatus(data.status);
});
eventSource.addEventListener('progress_updated', (e) => {
const data = JSON.parse(e.data);
setProgress(data.progress);
});
// Cleanup
return () => eventSource.close();Progress Tracking
php
// BatchInvoicingJobHandler
public function handle(array $payload): array
{
$facturas = $payload['facturas'];
$total = count($facturas);
foreach ($facturas as $index => $facturaId) {
$this->procesarFactura($facturaId);
$progress = (($index + 1) / $total) * 100;
$this->jobRunner->setProgress($this->jobId, $progress);
// ↑ Emite NOTIFY con progress update
}
}Fallback a Polling
typescript
// useJobStream.ts
const [useSSE, setUseSSE] = useState(true);
useEffect(() => {
if (!useSSE || !window.EventSource) {
// Fallback: polling cada 2s
const interval = setInterval(() => {
fetch(`/jobs/${jobId}`).then(/* ... */);
}, 2000);
return () => clearInterval(interval);
}
// SSE normal
const es = new EventSource(`/jobs/${jobId}/stream`);
// ...
}, [useSSE, jobId]);Testing SSE
Mock EventSource (Frontend)
typescript
// tests/mocks/EventSource.ts
class MockEventSource {
addEventListener(type: string, callback: Function) {
this.listeners[type] = callback;
}
simulateEvent(type: string, data: any) {
this.listeners[type]?.({ data: JSON.stringify(data) });
}
}
global.EventSource = MockEventSource;Integration Tests (Backend)
php
// JobStreamControllerTest.php
public function testSSEEmitsStatusChangedEvent(): void
{
$response = $this->client->request('GET', "/jobs/{$jobId}/stream");
// En otro thread/proceso, cambiar status
$this->worker->processJob($jobId);
// Verificar que response stream contiene evento
$this->assertStringContainsString('event: status_changed', $response->getBody());
}