Streaming

Process agent responses token-by-token with real-time guardrails.


Orchestrator Streaming

The simplest way to stream is orchestrator.runStream(), which wraps the agent run with guardrails, approval checks, and state tracking:

import { createAgentOrchestrator } from '@directive-run/ai';
import type { AgentLike } from '@directive-run/ai';

const orchestrator = createAgentOrchestrator({
  runner,
  autoApproveToolCalls: true,
});

const agent: AgentLike = {
  name: 'assistant',
  instructions: 'You are a helpful assistant.',
  model: 'gpt-4',
};

// Start a streaming run – returns the stream, a result promise, and an abort handle
const { stream, result, abort } = orchestrator.runStream<string>(agent, 'Explain WebAssembly');

for await (const chunk of stream) {
  switch (chunk.type) {
    case 'token':
      process.stdout.write(chunk.data);                // Append each token as it arrives
      break;
    case 'tool_start':
      console.log(`\nCalling tool: ${chunk.tool}`);    // Agent is invoking a tool
      break;
    case 'tool_end':
      console.log(`Tool done: ${chunk.result}`);       // Tool returned a result
      break;
    case 'guardrail_triggered':
      console.warn(`Guardrail ${chunk.guardrailName}: ${chunk.reason}`);  // Safety check fired
      break;
    case 'approval_required':
      // Pause and show UI – call orchestrator.approve(chunk.requestId) to continue
      break;
    case 'done':
      console.log(`\n\nDone: ${chunk.totalTokens} tokens in ${chunk.duration}ms`);
      break;
    case 'error':
      console.error(chunk.error);
      break;
  }
}

// Await the completed result after the stream closes
const finalResult = await result;
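
If the orchestrator is created with autoApproveToolCalls: false instead, tool calls pause at approval_required chunks until you resolve them. A minimal sketch of approving from inside the stream loop, using the orchestrator.approve() call referenced in the comment above (the input and the auto-approval here are illustrative):

const manual = createAgentOrchestrator({ runner, autoApproveToolCalls: false });
const { stream: pending } = manual.runStream<string>(agent, 'Clean up old records');

for await (const chunk of pending) {
  if (chunk.type === 'approval_required') {
    // In a real app, surface chunk.toolName to the user and await their decision
    console.log(`Approving tool: ${chunk.toolName}`);
    manual.approve(chunk.requestId);  // Resumes the paused tool call
  }
}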

Chunk Types

Every stream chunk has a type discriminant:

Type | Fields | When
token | data, tokenCount | Each token from the agent
tool_start | tool, toolCallId, arguments | Agent starts calling a tool
tool_end | tool, toolCallId, result | Tool call completes
message | message | Full message added to conversation
guardrail_triggered | guardrailName, reason, partialOutput, stopped | A guardrail blocked content
approval_required | requestId, toolName | Tool call needs approval
approval_resolved | requestId, approved | Approval decision made
progress | phase, message | Status update (starting, generating, tool_calling, finishing)
done | totalTokens, duration, droppedTokens | Stream completed
error | error, partialOutput? | An error occurred
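
For example, progress chunks (not handled in the switch above) can drive a status line using their phase and message fields; a minimal sketch with an illustrative input:

const { stream } = orchestrator.runStream(agent, 'Summarize this report');

for await (const chunk of stream) {
  if (chunk.type === 'progress') {
    console.log(`[${chunk.phase}] ${chunk.message}`);  // e.g. "[tool_calling] ..."
  }
}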

Cancellation

Abort a stream at any time:

const { stream, result, abort } = orchestrator.runStream(agent, input);

// Cancel after a timeout using the abort handle
setTimeout(() => abort(), 5000);

// Or pass an AbortSignal for external cancellation control
const controller = new AbortController();
const { stream: s2 } = orchestrator.runStream(agent, input, {
  signal: controller.signal,
});

// Trigger cancellation from anywhere that holds the controller
controller.abort();
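
Because the signal option accepts any standard AbortSignal, you can also hand cancellation to the platform's built-in timeout helper (Node 17.3+ and modern browsers):

// Abort the run automatically after 10 seconds
const { stream: s3 } = orchestrator.runStream(agent, input, {
  signal: AbortSignal.timeout(10_000),
});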

Stack Streaming

The AgentStack offers two streaming methods. stack.stream() yields raw token strings, while stack.streamChunks() yields the same rich StreamChunk types as the orchestrator:

import { createAgentStack } from '@directive-run/ai';

const stack = createAgentStack({
  runner,
  streaming: { runner: myStreamingRunner },
  agents: { chat: { agent: chatAgent, capabilities: ['chat'] } },
});

// Simple stream – yields one raw token string at a time
const tokenStream = stack.stream('chat', 'Hello!');
for await (const token of tokenStream) {
  process.stdout.write(token);
}

// Rich stream – yields typed chunks (tokens, tool calls, guardrails, progress)
const { stream, result, abort } = stack.streamChunks('chat', 'Hello!');
for await (const chunk of stream) {
  if (chunk.type === 'token') process.stdout.write(chunk.data);
}
const finalResult = await result;

Both methods automatically track tokens, record observability spans, and publish to the message bus.


Provider Streaming Runners

Directive ships pre-built streaming runners for OpenAI and Anthropic. These handle SSE parsing, token extraction, and lifecycle hooks automatically:

OpenAI Streaming

import { createOpenAIRunner, createOpenAIStreamingRunner } from '@directive-run/ai/openai';

const runner = createOpenAIRunner({ apiKey: process.env.OPENAI_API_KEY! });
const streamingRunner = createOpenAIStreamingRunner({
  apiKey: process.env.OPENAI_API_KEY!,
  hooks: {
    onAfterCall: ({ durationMs, tokenUsage }) => {
      console.log(`${durationMs}ms – ${tokenUsage.inputTokens}in/${tokenUsage.outputTokens}out`);
    },
  },
});

const stack = createAgentStack({
  runner,
  streaming: { runner: streamingRunner },
  agents: { chat: { instructions: 'You are a helpful assistant.' } },
});

Anthropic Streaming

import { createAnthropicRunner, createAnthropicStreamingRunner } from '@directive-run/ai/anthropic';

const runner = createAnthropicRunner({ apiKey: process.env.ANTHROPIC_API_KEY! });
const streamingRunner = createAnthropicStreamingRunner({
  apiKey: process.env.ANTHROPIC_API_KEY!,
});

const stack = createAgentStack({
  runner,
  streaming: { runner: streamingRunner },
  agents: { chat: { instructions: 'You are a helpful assistant.' } },
});

Both streaming runners return tokenUsage with input/output breakdown and support the same hooks interface as the standard runners.


Standalone Streaming

For streaming outside the orchestrator (e.g., direct agent runs without guardrails/approvals), use createStreamingRunner:

import { createStreamingRunner } from '@directive-run/ai';
import type { StreamRunOptions } from '@directive-run/ai';
import OpenAI from 'openai';

// The official OpenAI SDK client, used by the example below
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// Build a streaming runner by wrapping your LLM SDK's streaming API
const streamRunner = createStreamingRunner(
  async (agent, input, callbacks) => {
    // Start a streaming completion request
    const stream = await openai.chat.completions.create({
      model: agent.model ?? 'gpt-4',
      messages: [
        { role: 'system', content: agent.instructions ?? '' },
        { role: 'user', content: input },
      ],
      stream: true,
    });

    const messages = [];  // Conversation messages (left empty in this minimal sketch)
    let fullContent = '';

    for await (const chunk of stream) {
      if (callbacks.signal?.aborted) break;    // Stop if the caller cancelled

      const token = chunk.choices[0]?.delta?.content ?? '';
      if (token) {
        callbacks.onToken?.(token);            // Push each token to the stream
        fullContent += token;
      }
    }

    // Return the final assembled result
    return {
      output: fullContent,
      messages,
      toolCalls: [],
      totalTokens: Math.ceil(fullContent.length / 4),  // Rough estimate: ~4 characters per token
    };
  }
);

// Use the runner with backpressure and guardrail options
const { stream, result, abort } = streamRunner(agent, 'Hello', {
  backpressure: 'buffer',
  guardrailCheckInterval: 50,
});

Backpressure

Control what happens when the consumer is slower than the producer:

// Buffer – keeps all tokens in memory (lossless, default)
const { stream } = streamRunner(agent, input, {
  backpressure: 'buffer',
});

// Drop – discards tokens when the buffer fills up (lossy, but fast)
const { stream: s2 } = streamRunner(agent, input, {
  backpressure: 'drop',
  bufferSize: 100,
});

// Block – pauses the producer until the consumer catches up (lossless, may slow response)
const { stream: s3 } = streamRunner(agent, input, {
  backpressure: 'block',
  bufferSize: 500,
});

The done chunk includes a droppedTokens count when using the drop strategy.
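
A short sketch of reading it from the s2 stream above:

for await (const chunk of s2) {
  if (chunk.type === 'done' && chunk.droppedTokens > 0) {
    console.warn(`Backpressure dropped ${chunk.droppedTokens} tokens`);
  }
}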


Streaming Guardrails

Evaluate guardrails on partial output as tokens arrive, without waiting for the full response:

import {
  createStreamingRunner,
  createLengthStreamingGuardrail,
  createPatternStreamingGuardrail,
  combineStreamingGuardrails,
} from '@directive-run/ai';

const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [
    // Halt the stream if the output grows too long
    createLengthStreamingGuardrail({
      maxTokens: 2000,
      warnAt: 1500,               // Emit a warning chunk at 75%
    }),

    // Halt the stream when sensitive data patterns appear
    createPatternStreamingGuardrail({
      patterns: [
        { regex: /\b\d{3}-\d{2}-\d{4}\b/, name: 'SSN' },
        { regex: /\bpassword\s*[:=]/i, name: 'Password leak' },
      ],
    }),
  ],
});

const { stream } = streamRunner(agent, input, {
  guardrailCheckInterval: 50,     // Evaluate guardrails every 50 tokens
  stopOnGuardrail: true,          // Terminate the stream on any guardrail failure
});

for await (const chunk of stream) {
  if (chunk.type === 'guardrail_triggered') {
    console.warn(`${chunk.guardrailName}: ${chunk.reason}`);
    if (chunk.stopped) break;     // Stream was halted by the guardrail
  }
}

Combining Guardrails

Merge multiple streaming guardrails into one:

import { combineStreamingGuardrails } from '@directive-run/ai';

// Merge multiple streaming guardrails into a single checker
const combined = combineStreamingGuardrails([
  createLengthStreamingGuardrail({ maxTokens: 2000 }),
  createPatternStreamingGuardrail({ patterns: [...] }),
]);

const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [combined],
});

Adapting Output Guardrails

Reuse existing output guardrails as streaming guardrails:

import { adaptOutputGuardrail } from '@directive-run/ai';

// Reuse an existing output guardrail as a streaming guardrail
const streamingVersion = adaptOutputGuardrail(
  'pii-streaming',        // Name for logging and error messages
  myOutputGuardrail,       // Your existing guardrail function
  { minTokens: 100 },     // Wait for 100 tokens before first check
);
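
The adapted guardrail then plugs into the same streamingGuardrails option as the built-in checkers:

const streamRunner = createStreamingRunner(baseRunner, {
  streamingGuardrails: [streamingVersion],
});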

Stream Operators

Transform, filter, and inspect streams with composable operators:

Collect Tokens

Consume an entire stream and return the concatenated text:

import { collectTokens } from '@directive-run/ai';

// Consume the entire stream and return the concatenated text
const { stream } = orchestrator.runStream(agent, input);
const fullOutput = await collectTokens(stream);

Tap

Observe chunks without modifying the stream (logging, metrics):

import { tapStream } from '@directive-run/ai';

const { stream } = orchestrator.runStream(agent, input);

// Observe each chunk for side effects (logging, metrics) without modifying it
const logged = tapStream(stream, (chunk) => {
  if (chunk.type === 'token') tokenCount++;
  if (chunk.type === 'error') reportError(chunk.error);
});

for await (const chunk of logged) {
  // Chunks are unchanged – tap only inspects them
}

Filter

Keep only specific chunk types:

import { filterStream } from '@directive-run/ai';

const { stream } = orchestrator.runStream(agent, input);

// Keep only the chunk types you care about
const tokensOnly = filterStream(stream, ['token', 'done']);

for await (const chunk of tokensOnly) {
  // TypeScript narrows chunk.type to 'token' | 'done'
}

Map

Transform chunks:

import { mapStream } from '@directive-run/ai';

const { stream } = orchestrator.runStream(agent, input);

// Transform each chunk as it flows through the stream
const uppercased = mapStream(stream, (chunk) => {
  if (chunk.type === 'token') {
    return { ...chunk, data: chunk.data.toUpperCase() };
  }
  return chunk;  // Pass non-token chunks through unchanged
});
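
Operators take a stream and return a stream, so they chain. A sketch composing the four operators above, assuming each accepts the stream the previous one returns; the debug logging and uppercase transform are illustrative:

const { stream } = orchestrator.runStream(agent, input);

// tap → filter → map → collect, applied inside-out
const shouted = await collectTokens(
  mapStream(
    filterStream(
      tapStream(stream, (chunk) => console.debug(chunk.type)),  // Observe every chunk
      ['token', 'done'],                                        // Keep only tokens and completion
    ),
    (chunk) =>
      chunk.type === 'token' ? { ...chunk, data: chunk.data.toUpperCase() } : chunk,
  ),
);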

Framework Integration

The streaming API is framework-agnostic – orchestrator.runStream() works the same everywhere; the framework bindings below differ only in how they apply incoming chunks to reactive UI state.

React

import { useState, useCallback } from 'react';
import { useAgentOrchestrator, useFact } from '@directive-run/react';

function ChatStream() {
  const orchestrator = useAgentOrchestrator({ runner, autoApproveToolCalls: true });
  const agent = useFact(orchestrator.system, '__agent');
  const [output, setOutput] = useState('');

  const send = useCallback(async (input: string) => {
    setOutput('');  // Clear previous output before starting a new stream

    const { stream } = orchestrator.runStream(myAgent, input);

    // Append each token to state as it arrives
    for await (const chunk of stream) {
      if (chunk.type === 'token') setOutput((prev) => prev + chunk.data);
    }
  }, [orchestrator]);

  return (
    <div>
      <p>{output}</p>
      {agent?.status === 'running' && <span className="cursor" />}
    </div>
  );
}

Vue

<script setup lang="ts">
import { ref, onUnmounted } from 'vue';
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/vue';

const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
onUnmounted(() => orchestrator.dispose());

const agent = useFact(orchestrator.system, '__agent');
const output = ref('');

async function send(input: string) {
  output.value = '';  // Reset before each new stream

  const { stream } = orchestrator.runStream(myAgent, input);
  for await (const chunk of stream) {
    if (chunk.type === 'token') output.value += chunk.data;  // Append tokens reactively
  }
}
</script>

<template>
  <p>{{ output }}</p>
  <span v-if="agent?.status === 'running'" class="cursor" />
</template>

Svelte

<script>
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/svelte';
import { onDestroy } from 'svelte';

const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
onDestroy(() => orchestrator.dispose());

const agent = useFact(orchestrator.system, '__agent');
let output = '';

async function send(input) {
  output = '';  // Clear previous response

  const { stream } = orchestrator.runStream(myAgent, input);
  for await (const chunk of stream) {
    if (chunk.type === 'token') output += chunk.data;  // Svelte reactively updates the template
  }
}
</script>

<p>{output}</p>
{#if $agent?.status === 'running'}<span class="cursor" />{/if}

Solid

import { createSignal, onCleanup } from 'solid-js';
import { createAgentOrchestrator } from '@directive-run/ai';
import { useFact } from '@directive-run/solid';

function ChatStream() {
  const orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
  onCleanup(() => orchestrator.dispose());

  const agent = useFact(orchestrator.system, '__agent');
  const [output, setOutput] = createSignal('');

  async function send(input: string) {
    setOutput('');  // Reset signal before streaming

    const { stream } = orchestrator.runStream(myAgent, input);
    for await (const chunk of stream) {
      if (chunk.type === 'token') setOutput((prev) => prev + chunk.data);
    }
  }

  return (
    <div>
      <p>{output()}</p>
      {agent()?.status === 'running' && <span class="cursor" />}
    </div>
  );
}

Lit

import { LitElement, html } from 'lit';
import { createAgentOrchestrator } from '@directive-run/ai';
import { FactController } from '@directive-run/lit';

class ChatStream extends LitElement {
  private orchestrator = createAgentOrchestrator({ runner, autoApproveToolCalls: true });
  private agent = new FactController(this, this.orchestrator.system, '__agent');
  private output = '';

  disconnectedCallback() {
    super.disconnectedCallback();
    this.orchestrator.dispose();
  }

  async send(input: string) {
    this.output = '';
    this.requestUpdate();  // Clear the display immediately

    const { stream } = this.orchestrator.runStream(myAgent, input);
    for await (const chunk of stream) {
      if (chunk.type === 'token') {
        this.output += chunk.data;
        this.requestUpdate();  // Re-render after each token
      }
    }
  }

  render() {
    return html`
      <p>${this.output}</p>
      ${this.agent.value?.status === 'running' ? html`<span class="cursor"></span>` : ''}
    `;
  }
}

customElements.define('chat-stream', ChatStream);  // Register the element (tag name is illustrative)
