Skip to content

Fase 2: Real-Time con SSE

← Fase 1 MVP | Fase 3 Production →


Objetivo de la Fase

Reemplazar el polling del MVP con Server-Sent Events (SSE) para actualizaciones en tiempo real. El frontend recibirá notificaciones instantáneas cuando cambia el estado de un job, usando PostgreSQL NOTIFY/LISTEN.

Entregables principales:

  • Endpoint SSE con fallback a polling
  • PostgreSQL NOTIFY en worker cuando cambia estado
  • Hook React useJobStream con EventSource
  • Progress tracking granular (0-100%)
  • UI con progress bar en tiempo real

Subfases

2.1 SSE Endpoint con PostgreSQL NOTIFY

Tareas: 3 tarjetas Estimación: 13 horas Objetivo: Implementar SSE endpoint y NOTIFY en worker

Tareas principales:

  • JobStreamController con SSE (5h)
  • Modificar worker para emitir NOTIFY (4h)
  • Hook useJobStream frontend (4h)

Dependencias: Requiere completar Fase 1 completa


2.2 Progress Tracking

Tareas: 3 tarjetas Estimación: 8 horas Objetivo: Agregar columna progress y actualizar UI en tiempo real

Tareas principales:

  • Migración columna progress (2h)
  • JobRunner actualiza progress (3h)
  • Componente JobProgressBar (3h)

Dependencias: Requiere completar Fase 2.1


Dependencias entre Subfases

Fase 1 MVP completa

2.1 SSE Endpoint con PostgreSQL NOTIFY

2.2 Progress Tracking

Secuencia recomendada:

  1. Validar que Fase 1 está completa y tests pasan
  2. Implementar 2.1 (SSE + NOTIFY)
  3. Implementar 2.2 (progress)
  4. Testing integral con real-time updates

Estimación Total de la Fase

Total de tarjetas: 6 tarjetas Total de horas: ~21 horas Duración estimada (1 dev full-time): 2.5-3 días laborables Duración estimada (1 dev part-time 50%): 5-6 días laborables


Criterios de Completitud

La Fase 2 se considera completa cuando:

  • [ ] SSE endpoint funcional con Content-Type: text/event-stream
  • [ ] PostgreSQL NOTIFY emitido al cambiar status
  • [ ] Canales específicos por schema (suc0001_background_jobs)
  • [ ] Hook useJobStream con auto-reconnect
  • [ ] Fallback a polling si SSE no disponible
  • [ ] Columna progress agregada y migrada
  • [ ] JobRunner actualiza progress durante ejecución
  • [ ] UI muestra progress bar 0-100% en tiempo real
  • [ ] Tests con mock EventSource
  • [ ] Tests fallback mode
  • [ ] Backward compatible con polling (Fase 1)

Notas Técnicas

Arquitectura SSE

┌──────────────┐
│   Frontend   │ ──── EventSource /jobs/:id/stream ───┐
│              │         (persistent connection)        │
└──────────────┘                                        ▼
       ▲                                        ┌──────────────┐
       │                                        │ JobStream    │
       └──── SSE events ◄──────────────────────│ Controller   │
             (status_changed, progress_update) └──────────────┘


                                                ┌───────┴──────┐
                                                │ PostgreSQL   │
                                                │ LISTEN       │
                                                │ channel      │
                                                └───────▲──────┘

┌──────────────┐                                        │
│ bin/worker   │ ────► UPDATE status ──────► NOTIFY ───┘
│     CLI      │       background_jobs      background_jobs_channel
└──────────────┘       SET progress = 50    { jobId, status, progress }

PostgreSQL NOTIFY

sql
-- En JobRunner al cambiar status
NOTIFY suc0001_background_jobs, '{"jobId": "uuid-123", "status": "completed", "progress": 100}';
php
// JobStreamController
$pdo->exec("LISTEN suc0001_background_jobs");
while (true) {
    $notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000);
    if ($notification) {
        echo "data: " . json_encode($notification['payload']) . "\n\n";
        ob_flush();
        flush();
    }
}

EventSource (Frontend)

typescript
// useJobStream.ts
const eventSource = new EventSource(`/jobs/${jobId}/stream`);

eventSource.addEventListener('status_changed', (e) => {
  const data = JSON.parse(e.data);
  setStatus(data.status);
});

eventSource.addEventListener('progress_updated', (e) => {
  const data = JSON.parse(e.data);
  setProgress(data.progress);
});

// Cleanup
return () => eventSource.close();

Progress Tracking

php
// BatchInvoicingJobHandler
public function handle(array $payload): array
{
    $facturas = $payload['facturas'];
    $total = count($facturas);

    foreach ($facturas as $index => $facturaId) {
        $this->procesarFactura($facturaId);

        $progress = (($index + 1) / $total) * 100;
        $this->jobRunner->setProgress($this->jobId, $progress);
        // ↑ Emite NOTIFY con progress update
    }
}

Fallback a Polling

typescript
// useJobStream.ts
const [useSSE, setUseSSE] = useState(true);

useEffect(() => {
  if (!useSSE || !window.EventSource) {
    // Fallback: polling cada 2s
    const interval = setInterval(() => {
      fetch(`/jobs/${jobId}`).then(/* ... */);
    }, 2000);
    return () => clearInterval(interval);
  }

  // SSE normal
  const es = new EventSource(`/jobs/${jobId}/stream`);
  // ...
}, [useSSE, jobId]);

Testing SSE

Mock EventSource (Frontend)

typescript
// tests/mocks/EventSource.ts
class MockEventSource {
  addEventListener(type: string, callback: Function) {
    this.listeners[type] = callback;
  }

  simulateEvent(type: string, data: any) {
    this.listeners[type]?.({ data: JSON.stringify(data) });
  }
}

global.EventSource = MockEventSource;

Integration Tests (Backend)

php
// JobStreamControllerTest.php
public function testSSEEmitsStatusChangedEvent(): void
{
    $response = $this->client->request('GET', "/jobs/{$jobId}/stream");

    // En otro thread/proceso, cambiar status
    $this->worker->processJob($jobId);

    // Verificar que response stream contiene evento
    $this->assertStringContainsString('event: status_changed', $response->getBody());
}

← Fase 1 MVP | Fase 3 Production →