Data Fetching
•3 min read
Subscriptions
Subscriptions handle push-based data – WebSocket connections, Server-Sent Events, AI streaming responses. They produce the same ResourceState<T> as queries, with automatic lifecycle management.
Basic Subscription
import { createSubscription } from "@directive-run/query";
const prices = createSubscription({
name: "prices",
key: (facts) => facts.ticker ? { ticker: facts.ticker } : null,
subscribe: (params, { onData, onError, signal }) => {
const ws = new WebSocket(`wss://api.example.com/${params.ticker}`);
ws.onmessage = (e) => onData(JSON.parse(e.data));
ws.onerror = () => onError(new Error("Connection lost"));
signal.addEventListener("abort", () => ws.close());
return () => ws.close(); // cleanup function
},
});
How It Works
- When
keyreturns non-null, thesubscribefunction is called - Push data via
onData()– each call updates theResourceState - Report errors via
onError()– sets error state - The
signalfires when the key changes or the system stops - Return a cleanup function for resource teardown
Callbacks
Replace Data
subscribe: (params, { onData }) => {
ws.onmessage = (e) => onData(JSON.parse(e.data)); // replaces current data
};
Accumulate Data
subscribe: (params, { onData }) => {
ws.onmessage = (e) => {
onData((current) => [...(current || []), JSON.parse(e.data)]); // updater function
};
};
Server-Sent Events
const notifications = createSubscription({
name: "notifications",
key: () => ({ all: true }),
subscribe: (params, { onData, onError, signal }) => {
const source = new EventSource("/api/notifications");
source.onmessage = (e) => onData(JSON.parse(e.data));
source.onerror = () => onError(new Error("SSE connection lost"));
signal.addEventListener("abort", () => source.close());
return () => source.close();
},
});
AI Streaming
Subscriptions are ideal for streaming LLM responses:
const chat = createSubscription({
name: "chat",
key: (facts) => facts.prompt ? { prompt: facts.prompt } : null,
subscribe: (params, { onData, onError, signal }) => {
let fullResponse = "";
fetch("/api/chat", {
method: "POST",
body: JSON.stringify({ prompt: params.prompt }),
signal,
}).then(async (res) => {
const reader = res.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
fullResponse += decoder.decode(value);
onData(fullResponse); // update on each chunk
}
}).catch((err) => {
if (!signal.aborted) onError(err);
});
},
});
With createQuerySystem
const app = createQuerySystem({
facts: { ticker: "" },
subscriptions: {
prices: {
key: (f) => f.ticker ? { ticker: f.ticker } : null,
subscribe: (params, { onData, onError, signal }) => {
const ws = new WebSocket(`wss://api.example.com/${params.ticker}`);
ws.onmessage = (e) => onData(JSON.parse(e.data));
ws.onerror = () => onError(new Error("Connection lost"));
signal.addEventListener("abort", () => ws.close());
return () => ws.close();
},
},
},
});
app.facts.ticker = "AAPL"; // subscription starts
app.read("prices"); // { data: { price: 150.25 }, isSuccess: true, ... }
app.subscriptions.prices.setData({ price: 200 }); // manual data push
With AI Single-Agent Orchestrator
Combine subscriptions with @directive-run/ai for streaming agent responses:
import { createQuerySystem } from "@directive-run/query";
import { createAgentOrchestrator } from "@directive-run/ai";
const app = createQuerySystem({
facts: { prompt: "" },
subscriptions: {
agent: {
key: (f) => f.prompt ? { prompt: f.prompt } : null,
subscribe: (params, { onData, onError, signal }) => {
const orchestrator = createAgentOrchestrator({ /* config */ });
orchestrator.stream(params.prompt, {
onToken: (token) => onData((prev) => (prev || "") + token),
onError: (err) => onError(err),
signal,
});
return () => orchestrator.abort();
},
},
},
});
app.facts.prompt = "Analyze this data..."; // agent starts streaming

