SSE Transport

Turn any AgentStack token stream into a Server-Sent Events HTTP response.


Overview

createSSETransport converts the output of stack.stream() into a standard SSE byte stream. It works with any WinterCG-compatible runtime (Node 18+, Deno, Bun, Cloudflare Workers, Next.js App Router).

import { createSSETransport, createAgentStack } from '@directive-run/ai';

const transport = createSSETransport({
  maxResponseChars: 10_000,
  heartbeatIntervalMs: 15_000,
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
  },
});

// Next.js route handler — `stack` is the AgentStack created elsewhere
// with createAgentStack (see the full example at the end of this page)
export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(stack, 'docs-qa', message);
}

API

createSSETransport(config?)

Returns an SSETransport with two methods: toResponse() and toStream().
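
For orientation, here is a shape sketch of the returned object, inferred from the method descriptions below. It is not the package's published type declaration, and AgentStack stands in for whatever createAgentStack returns:

interface SSETransport {
  // Full Response with SSE headers, ready to return from a route handler
  toResponse(stack: AgentStack, agentId: string, input: string, opts?: { signal?: AbortSignal }): Response;
  // Raw byte stream, for frameworks that write to the socket manually
  toStream(stack: AgentStack, agentId: string, input: string, opts?: { signal?: AbortSignal }): ReadableStream<Uint8Array>;
}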

SSETransportConfig

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| maxResponseChars | number | Infinity | Truncate the response after this many characters |
| truncationMessage | string | '\n\n*[Response truncated]*' | Text appended when truncation occurs |
| heartbeatIntervalMs | number | 0 (disabled) | Send a heartbeat event at this interval to keep the connection alive |
| errorMessages | Record<string, string> or (error) => string | – | Map error codes to user-facing messages, or provide a function |
| headers | Record<string, string> | – | Extra headers merged into the SSE response |

SSEEvent

The transport emits a discriminated union of five event types. Import it for client-side type safety:

import type { SSEEvent } from '@directive-run/ai';
type SSEEvent =
  | { type: 'text'; text: string }
  | { type: 'truncated'; text: string }
  | { type: 'done' }
  | { type: 'error'; message: string }
  | { type: 'heartbeat'; timestamp: number };

toResponse(stack, agentId, input, opts?)

Creates a full Response object with SSE headers (Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive). Pass it directly as the return value from a route handler.

export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(stack, 'docs-qa', message);
}

toStream(stack, agentId, input, opts?)

Returns just the ReadableStream<Uint8Array> for frameworks like Express or Koa where you pipe the stream into res.write() manually.

app.post('/api/chat', async (req, res) => {
  const stream = transport.toStream(stack, 'docs-qa', req.body.message);
  const reader = stream.getReader();

  // Set the same SSE headers that toResponse() would add for you
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    res.write(value);
  }
  res.end();
});

Both methods accept an optional { signal?: AbortSignal } for cancellation.


SSE Event Types

Each SSE frame is a JSON-encoded data: line. Clients parse the type discriminant to handle each event:

| Type | Fields | When |
| --- | --- | --- |
| text | text: string | Each token from the agent stream |
| truncated | text: string | The response exceeded maxResponseChars and was cut short |
| done | – | The stream completed successfully |
| error | message: string | An error occurred (message is user-facing) |
| heartbeat | timestamp: number | Keep-alive ping at the configured interval (Unix ms) |
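
On the wire, each event is a single data: line followed by a blank line. A short successful stream typically looks like this (payloads illustrative):

data: {"type":"text","text":"Hello"}

data: {"type":"text","text":" world"}

data: {"type":"done"}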

Client-side parsing

Since the transport uses data: framing with JSON payloads rather than named SSE event: fields, use fetch with a streaming reader instead of EventSource. EventSource only supports GET requests and cannot send a JSON body, so it cannot call this POST endpoint.

const res = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message }),
});

if (!res.ok) {
  // Handle HTTP errors (429, 400, etc.) before parsing SSE
  const err = await res.json();
  showError(err.error);

  return;
}

const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Append new bytes — a single SSE frame may split across two reads
  buffer += decoder.decode(value, { stream: true });

  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';  // Retain the incomplete trailing line

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6).trim();
    if (!data) continue;

    const event = JSON.parse(data);

    switch (event.type) {
      case 'text':
        appendToUI(event.text);
        break;
      case 'truncated':
        appendToUI(event.text);  // Show truncation notice
        break;
      case 'done':
        finishStream();
        break;
      case 'error':
        showError(event.message);
        break;
      case 'heartbeat':
        // Connection is alive, no action needed
        break;
    }
  }
}

Buffering is required

A single SSE frame can split across two reader.read() calls. Always retain the incomplete trailing line in a buffer (as shown above) rather than splitting on '\n' and parsing every fragment. Without buffering, JSON.parse will throw on partial frames.


Error Mapping

Map internal error codes to user-friendly messages. Pass a record of code-to-message pairs, or a function for full control:

// Record-based mapping
const transport = createSSETransport({
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
    RATE_LIMIT_EXCEEDED: 'Too many requests. Please wait a moment.',
    CIRCUIT_OPEN: 'The service is temporarily unavailable.',
  },
});

// Function-based mapping
const transport = createSSETransport({
  errorMessages: (error) => {
    if (error instanceof RateLimitError) {
      return 'Slow down, please.';
    }

    return 'Something went wrong. Please try again.';
  },
});

When an error has a code property that matches a key in the record, that message is sent. Otherwise the default message is used: "AI service temporarily unavailable. Please try again."
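
If you want the record-style code lookup plus a custom fallback, a function-based mapper can reproduce both — a sketch, assuming errors expose the code property described above:

const messages: Record<string, string> = {
  RATE_LIMIT_EXCEEDED: 'Too many requests. Please wait a moment.',
};

const transport = createSSETransport({
  errorMessages: (error) => {
    // Mirror the record lookup on the error's `code`, then fall back
    // to the library's default wording
    const code = (error as { code?: string }).code;
    return (code && messages[code]) || 'AI service temporarily unavailable. Please try again.';
  },
});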

Throwing error mappers

If a function-based errorMessages mapper throws, the transport catches the exception and falls back to the default error message. This prevents a broken mapper from crashing the SSE stream.


Truncation

Protect against runaway responses by capping the total character count:

const transport = createSSETransport({
  maxResponseChars: 8_000,
  truncationMessage: '\n\n---\n*Response limit reached.*',
});

When the limit is hit, the transport sends the truncation message as a truncated event, sends a done event, and aborts the underlying token stream. The final stack.stream().result is still awaited to ensure metrics and token counts are recorded.
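
With the config above, a client that hits the limit sees the stream end like this (payload illustrative):

data: {"type":"truncated","text":"\n\n---\n*Response limit reached.*"}

data: {"type":"done"}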

Truncation sizing

The truncationMessage length is not counted against maxResponseChars. Suggested values:

| Use case | maxResponseChars |
| --- | --- |
| Chat widget | 8,000–12,000 |
| Docs Q&A | 15,000–25,000 |
| Summarization | 3,000–5,000 |

Heartbeat

Long-running responses can be dropped by proxies and load balancers that enforce idle timeouts. Enable heartbeat to send periodic keep-alive events:

const transport = createSSETransport({
  heartbeatIntervalMs: 15_000,  // Send a heartbeat every 15 seconds
});

Heartbeat events are { type: "heartbeat", timestamp: 1707836400000 } where timestamp is Unix milliseconds (Date.now()). The timer is cleaned up automatically when the stream closes or errors.
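
On the client, heartbeats double as a liveness check. A minimal watchdog sketch — the 2× interval threshold and 5-second poll are suggestions, not part of the API; call markAlive() for every parsed event in the reader loop shown earlier:

const controller = new AbortController();
// Pass controller.signal to fetch() when opening the stream

let lastEventAt = Date.now();

function markAlive() {
  lastEventAt = Date.now();
}

// If nothing (not even a heartbeat) arrives within two heartbeat
// intervals, treat the connection as stale and abort the request
const watchdog = setInterval(() => {
  if (Date.now() - lastEventAt > 2 * 15_000) {
    controller.abort();
    clearInterval(watchdog);
  }
}, 5_000);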

Proxy idle timeouts

Most reverse proxies enforce idle-connection timeouts: nginx defaults to 60s, AWS ALB to 60s, and Cloudflare to 100s. Set heartbeatIntervalMs to 15,000–25,000 ms to stay well within these limits.


Abort Signal Propagation

Pass an AbortSignal to cancel the stream from the server side. This is useful for tying the stream lifetime to the HTTP request:

export async function POST(request: Request) {
  const { message } = await request.json();

  return transport.toResponse(stack, 'docs-qa', message, {
    signal: request.signal,  // Cancels the agent stream if the client disconnects
  });
}

The signal is forwarded to stack.stream(), which aborts the underlying LLM call.
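
You can also combine the client-disconnect signal with a server-side deadline. A sketch using AbortSignal.any and AbortSignal.timeout (Node 20+ and other modern runtimes; the 60-second cap is an arbitrary example):

export async function POST(request: Request) {
  const { message } = await request.json();

  // Abort when the client disconnects OR after 60 seconds, whichever comes first
  const signal = AbortSignal.any([request.signal, AbortSignal.timeout(60_000)]);

  return transport.toResponse(stack, 'docs-qa', message, { signal });
}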


createAnthropicStreamingRunner

A built-in streaming runner that calls the Anthropic Messages API with server-sent events. It lives in helpers.ts but is re-exported from the main @directive-run/ai entry point, so no separate import path is needed. Pair it with the SSE transport for an end-to-end Anthropic streaming pipeline.

import {
  createAnthropicRunner,
  createAnthropicStreamingRunner,
  createAgentStack,
} from '@directive-run/ai';

const streamingRunner = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
  model: 'claude-sonnet-4-5-20250929',
  maxTokens: 4096,
});

const stack = createAgentStack({
  runner: createAnthropicRunner({ apiKey: process.env.ANTHROPIC_API_KEY! }),
  streaming: { runner: streamingRunner },
  agents: {
    chat: {
      agent: { name: 'chat', instructions: 'You are helpful.', model: 'claude-sonnet-4-5-20250929' },
      capabilities: ['chat'],
    },
  },
});

AnthropicStreamingRunnerOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| apiKey | string | (required) | Anthropic API key |
| model | string | 'claude-sonnet-4-5-20250929' | Default model (overridden by agent.model) |
| maxTokens | number | 4096 | Maximum tokens to generate |
| baseURL | string | 'https://api.anthropic.com/v1' | API base URL |
| fetch | typeof fetch | globalThis.fetch | Custom fetch implementation |

The runner reads each SSE event from the Anthropic API, emits tokens via callbacks.onToken(), tracks input/output token counts from message_start and message_delta events, and returns the assembled result.
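
The fetch option makes the runner easy to instrument or test. A sketch that logs each outgoing request before delegating to the global fetch (the logging is illustrative; any fetch-compatible function works):

const streamingRunner = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
  fetch: (input, init) => {
    // Log the target URL, then forward the call unchanged
    console.log('anthropic request:', input instanceof Request ? input.url : String(input));
    return fetch(input, init);
  },
});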


Full Example: Next.js Route Handler

A complete Next.js App Router endpoint combining RAG enrichment, SSE transport, and AgentStack:

// app/api/chat/route.ts
import {
  createAgentStack,
  createAnthropicRunner,
  createAnthropicStreamingRunner,
  createRAGEnricher,
  createJSONFileStore,
  createOpenAIEmbedder,
  createSSETransport,
} from '@directive-run/ai';

const apiKey = process.env.ANTHROPIC_API_KEY!;

const enricher = createRAGEnricher({
  embedder: createOpenAIEmbedder({ apiKey: process.env.OPENAI_API_KEY! }),
  storage: createJSONFileStore({ filePath: './embeddings.json' }),
  topK: 5,
  minSimilarity: 0.3,
});

const stack = createAgentStack({
  runner: createAnthropicRunner({ apiKey }),
  streaming: {
    runner: createAnthropicStreamingRunner({ apiKey }),
  },
  agents: {
    'docs-qa': {
      agent: {
        name: 'docs-qa',
        instructions: 'Answer questions using the provided documentation context.',
        model: 'claude-sonnet-4-5-20250929',
      },
      capabilities: ['chat'],
    },
  },
  memory: { maxMessages: 20 },
});

const transport = createSSETransport({
  maxResponseChars: 10_000,
  heartbeatIntervalMs: 15_000,
  errorMessages: {
    INPUT_GUARDRAIL_FAILED: 'Your message was flagged by our safety filter.',
  },
});

export async function POST(request: Request) {
  const { message, history } = await request.json();

  const enrichedInput = await enricher.enrich(message, {
    history,
    filter: (chunk) => chunk.metadata.type === 'docs',
  });

  return transport.toResponse(stack, 'docs-qa', enrichedInput, {
    signal: request.signal,
  });
}

Next Steps

  • RAG Enricher – Retrieval-augmented generation pipeline
  • Agent Stack – Compose all AI features in one factory
  • Streaming – Token streaming, backpressure, and stream operators
  • Guardrails – Input/output validation and safety