One Channel: SSE for Everything

Current State — 3 channels, overlapping events

flowchart LR
  subgraph server [Server Emitters]
    emitBotEvent["emitBotEvent()"]
    ioProxy["ioProxy.emit() / broadcast()"]
    broadcastToBot["broadcastToBot()"]
    broadcastAllWC["broadcastToAllWebchat()"]
    broadcastStatus["broadcastBotStatus()"]
  end

  subgraph channels [Channels]
    SSE["SSE /api/events"]
    DashWS["Dash WS /ws/dash"]
    BotWS["Per-bot WS /ws/bots/:id"]
  end

  subgraph clients [Clients]
    Chat["Chat Frontend"]
    Dashboard["Dashboard"]
  end

  emitBotEvent --> SSE
  emitBotEvent --> DashWS
  ioProxy --> DashWS
  broadcastToBot --> DashWS
  broadcastAllWC --> BotWS
  broadcastStatus --> BotWS
  broadcastStatus --> DashWS

  SSE --> Chat
  DashWS --> Dashboard
  BotWS --> Chat

Problem: Dashboard gets events from dash-ws only. SSE exists and is better (Redis-buffered, replay, works through Caddy) but the dashboard doesn't use it. Events are split across 3 channels with different naming conventions.

Target State — 1 event channel

flowchart LR
  subgraph server [Server Emit API]
    emit["emit()"]
    emitMessage["emitMessage()"]
    emitDelta["emitDelta()"]
    emitAlert["emitAlert()"]
    emitStatus["emitStatus()"]
    emitEvent["emitEvent()"]
    emitTool["emitTool()"]
  end

  subgraph core [Core]
    pushBotEvent["pushBotEvent() — SSE + Redis"]
  end

  subgraph channel [Channel]
    SSE["SSE /api/events"]
  end

  subgraph rpc [RPC Only]
    BotWS["Per-bot WS /ws/bots/:id"]
  end

  subgraph clients [Clients]
    Chat["Chat Frontend"]
    Dashboard["Dashboard"]
  end

  emitMessage --> pushBotEvent
  emitDelta --> pushBotEvent
  emitAlert --> pushBotEvent
  emitStatus --> pushBotEvent
  emitEvent --> pushBotEvent
  emitTool --> pushBotEvent
  emit --> pushBotEvent
  pushBotEvent --> SSE
  SSE --> Chat
  SSE --> Dashboard
  BotWS -.->|"chat.send, chat.history, connect"| Chat

Implementation

Step 1: Server — one core, named wrappers

Create the emit API in a new lib/emit.ts — one core function (emit) that calls pushBotEvent, and named wrappers for each event category. Every wrapper adds a debug log so the full event flow is traceable.

import { pushBotEvent } from './ws-manager';
import createDebug from './shared-debug';

const debug = createDebug('emit');

// ── Core ─────────────────────────────────────────────────────────────────────
// The ONE function. Everything goes through here.
function emit(type: string, botId: string, data: Record<string, unknown> = {}): void {
  // Spread data first so a payload field named `type` cannot overwrite the event type
  pushBotEvent(botId, { ...data, type });
}

// ── Named wrappers ───────────────────────────────────────────────────────────
// Each wrapper: debug log + emit. That's it.

/** Chat message stored and ready to display */
export function emitMessage(botId: string, data: Record<string, unknown>): void {
  debug('message %s role=%s src=%s', botId, data.role, data.source);
  emit('new_message', botId, data);
}

/** Streaming chat delta (partial text) */
export function emitDelta(botId: string, data: Record<string, unknown>): void {
  debug('delta %s len=%d', botId, (data.delta as string)?.length ?? 0);
  emit('event', botId, { event: 'chat', payload: { ...data, state: 'delta' } });
}

/** Chat status change (final, error, done) */
export function emitChatStatus(botId: string, state: string, data: Record<string, unknown> = {}): void {
  debug('chat-status %s state=%s', botId, state);
  emit('event', botId, { event: 'chat', payload: { ...data, state } });
}

/** Bot working/idle status broadcast */
export function emitStatus(botId: string, data: Record<string, unknown>): void {
  debug('status %s state=%s', botId, data.state);
  emit('bot-activity', botId, data);
}

/** Tool call or tool result */
export function emitTool(botId: string, data: Record<string, unknown>): void {
  debug('tool %s role=%s', botId, data.role);
  emit('tool', botId, data);
}

/** Health alert, auth warning, system alert */
export function emitAlert(type: string, botId: string, data: Record<string, unknown>): void {
  debug('alert %s %s', type, botId);
  emit(type, botId, data);
}

/** Generic event — for anything without a dedicated wrapper */
export function emitEvent(type: string, data: Record<string, unknown> = {}, botId: string = '*'): void {
  debug('event %s %s', type, botId);
  emit(type, botId, data);
}

Every server module imports the wrapper it needs: import { emitMessage, emitEvent } from '../lib/emit'. No more broadcast, broadcastToBot, _ioProxy.emit, dashIo.emit, io.emit. One module, one import, one debug namespace.
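
As a rough illustration of what migrated call sites might look like, the sketch below stubs pushBotEvent with an in-memory recorder so it runs on its own. The `sent` array and the payload fields (`state`, `container`, `running`, `ok`) are assumptions made for the example, and the wrappers are simplified copies without debug logging.

```typescript
// Stand-in for ws-manager's pushBotEvent so the sketch is self-contained.
// `sent` is a hypothetical recorder, not part of the real module.
type BotEvent = { type: string } & Record<string, unknown>;
const sent: Array<{ botId: string; event: BotEvent }> = [];

function pushBotEvent(botId: string, event: BotEvent): void {
  sent.push({ botId, event });
}

// Simplified copies of the core and two wrappers (debug logging omitted).
function emit(type: string, botId: string, data: Record<string, unknown> = {}): void {
  pushBotEvent(botId, { ...data, type });
}

function emitStatus(botId: string, data: Record<string, unknown>): void {
  emit('bot-activity', botId, data);
}

function emitEvent(type: string, data: Record<string, unknown> = {}, botId: string = '*'): void {
  emit(type, botId, data);
}

// Was: _ioProxy.emit('bot-activity', {...})
emitStatus('bot-1', { state: 'working' });

// Was: broadcast('docker:status', {...}); no bot is named, so botId defaults to '*'
emitEvent('docker:status', { container: 'bot-1', running: true });

// Was: dashIo.emit('bot-heartbeat', {...}); note that botId comes last for emitEvent
emitEvent('bot-heartbeat', { ok: true }, 'bot-1');
```

The one thing to watch during migration is argument order: emitEvent takes (type, data, botId), while the other wrappers take botId before data.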

Fix SSE subscription in routes/api.ts — dashboard gets sseKey = '*' (line ~259):

// Before: const sseKey = (originHost === config.chatDomain) ? '*' : originHost;
const sseKey = '*';  // All authenticated clients get all events

Migrate every server-side emitter to the named wrappers:

Current call | File | New call
_ioProxy.emit('bot-activity', {...}) | lib/ws-manager.ts broadcastBotStatus | emitStatus(botId, {...})
_ioProxy.emit('bots-status-update', {...}) | lib/ws-manager.ts broadcastBotStatus | emitEvent('bots-status-update', {...}, botId)
broadcastToBot('chat-delta', botId, {...}) | lib/ws-manager.ts emitBotEvent | Remove (SSE already carries this via pushBotEvent)
broadcastToBot('chat-status', botId, {...}) | lib/ws-manager.ts emitBotEvent | Remove (same)
sendToDashboard(botId, event) | lib/ws-manager.ts emitBotEvent | Remove (SSE already carries new_message)
dashIo.emit('chat-message', {...}) | lib/message-capture.ts | Remove (already goes through emitBotEvent as new_message)
bridge.ts delta emission | lib/gate-runtime/bridge.ts | Already uses emitBotEvent; no change needed (SSE carries it)
bridge.ts tool emission | lib/gate-runtime/bridge.ts | Already uses emitBotEvent; no change needed
broadcastToBot('code-session:status', ...) | lib/code-session.ts | emitEvent('code-session:status', {...}, botId)
broadcastToBot('code-session:event', ...) | lib/code-session.ts | emitEvent('code-session:event', {...}, botId)
broadcast('docker:status', {...}) | api/docker-env.ts (~10 places) | emitEvent('docker:status', {...})
broadcast('auth-profiles:refreshed', {...}) | lib/jobs/token-refresh.ts | emitEvent('auth-profiles:refreshed', {...})
broadcast('auth-profiles:usage-updated', {...}) | lib/usage-cache.ts (2 places) | emitEvent('auth-profiles:usage-updated', {...})
dashIo.emit('bot-health-alert', alert) | lib/bot-health.ts | emitAlert('bot-health-alert', botId, alert)
dashIo.emit('bot-auth-event', evt) | lib/auth-health-monitor.ts | emitAlert('bot-auth-event', '*', evt)
dashIo.emit('bot-heartbeat', {...}) | lib/heartbeat-capture.ts (2 places) | emitEvent('bot-heartbeat', {...}, botId)
io.emit('heartbeat-updated', {...}) | api/bots-data.ts | emitEvent('heartbeat-updated', {...}, botId)
io.emit('provision-log', {...}) | api/bot-provision.ts | emitEvent('provision-log', {...})
io.emit('provision-done', {...}) | api/bot-provision.ts (5 places) | emitEvent('provision-done', {...})
io.emit('task:created', {...}) | api/task-mutations.ts (2 places) | emitEvent('task:created', {...})
io.emit('task:updated', {...}) | api/task-mutations.ts + api/task-sub-resources.ts + api/task-execution.ts | emitEvent('task:updated', {...})
io.emit('task:claimed/completed/...', ...) | api/task-mutations.ts | emitEvent('task:...', {...})
io.emit('task:item:*', ...) | api/task-sub-resources.ts | emitEvent('task:item:...', {...})
io.emit('task:comment/feedback/...', ...) | api/task-sub-resources.ts | emitEvent('task:...', {...})

Remove from broadcastBotStatus: the loop over webchatConnections that pushes bot:status to every per-bot WS client. Chat gets status via sendBotStatus on connect + RPC responses.

Remove broadcastToAllWebchat calls from lib/jobs/token-refresh.ts and lib/usage-cache.ts. Those events go through SSE now; chat already handles them via SSE in handleSseEvent.

Remove io parameter from task route factories, bot-provision, docker-env, etc. They import { emitEvent } from '../lib/emit' directly instead of receiving a dash-ws proxy.

Step 2: Dashboard — EventSource client

Rewrite clients/dashboard/src/lib/socket.ts — replace WebSocket to /ws/dash with EventSource to /api/events:

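let es: EventSource | null = null

function connect() {
  const url = `${location.protocol}//${location.host}/api/events`
  es = new EventSource(url, { withCredentials: true })

  es.onopen = () => { connected = true; emit("connect", undefined) }

  // onmessage fires for events that carry no explicit `event:` field
  es.onmessage = (e) => {
    let data: unknown
    try {
      data = JSON.parse(e.data)
    } catch {
      return // skip frames that are not valid JSON instead of throwing
    }
    const mapped = mapEvent(data)
    if (mapped) emit(mapped.event, mapped.data)
  }

  // EventSource retries on its own while readyState is CONNECTING;
  // it stops for good once readyState is CLOSED (e.g. after a non-200 response)
  es.onerror = () => { connected = false }
}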

Event mapping — SSE events use a type field. Map to the dash-ws event names that components already listen for:

function mapEvent(sseData: any): { event: string; data: any } | null {
  const { type } = sseData
  
  // Chat streaming: { type: 'event', event: 'chat', payload: { state: 'delta', ... } }
  if (type === 'event' && sseData.event === 'chat') {
    const state = sseData.payload?.state
    if (state === 'delta') return { event: 'chat-delta', data: { botId: sseData.botId, ...sseData.payload } }
    return { event: 'chat-status', data: { botId: sseData.botId, state, ...sseData.payload } }
  }
  
  // New message: { type: 'new_message', botId, role, ... }
  if (type === 'new_message') return { event: 'chat-message', data: sseData }
  
  // Everything else: type IS the event name, data is the whole object
  return { event: type, data: sseData }
}

Zero component changes. The useSocketEvent hook and every socket.on() call work unchanged; only the transport inside socket.ts changes.
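
To make the mapping concrete, here is a typed variant of mapEvent run against a few sample envelopes. The `SseData` and `Mapped` types and the sample payload values are assumptions for illustration; the branch logic mirrors the function above.

```typescript
// Loose envelope type used only for this sketch (the original uses `any`).
interface SseData {
  type: string;
  botId?: string;
  event?: string;
  payload?: Record<string, unknown>;
  [key: string]: unknown;
}
interface Mapped { event: string; data: Record<string, unknown> }

function mapEvent(sseData: SseData): Mapped {
  const { type } = sseData;
  if (type === 'event' && sseData.event === 'chat') {
    const state = sseData.payload?.state;
    if (state === 'delta') {
      return { event: 'chat-delta', data: { botId: sseData.botId, ...sseData.payload } };
    }
    return { event: 'chat-status', data: { botId: sseData.botId, state, ...sseData.payload } };
  }
  if (type === 'new_message') return { event: 'chat-message', data: sseData };
  return { event: type, data: sseData };
}

// Streaming delta: the nested payload is flattened and tagged with botId.
const delta = mapEvent({
  type: 'event', botId: 'bot-1', event: 'chat', payload: { state: 'delta', delta: 'Hel' },
});

// Any other chat state becomes chat-status.
const final = mapEvent({ type: 'event', botId: 'bot-1', event: 'chat', payload: { state: 'final' } });

// new_message is renamed; the envelope is passed through whole.
const message = mapEvent({ type: 'new_message', botId: 'bot-1', role: 'assistant' });

// Everything else keeps its type as the event name.
const docker = mapEvent({ type: 'docker:status', running: true });

console.log(delta.event, final.event, message.event, docker.event);
```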

EventSource reconnects automatically per the SSE spec. On reconnect the browser sends a Last-Event-ID header (provided the server tags each event with an id: field), and the server already supports replay via replaySseEvents. Events missed during a disconnect are therefore replayed as long as they are still in the Redis buffer, which is an upgrade from the current dash-ws, where there is no replay at all.
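
The replay mechanism can be sketched with an in-memory buffer. This is a hypothetical stand-in, not the actual replaySseEvents or its Redis storage: `ReplayBuffer`, `since`, and `toSseFrame` are names invented for the example. Only the SSE wire format (`id:` and `data:` lines ending in a blank line) is taken from the spec.

```typescript
interface BufferedEvent { id: number; data: Record<string, unknown> }

// Hypothetical bounded buffer standing in for the Redis-backed one.
class ReplayBuffer {
  private events: BufferedEvent[] = [];
  private nextId = 1;
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  /** Store an event under a monotonically increasing id. */
  push(data: Record<string, unknown>): BufferedEvent {
    const evt: BufferedEvent = { id: this.nextId++, data };
    this.events.push(evt);
    if (this.events.length > this.capacity) this.events.shift(); // oldest entry falls out
    return evt;
  }

  /** Events newer than the value the client sent in Last-Event-ID. */
  since(lastEventId: string | undefined): BufferedEvent[] {
    const last = Number(lastEventId);
    if (!lastEventId || !Number.isInteger(last)) return [];
    return this.events.filter((e) => e.id > last);
  }
}

/** Serialize one event as an SSE frame; the browser echoes the id back on reconnect. */
function toSseFrame(evt: BufferedEvent): string {
  return `id: ${evt.id}\ndata: ${JSON.stringify(evt.data)}\n\n`;
}

const buffer = new ReplayBuffer(2);
buffer.push({ type: 'a' });
buffer.push({ type: 'b' });
buffer.push({ type: 'c' });

// Client last saw id 1 and reconnects: ids 2 and 3 are replayed.
const replayed = buffer.since('1');

// Client last saw id 0: id 1 has already been evicted, so only 2 and 3 come back.
const partial = buffer.since('0');

const frames = replayed.map(toSseFrame).join('');
console.log(frames);
```

The second lookup shows the limit of the guarantee: once an event has left the buffer, a reconnecting client cannot recover it, so buffer size bounds how long a disconnect can last without loss.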

Step 3: Cleanup

Server removals:

Dashboard removals:

Files touched (complete list)

New file:

Server (15 files modified):

Dashboard (1 file):

Chat (5 files):

Dashboard (2 files):

Step 4: Kill per-bot WS — HTTP for all client-to-server

Per-bot WS is a duplicate of existing HTTP endpoints. Every RPC operation already has an HTTP equivalent:

WS RPC | HTTP endpoint | File
chat.send | POST /api/chat/:botId/send | api/message-queue.ts
chat.history | GET /api/bots/:botId/messages | api/messages.ts
chat.abort | POST /api/bots/:botId/stop-agent | api/bot-control.ts
sessions.list | GET /api/bots/:botId/sessions | api/messages.ts
sendBotStatus | GET /api/bots/working-status | api/bots-status.ts
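
One way a client could carry this table during the migration is a small lookup that turns a former RPC name into an HTTP method and path. The helper name `resolveRpc` and the `HttpRoute` type are hypothetical; the route templates are copied from the table above.

```typescript
type HttpMethod = 'GET' | 'POST';
interface HttpRoute { method: HttpMethod; path: string }

// Route templates from the table; ':botId' is substituted at call time.
const RPC_TO_HTTP: Record<string, HttpRoute> = {
  'chat.send': { method: 'POST', path: '/api/chat/:botId/send' },
  'chat.history': { method: 'GET', path: '/api/bots/:botId/messages' },
  'chat.abort': { method: 'POST', path: '/api/bots/:botId/stop-agent' },
  'sessions.list': { method: 'GET', path: '/api/bots/:botId/sessions' },
  'sendBotStatus': { method: 'GET', path: '/api/bots/working-status' },
};

/** Hypothetical helper: resolve a former WS RPC name to its HTTP method and path. */
function resolveRpc(rpc: string, botId: string): HttpRoute {
  const route = RPC_TO_HTTP[rpc];
  if (!route) throw new Error(`No HTTP equivalent for RPC "${rpc}"`);
  return {
    method: route.method,
    // Paths without a :botId placeholder (working-status) are returned unchanged
    path: route.path.replace(':botId', encodeURIComponent(botId)),
  };
}

const send = resolveRpc('chat.send', 'bot-1');
const status = resolveRpc('sendBotStatus', 'bot-1');
console.log(send.method, send.path);
console.log(status.method, status.path);
```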

Chat frontend changes:

Dashboard frontend changes:

Server removals:

What stays

Access control

SSE with sseKey = '*' sends all events to all authenticated clients. The dashboard already has access control via the API — components only render bots the user has access to. Admin-only event types (tool, ndjson, compaction) are already gated server-side in ADMIN_ONLY_EVENT_TYPES.

This matches how chat already works (also uses sseKey = '*', filters client-side by active bot).
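
The two layers described above can be sketched as a pair of predicates. Both helpers (`canReceive`, `isRelevant`) and the `SseEvent` shape are assumptions for illustration; only the three admin-only type names come from this document, and the real gating lives wherever ADMIN_ONLY_EVENT_TYPES is defined on the server.

```typescript
// Types this document lists as admin-only.
const ADMIN_ONLY_EVENT_TYPES = new Set<string>(['tool', 'ndjson', 'compaction']);

interface SseEvent { type: string; botId?: string }

/** Server side (hypothetical): withhold admin-only event types from non-admin subscribers. */
function canReceive(evt: SseEvent, isAdmin: boolean): boolean {
  return isAdmin || !ADMIN_ONLY_EVENT_TYPES.has(evt.type);
}

/** Client side (hypothetical): keep events for the active bot plus broadcast ('*') events. */
function isRelevant(evt: SseEvent, activeBotId: string): boolean {
  return evt.botId === undefined || evt.botId === '*' || evt.botId === activeBotId;
}

const toolEvt: SseEvent = { type: 'tool', botId: 'bot-1' };
const msgEvt: SseEvent = { type: 'new_message', botId: 'bot-2' };
const dockerEvt: SseEvent = { type: 'docker:status', botId: '*' };

console.log(canReceive(toolEvt, false));    // false: admin-only type, non-admin subscriber
console.log(canReceive(msgEvt, false));     // true
console.log(isRelevant(msgEvt, 'bot-1'));   // false: belongs to a different bot
console.log(isRelevant(dockerEvt, 'bot-1')); // true: broadcast event
```

Only the first predicate is an access control; the second is a display filter, since the event has already reached the browser by the time it runs.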

End state

  Server                          Clients
  ──────                          ───────
  pushBotEvent() ──→ SSE ──────→ Dashboard (EventSource)
                          └────→ Chat (EventSource)

  HTTP API ←──────────────────── Dashboard (fetch)
           ←──────────────────── Chat (fetch/apiFetch)

Two arrows. That's it. SSE down, HTTP up. No WebSocket for chat. No dash-ws. No per-bot WS. No "kept for fallback." No "on-demand only." No "just for this one thing."