Guides
•4 min read
DAG Pipeline
Build workflows where agents run in parallel, depend on upstream results, and branch conditionally.
The Problem
Your pipeline isn't linear. A researcher and a fact-checker can run in parallel. A writer depends on both of their outputs. If the fact-checker fails, you want to skip the writer but still get the research. Sequential pipelines waste time; manual Promise orchestration is brittle.
The Solution
Use the dag() pattern to define nodes with dependencies, conditional edges, and error strategies:
import { dag } from '@directive-run/ai';
const contentPipeline = dag(
{
researcher: {
agent: 'researcher',
},
factChecker: {
agent: 'fact-checker',
},
writer: {
agent: 'writer',
deps: ['researcher', 'factChecker'],
transform: (context) => {
const research = context.outputs.researcher;
const facts = context.outputs.factChecker;
return `Research:\n${research}\n\nVerified facts:\n${facts}`;
},
},
editor: {
agent: 'editor',
deps: ['writer'],
},
},
// Merge function: combine all outputs into a final result
(context) => ({
research: context.outputs.researcher,
facts: context.outputs.factChecker,
draft: context.outputs.writer,
final: context.outputs.editor,
}),
{
onNodeError: 'skip-downstream',
maxConcurrent: 3,
},
);
// Run with: orchestrator.runPattern('contentPipeline', input) — see Full Example below
How It Works
┌────────────┐ ┌──────────────┐
│ researcher │ │ factChecker │
└──────┬─────┘ └──────┬───────┘
└────────┬───────┘
▼
┌────────────┐
│ writer │
└──────┬─────┘
▼
┌────────────┐
│ editor │
└──────┬─────┘
╎ conditional
▼
┌────────────┐
│ seo │
└────────────┘
- Nodes define agents and their dependencies. Nodes with no
depsrun immediately. Nodes withdepswait for all dependencies to complete. transformshapes the input for a node based on upstream outputs. Thecontextobject hasoutputs,statuses,errors, and the originalinput.depscreates edges in the DAG.writerwaits for bothresearcherandfactChecker.onNodeErrorcontrols failure behavior:"fail"— abort the entire DAG on any node failure"skip-downstream"— skip nodes that depend on the failed node, but run everything else"continue"— run everything, passingundefinedfor failed upstream outputs
maxConcurrentlimits how many nodes run simultaneously.whenadds conditional edges — a node only runs if the condition is true.
Full Example
A content pipeline with conditional review and timeout handling:
import { createMultiAgentOrchestrator, dag } from '@directive-run/ai';
const orchestrator = createMultiAgentOrchestrator({
runner, // See Running Agents (/ai/running-agents) for setup
agents: {
researcher: {
agent: { name: 'researcher', instructions: 'Research the topic. Return structured findings.' },
},
'fact-checker': {
agent: { name: 'fact-checker', instructions: 'Verify claims. Return confirmed and unconfirmed facts.' },
},
writer: {
agent: { name: 'writer', instructions: 'Write a blog post from research and verified facts.' },
},
editor: {
agent: { name: 'editor', instructions: 'Edit for clarity and grammar. Return final version.' },
},
'seo-optimizer': {
agent: { name: 'seo-optimizer', instructions: 'Optimize the final post for SEO. Add meta description and keywords.' },
},
},
patterns: {
contentPipeline: dag(
{
researcher: {
agent: 'researcher',
timeout: 30000,
},
factChecker: {
agent: 'fact-checker',
timeout: 20000,
},
writer: {
agent: 'writer',
deps: ['researcher', 'factChecker'],
transform: (context) => {
const research = context.outputs.researcher;
const facts = context.outputs.factChecker;
return `Research:\n${research}\n\nFacts:\n${facts}`;
},
},
editor: {
agent: 'editor',
deps: ['writer'],
},
seo: {
agent: 'seo-optimizer',
deps: ['editor'],
// Only run SEO if the input requested it
when: (context) => context.input.includes('[SEO]'),
},
},
(context) => ({
research: context.outputs.researcher,
facts: context.outputs.factChecker,
draft: context.outputs.writer,
final: context.outputs.editor ?? context.outputs.writer,
seo: context.outputs.seo,
statuses: context.statuses,
}),
{
onNodeError: 'skip-downstream',
maxConcurrent: 2,
timeout: 120000,
},
),
},
});
// Run the pipeline
const result = await orchestrator.runPattern(
'contentPipeline',
'[SEO] Write about quantum computing breakthroughs in 2025'
);
console.log('Statuses:', result.statuses);
// { researcher: 'completed', factChecker: 'completed', writer: 'completed', editor: 'completed', seo: 'completed' }
// If fact-checker failed with skip-downstream:
// { researcher: 'completed', factChecker: 'failed', writer: 'skipped', editor: 'skipped', seo: 'skipped' }
Related
- Execution Patterns — DAG, reflect, and other patterns reference
- Multi-Agent Orchestrator — orchestrator configuration
- Multi-Step Pipeline guide — simpler sequential pipelines
- Self-Improving Agents guide — iterative refinement with reflect

