Guide: Goal-Directed Pipeline

Build a pipeline that declares what each agent needs and produces, not the order in which agents run.

The goal pattern infers agent execution order from produces/requires declarations and drives toward a when() condition. Use it when you want the runtime to handle ordering, stall detection, and progress tracking automatically.


When to Use Goal vs DAG

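|                | DAG                              | Goal                                      |
|----------------|----------------------------------|-------------------------------------------|
| Topology       | Static: you wire edges with deps | Dynamic: inferred from produces/requires  |
| Completion     | All nodes run                    | when() condition is met                   |
| Stall handling | None                             | Progressive relaxation                    |
| Progress       | Node count                       | Satisfaction score (0–1)                  |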

Use DAG when you know the exact execution order. Use Goal when you want the runtime to figure it out.
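
To make "inferred from produces/requires" concrete, here is a minimal standalone sketch of how dependency edges could be derived from node declarations. It does not use or reproduce the library's internals: `NodeDecl` and `inferEdges` are hypothetical names introduced only for illustration, and the sketch assumes each fact key has a single producer.

```typescript
// Hypothetical shape, modeled on the produces/requires fields used in this guide.
interface NodeDecl {
  produces: string[];
  requires?: string[];
}

// For each node, list the nodes that produce the facts it requires.
// Assumption: one producer per fact key (a later producer overwrites an earlier one).
function inferEdges(nodes: Record<string, NodeDecl>): Record<string, string[]> {
  // Map each fact key to the ID of the node that produces it.
  const producerOf = new Map<string, string>();
  for (const [id, node] of Object.entries(nodes)) {
    for (const key of node.produces) {
      producerOf.set(key, id);
    }
  }

  const deps: Record<string, string[]> = {};
  for (const [id, node] of Object.entries(nodes)) {
    const upstream = new Set<string>();
    for (const key of node.requires ?? []) {
      const producer = producerOf.get(key);
      // Facts with no producer (for example, seeded initial facts) add no edge.
      if (producer !== undefined && producer !== id) {
        upstream.add(producer);
      }
    }
    deps[id] = Array.from(upstream);
  }
  return deps;
}

const inferred = inferEdges({
  researcher: { produces: ['findings'] },
  analyst: { produces: ['analysis'], requires: ['findings'] },
  writer: { produces: ['report'], requires: ['analysis'] },
});

console.log(inferred);
// { researcher: [], analyst: [ 'researcher' ], writer: [ 'analyst' ] }
```

These are the same edges you would wire by hand with deps in a DAG; the difference is that here they fall out of the declarations.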


Step 1: Define Agents

import type { AgentLike, AgentRunner } from '@directive-run/ai';

const researcher: AgentLike = {
  name: 'researcher',
  instructions: 'Research the given topic. Return key findings as a structured summary.',
  model: 'gpt-4',
};

const analyst: AgentLike = {
  name: 'analyst',
  instructions: 'Analyze the research findings. Return insights and recommendations.',
  model: 'gpt-4',
};

const writer: AgentLike = {
  name: 'writer',
  instructions: 'Write a report from the analysis. Return the final report text.',
  model: 'gpt-4',
};

Step 2: Create the Orchestrator

import { createMultiAgentOrchestrator, goal } from '@directive-run/ai';

// `runner` is the AgentRunner that executes agent calls; create it before this step.
const orchestrator = createMultiAgentOrchestrator({
  runner,
  agents: {
    researcher: { agent: researcher },
    analyst: { agent: analyst },
    writer: { agent: writer },
  },
});

Step 3: Run the Goal

const result = await orchestrator.runGoal(
  // Node declarations: what each agent produces and requires
  {
    researcher: {
      agent: 'researcher',
      produces: ['findings'],
      extractOutput: (r) => ({ findings: r.output }),
    },
    analyst: {
      agent: 'analyst',
      produces: ['analysis'],
      requires: ['findings'],
      buildInput: (facts) => `Analyze these findings: ${facts.findings}`,
      extractOutput: (r) => ({ analysis: r.output }),
    },
    writer: {
      agent: 'writer',
      produces: ['report'],
      requires: ['analysis'],
      buildInput: (facts) => `Write a report from this analysis: ${facts.analysis}`,
      extractOutput: (r) => ({ report: r.output }),
    },
  },

  // Initial facts (seed input)
  { topic: 'AI safety trends in 2026' },

  // Goal condition — stop when this returns true
  (facts) => facts.report != null,

  // Options
  {
    maxSteps: 10,
    extract: (facts) => facts.report,
  },
);

console.log(result.output);

On each step, the runtime:

  1. Checks which nodes have all of their requires satisfied by the current facts
  2. Runs the ready nodes in parallel
  3. Merges each node's extractOutput result into the fact pool
  4. Checks the when() goal condition
  5. Repeats until the goal is met or maxSteps is reached
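
The loop above can be sketched as a small standalone simulation. This is not the library's implementation: `SketchNode`, `SketchResult`, and `runGoalSketch` are hypothetical names, nodes are synchronous functions standing in for agent calls, and "parallel" is modeled by running every ready node against the same snapshot of facts. The sketch also assumes each node runs at most once and simply stops on a stall, whereas the real runtime applies progressive relaxation.

```typescript
type Facts = Record<string, unknown>;

interface SketchNode {
  produces: string[];
  requires?: string[];
  // Synchronous stand-in for an agent call followed by extractOutput.
  run: (facts: Facts) => Facts;
}

interface SketchResult {
  facts: Facts;
  steps: number;
  goalMet: boolean;
  order: string[][]; // node IDs run in each step
}

function runGoalSketch(
  nodes: Record<string, SketchNode>,
  initialFacts: Facts,
  when: (facts: Facts) => boolean,
  maxSteps: number,
): SketchResult {
  const facts: Facts = { ...initialFacts };
  const done = new Set<string>();
  const order: string[][] = [];
  let steps = 0;

  // Steps 4 and 5: the loop condition re-checks when() and maxSteps.
  while (!when(facts) && steps < maxSteps) {
    // Step 1: nodes that have not run and whose requires are all present.
    const ready = Object.keys(nodes).filter(
      (id) =>
        !done.has(id) &&
        (nodes[id].requires ?? []).every((key) => facts[key] != null),
    );
    if (ready.length === 0) break; // stalled: nothing can run

    // Step 2: run the whole wave against the same snapshot of facts.
    const snapshot = { ...facts };
    const outputs = ready.map((id) => nodes[id].run(snapshot));

    // Step 3: merge outputs into the fact pool.
    for (const out of outputs) Object.assign(facts, out);
    for (const id of ready) done.add(id);

    order.push(ready);
    steps += 1;
  }

  return { facts, steps, goalMet: when(facts), order };
}

const sketch = runGoalSketch(
  {
    researcher: {
      produces: ['findings'],
      run: (f) => ({ findings: `findings on ${String(f.topic)}` }),
    },
    analyst: {
      produces: ['analysis'],
      requires: ['findings'],
      run: (f) => ({ analysis: `analysis of ${String(f.findings)}` }),
    },
    writer: {
      produces: ['report'],
      requires: ['analysis'],
      run: (f) => ({ report: `report from ${String(f.analysis)}` }),
    },
  },
  { topic: 'AI safety' },
  (f) => f.report != null,
  10,
);

console.log(sketch.order); // [ [ 'researcher' ], [ 'analyst' ], [ 'writer' ] ]
console.log(sketch.steps, sketch.goalMet); // 3 true
```

Because each node here depends on the previous one, every step runs a single node. Two nodes whose requires are satisfied at the same time would land in the same step.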

Step 4: Add Checkpointing

Save progress so you can resume after failures:

import { InMemoryCheckpointStore } from '@directive-run/ai';

const store = new InMemoryCheckpointStore({ maxCheckpoints: 20 });

const result = await orchestrator.runGoal(
  nodes, // the node declarations from Step 3
  { topic: 'AI safety' },
  (facts) => facts.report != null,
  {
    maxSteps: 10,
    extract: (facts) => facts.report,
    checkpoint: { everyN: 1, store },
  },
);

Resume from a checkpoint:

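// checkpointId identifies a previously saved checkpoint
const checkpoint = await store.load(checkpointId);
const state = JSON.parse(checkpoint.systemExport);
// Pass the same node declarations and goal condition used in the original run
await orchestrator.resumeGoal(state, { nodes, when: goalCondition });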
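
A likely reason resumeGoal takes nodes and when again is that the saved state travels as JSON, and JSON cannot carry functions such as buildInput, extractOutput, or the goal condition. The standalone sketch below shows that behavior with plain JSON.stringify and JSON.parse. The `live` object is a hypothetical stand-in, not the library's checkpoint format.

```typescript
// Hypothetical in-memory run state: data plus a function.
const live = {
  facts: { topic: 'AI safety', findings: 'key findings' },
  completedNodes: ['researcher'],
  when: (facts: Record<string, unknown>) => facts.report != null,
};

// JSON.stringify omits function-valued properties, so only data survives.
const serialized = JSON.stringify(live);
const restored = JSON.parse(serialized) as Record<string, unknown>;

console.log(Object.keys(restored)); // [ 'facts', 'completedNodes' ]
console.log('when' in restored); // false
```

Keep the node declarations and goal condition in code that both the original run and the resume path can import, so the two always match.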

Step 5: Track Progress

import { getCheckpointProgress } from '@directive-run/ai';

const progress = getCheckpointProgress(checkpointState);
console.log(`${progress.percentage}% complete`);
console.log(`${progress.stepsCompleted} steps, ${progress.tokensConsumed} tokens`);
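
For intuition about a 0–1 satisfaction score, here is one plausible definition: the fraction of producible fact keys that are already present. This is an assumption made for illustration only. `satisfactionSketch` is a hypothetical helper, and the library's actual scoring may differ.

```typescript
// Hypothetical score: share of producible fact keys already in the fact pool.
function satisfactionSketch(
  producible: string[],
  facts: Record<string, unknown>,
): number {
  if (producible.length === 0) return 1; // nothing to produce counts as satisfied
  const present = producible.filter((key) => facts[key] != null).length;
  return present / producible.length;
}

const score = satisfactionSketch(
  ['findings', 'analysis', 'report'],
  { topic: 'AI safety', findings: 'key findings' },
);

console.log(`${Math.round(score * 100)}% complete`); // 33% complete
```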

Named Goal Pattern

Register the goal as a reusable pattern:

const orchestrator = createMultiAgentOrchestrator({
  runner,
  agents: { researcher: { agent: researcher }, analyst: { agent: analyst }, writer: { agent: writer } },
  patterns: {
    report: goal(
      {
        researcher: { agent: 'researcher', produces: ['findings'], extractOutput: (r) => ({ findings: r.output }) },
        analyst: { agent: 'analyst', produces: ['analysis'], requires: ['findings'], extractOutput: (r) => ({ analysis: r.output }) },
        writer: { agent: 'writer', produces: ['report'], requires: ['analysis'], extractOutput: (r) => ({ report: r.output }) },
      },
      (facts) => facts.report != null,
      { maxSteps: 10, extract: (facts) => facts.report },
    ),
  },
});

const result = await orchestrator.runPattern('report', 'AI safety trends');

GoalNode Options

| Field         | Type                                  | Description                                                  |
|---------------|---------------------------------------|--------------------------------------------------------------|
| agent         | string                                | Agent ID from the registry                                   |
| produces      | string[]                              | Fact keys this node writes                                   |
| requires      | string[]                              | Fact keys this node needs (must be produced by other nodes)  |
| extractOutput | (result) => Record<string, unknown>   | Map agent output to fact keys                                |
| buildInput    | (facts) => string                     | Build agent input from current facts                         |
| allowRerun    | boolean                               | Allow the node to run again if goal is not yet met           |
| maxRuns       | number                                | Maximum times this node can execute                          |
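
The interaction between allowRerun and maxRuns can be pictured with a small gating function. The semantics below are assumptions, not documented behavior: a node without allowRerun runs at most once, and a node with allowRerun may run until it reaches maxRuns, or without limit if maxRuns is unset. `RerunPolicy` and `canRunSketch` are hypothetical names.

```typescript
// Hypothetical subset of the node options that affects reruns.
interface RerunPolicy {
  allowRerun?: boolean;
  maxRuns?: number;
}

// Assumed rule: cap is 1 without allowRerun, otherwise maxRuns (or no cap).
function canRunSketch(policy: RerunPolicy, runsSoFar: number): boolean {
  const cap = policy.allowRerun ? (policy.maxRuns ?? Infinity) : 1;
  return runsSoFar < cap;
}

console.log(canRunSketch({}, 0)); // true
console.log(canRunSketch({}, 1)); // false
console.log(canRunSketch({ allowRerun: true, maxRuns: 3 }, 2)); // true
console.log(canRunSketch({ allowRerun: true, maxRuns: 3 }, 3)); // false
```

Whatever the runtime's exact rule is, setting maxRuns on any node with allowRerun keeps a goal that is never met from re-running that node until maxSteps is exhausted.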
