Pattern Checkpoints

Save execution state mid-pattern to tolerate faults, resume after failures, fork workflows, and track progress.

Pattern checkpoints capture the full internal state of any execution pattern at configurable intervals. If a process crashes, you resume from the most recent checkpoint instead of starting over, so agents that completed before that checkpoint are not re-run and their tokens are not spent twice.


Quick Start

Add a checkpoint config to any pattern:

import { createMultiAgentOrchestrator, sequential, InMemoryCheckpointStore } from '@directive-run/ai';

const store = new InMemoryCheckpointStore({ maxCheckpoints: 50 });

const orchestrator = createMultiAgentOrchestrator({
  runner,
  agents: {
    researcher: { agent: researcher },
    writer: { agent: writer },
    reviewer: { agent: reviewer },
  },
  checkpointStore: store,
});

// Run a named pattern; this assumes a 'pipeline' pattern was registered with checkpoint: { everyN: 1 }
const result = await orchestrator.runPattern(
  'pipeline',
  'Write about AI safety',
);

// Or inline with checkpoint config:
const result2 = await orchestrator.runSequential(
  ['researcher', 'writer', 'reviewer'],
  'Write about AI safety',
  {
    checkpoint: { everyN: 1, store },
  },
);

Supported Patterns

All 6 multi-step patterns support checkpointing:

| Pattern | Checkpoint granularity | State captured |
| --- | --- | --- |
| sequential | Every N agents | Current step, accumulated results, current input |
| supervisor | Every N rounds | Round number, supervisor output, worker results |
| reflect | Every N iterations | Iteration count, history (scores, feedback), producer outputs |
| debate | Every N rounds | Round number, proposals, judgements, tokens consumed |
| dag | Every N completed nodes | Per-node statuses, outputs, errors, results |
| goal | Every N steps | Facts, completed nodes, failure counts, step metrics, relaxation state |

Configuration

interface PatternCheckpointConfig {
  /** Save a checkpoint every N steps/rounds/iterations. @default 5 */
  everyN?: number;
  /** Checkpoint store. Uses the orchestrator's store if not provided. */
  store?: CheckpointStore;
  /** Label prefix for checkpoints. @default pattern type name */
  labelPrefix?: string;
  /** Conditional: only save when this returns true */
  when?: (context: CheckpointContext) => boolean;
}

Checkpoint Context

The when predicate receives a CheckpointContext:

interface CheckpointContext {
  step: number;            // Current step/round/iteration
  patternType: string;     // "sequential", "supervisor", "reflect", etc.
  facts?: Record<string, unknown>;  // Goal pattern only
  satisfaction?: number;            // Goal pattern only (0-1)
}

Per-Pattern Examples

Sequential

const result = await orchestrator.runSequential(
  ['researcher', 'writer', 'reviewer'],
  'Write about WASM',
  {
    checkpoint: { everyN: 1, labelPrefix: 'article-pipeline' },
  },
);

Captures: agent index, current input, results collected so far.

Supervisor

const result = await orchestrator.runSupervisor(
  'manager',
  ['researcher', 'writer'],
  'Research and write about AI',
  {
    maxRounds: 5,
    checkpoint: { everyN: 2 },
  },
);

Captures: round number, supervisor output, worker results, current input.

Reflect

const result = await orchestrator.runReflect(
  'writer',
  'evaluator',
  'Write a blog post',
  {
    maxIterations: 5,
    threshold: 0.8,
    checkpoint: { everyN: 1 },
  },
);

Captures: iteration, effective input, history (scores, feedback, durations), producer outputs.

Debate

const result = await orchestrator.runDebate(
  {
    agents: ['optimist', 'pessimist', 'realist'],
    evaluator: 'judge',
    maxRounds: 5,
    checkpoint: { everyN: 1 },
  },
  'Should we use microservices?',
);

Captures: round number, current input, proposals and judgements per round, tokens consumed.

DAG

const result = await orchestrator.runDag(
  {
    fetch: { agent: 'fetcher' },
    analyze: { agent: 'analyzer', deps: ['fetch'] },
    summarize: { agent: 'summarizer', deps: ['fetch'] },
    report: { agent: 'reporter', deps: ['analyze', 'summarize'] },
  },
  'Research AI safety',
  (context) => context.outputs.report,
  {
    checkpoint: { everyN: 2 },
  },
);

Captures: per-node statuses, outputs, errors, node results, completed count, original input.

Goal

const result = await orchestrator.runGoal(
  {
    fetcher: { agent: 'fetcher', produces: ['data'] },
    analyzer: { agent: 'analyzer', produces: ['analysis'], requires: ['data'] },
  },
  { query: 'market trends' },
  (facts) => facts.analysis != null,
  {
    checkpoint: {
      everyN: 3,
      labelPrefix: 'market-analysis',
      when: (context) => (context.satisfaction ?? 0) > 0.3,
    },
  },
);

Captures: facts snapshot, completed nodes, failure counts, node input hashes, execution order, step metrics, relaxation state, agent metrics.


Checkpoint Store

InMemoryCheckpointStore

Built-in in-memory store with configurable retention:

import { InMemoryCheckpointStore } from '@directive-run/ai';

const store = new InMemoryCheckpointStore({ maxCheckpoints: 50 });

await store.save(checkpoint);
const loaded = await store.load(checkpoint.id);
const all = await store.list();
await store.delete(checkpoint.id);
await store.clear();
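
To make the retention behaviour concrete, here is a minimal sketch of a bounded store. It is not the library's implementation: `BoundedCheckpointStore` and `retentionDemo` are hypothetical names, the `Checkpoint` shape only lists fields that appear on this page, and oldest-first eviction is an assumption about what `maxCheckpoints` does.

```typescript
// Local stand-in for the library's Checkpoint type; only fields shown on this page.
interface Checkpoint {
  id: string;
  label: string;
  createdAt: number;
  systemExport: string;
}

// Hypothetical store, NOT the built-in InMemoryCheckpointStore.
// Assumption: once the limit is exceeded, the oldest saved checkpoint is evicted.
class BoundedCheckpointStore {
  private readonly maxCheckpoints: number;
  private readonly checkpoints = new Map<string, Checkpoint>();

  constructor(maxCheckpoints: number) {
    this.maxCheckpoints = maxCheckpoints;
  }

  async save(checkpoint: Checkpoint): Promise<string> {
    // Delete first so a re-saved id moves to the end of the Map's insertion order.
    this.checkpoints.delete(checkpoint.id);
    this.checkpoints.set(checkpoint.id, checkpoint);

    // Evict from the front (oldest) until the store is within its limit.
    while (this.checkpoints.size > this.maxCheckpoints) {
      const oldest = this.checkpoints.keys().next();
      if (oldest.done) break;
      this.checkpoints.delete(oldest.value);
    }

    return checkpoint.id;
  }

  async load(id: string): Promise<Checkpoint | null> {
    return this.checkpoints.get(id) ?? null;
  }

  async list(): Promise<string[]> {
    return Array.from(this.checkpoints.keys());
  }
}

// Saves three checkpoints into a store that keeps two, then lists what is left.
async function retentionDemo(): Promise<string[]> {
  const store = new BoundedCheckpointStore(2);
  for (const id of ['ckpt_1', 'ckpt_2', 'ckpt_3']) {
    await store.save({ id, label: 'demo', createdAt: Date.now(), systemExport: '{}' });
  }

  return store.list(); // ckpt_1 has been evicted
}
```

With a limit of 2, only the two most recent ids remain. If a long-running pattern needs older checkpoints for forking, a larger limit or a persistent custom store is the safer choice.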

Custom Store

Implement CheckpointStore for persistent backends (database, S3, Redis):

import type { CheckpointStore, Checkpoint } from '@directive-run/ai';

const store: CheckpointStore = {
  save: async (checkpoint) => {
    await db.insert('checkpoints', checkpoint);

    return checkpoint.id;
  },
  load: async (id) => {
    return await db.findOne('checkpoints', { id });
  },
  list: async () => {
    return await db.find('checkpoints', {}, {
      select: ['id', 'label', 'createdAt'],
    });
  },
  delete: async (id) => {
    const deleted = await db.delete('checkpoints', { id });

    return deleted > 0;
  },
  clear: async () => {
    await db.deleteAll('checkpoints');
  },
};

Resume

Each pattern has a dedicated resume* method that accepts the checkpoint state and the original pattern definition:

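// Load the checkpoint and parse its serialized pattern state
const checkpoint = await store.load(checkpointId);
if (!checkpoint) {
  throw new Error(`Checkpoint not found: ${checkpointId}`);
}
const state = JSON.parse(checkpoint.systemExport);

// Resume based on pattern type; `pattern` is the original pattern definition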
switch (state.type) {
  case 'sequential':
    await orchestrator.resumeSequential(state, pattern);
    break;
  case 'supervisor':
    await orchestrator.resumeSupervisor(state, pattern);
    break;
  case 'reflect':
    await orchestrator.resumeReflect(state, pattern);
    break;
  case 'debate':
    await orchestrator.resumeDebate(state, pattern);
    break;
  case 'dag':
    await orchestrator.resumeDag(state, pattern);
    break;
  case 'goal':
    await orchestrator.resumeGoal(state, pattern);
    break;
}
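
Because `systemExport` is an untyped JSON string, it can help to validate it before dispatching to a `resume*` method. The sketch below is one way to do that under stated assumptions: `loadPatternState`, `MinimalStore`, and `PatternStateSketch` are hypothetical names, and treating a missing checkpoint as `null` is an assumption about the store contract.

```typescript
type PatternType = 'sequential' | 'supervisor' | 'reflect' | 'debate' | 'dag' | 'goal';

const PATTERN_TYPES: readonly PatternType[] = [
  'sequential', 'supervisor', 'reflect', 'debate', 'dag', 'goal',
];

// Local stand-ins; only the members this sketch needs.
interface StoredCheckpoint {
  id: string;
  systemExport: string;
}

interface MinimalStore {
  load(id: string): Promise<StoredCheckpoint | null>;
}

interface PatternStateSketch {
  type: PatternType;
  [key: string]: unknown;
}

// Hypothetical helper, not part of @directive-run/ai.
// Loads a checkpoint, parses its serialized state, and checks the `type` tag.
async function loadPatternState(
  store: MinimalStore,
  checkpointId: string,
): Promise<PatternStateSketch> {
  const checkpoint = await store.load(checkpointId);
  if (!checkpoint) {
    throw new Error(`Checkpoint not found: ${checkpointId}`);
  }

  const parsed: unknown = JSON.parse(checkpoint.systemExport);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error(`Checkpoint ${checkpointId} does not contain an object`);
  }

  const type = (parsed as { type?: unknown }).type;
  if (!PATTERN_TYPES.some((known) => known === type)) {
    throw new Error(`Checkpoint ${checkpointId} has unknown pattern type: ${String(type)}`);
  }

  return parsed as PatternStateSketch;
}
```

The returned `state.type` can then drive the `switch` above, and a corrupt or foreign checkpoint fails with a clear error before any agent runs.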

Replay

replay() auto-detects the pattern type and resumes with optional input override:

const result = await orchestrator.replay(
  checkpointId,
  pattern,
  { input: 'Modified input for this replay' },
);

This loads the checkpoint from the orchestrator's store, restores state, and continues execution from that point.


Fork

Create an independent orchestrator from a checkpoint to explore alternative execution paths:

import { forkFromCheckpoint } from '@directive-run/ai';

// Fork from a saved checkpoint
const forked = await forkFromCheckpoint(
  orchestratorOptions,
  store,
  'ckpt_abc123',
);

// The forked orchestrator is fully independent — changes don't affect the original
const result = await forked.runGoal(nodes, newInput, when);

The forked orchestrator receives a deep clone of the checkpoint state, so mutations in the fork never affect the original.
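
The isolation guarantee can be illustrated without the library. The sketch below deep-clones a JSON-serializable state with a JSON round trip, which fits here because checkpoint state is stored as a JSON string. `GoalStateSketch` and `cloneState` are hypothetical names, and this is not necessarily how `forkFromCheckpoint` clones internally.

```typescript
// Simplified stand-in for a goal checkpoint state.
interface GoalStateSketch {
  type: 'goal';
  step: number;
  facts: { data: string[] };
}

// Hypothetical helper: deep clone via JSON round trip.
// Only valid for JSON-serializable data (no functions, Dates, Maps, or cycles).
function cloneState<T>(state: T): T {
  return JSON.parse(JSON.stringify(state)) as T;
}

const originalState: GoalStateSketch = {
  type: 'goal',
  step: 3,
  facts: { data: ['a', 'b'] },
};

const forkedState = cloneState(originalState);

// Mutate the fork, including a nested array.
forkedState.step = 4;
forkedState.facts.data.push('c');

// originalState.step is still 3 and originalState.facts.data still has two entries.
```

A shallow copy (for example an object spread) would share `facts.data` between the two states, so the nested `push` would leak into the original. That is the failure mode a deep clone avoids.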


Progress

Compute progress from any checkpoint state:

import { getCheckpointProgress } from '@directive-run/ai';

const progress = getCheckpointProgress(checkpointState);

console.log(progress.percentage);              // 0-100
console.log(progress.stepsCompleted);          // Steps/rounds/iterations completed
console.log(progress.stepsTotal);              // Total expected (null for unbounded)
console.log(progress.tokensConsumed);          // Tokens used so far
console.log(progress.estimatedTokensRemaining); // Estimated remaining (null when unknowable)
console.log(progress.estimatedStepsRemaining);  // Estimated steps left (null when unknowable)
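
To show how these fields relate to one another, here is a rough sketch of one way such values could be derived. The formulas are assumptions, not the library's algorithm: `computeProgress` and `ProgressSketch` are hypothetical, the token estimate simply extrapolates the average tokens per completed step, and reporting 0% for unbounded patterns is an arbitrary choice.

```typescript
// Mirrors the fields documented above; the type name is hypothetical.
interface ProgressSketch {
  percentage: number;
  stepsCompleted: number;
  stepsTotal: number | null;
  tokensConsumed: number;
  estimatedTokensRemaining: number | null;
  estimatedStepsRemaining: number | null;
}

// Hypothetical derivation, NOT the library's getCheckpointProgress.
function computeProgress(
  stepsCompleted: number,
  stepsTotal: number | null,
  tokensConsumed: number,
): ProgressSketch {
  // Unbounded pattern: no total, so nothing can be estimated.
  if (stepsTotal === null || stepsTotal <= 0) {
    return {
      percentage: 0, // assumption: report 0 when the total is unknown
      stepsCompleted,
      stepsTotal: null,
      tokensConsumed,
      estimatedTokensRemaining: null,
      estimatedStepsRemaining: null,
    };
  }

  const done = Math.min(stepsCompleted, stepsTotal);
  const remaining = stepsTotal - done;
  // Average cost per completed step; unknowable before the first step finishes.
  const tokensPerStep = done > 0 ? tokensConsumed / done : null;

  return {
    percentage: Math.round((done / stepsTotal) * 100),
    stepsCompleted: done,
    stepsTotal,
    tokensConsumed,
    estimatedTokensRemaining:
      tokensPerStep === null ? null : Math.round(tokensPerStep * remaining),
    estimatedStepsRemaining: remaining,
  };
}

const bounded = computeProgress(2, 5, 1200);   // 40%, 3 steps left, about 1800 tokens left
const unbounded = computeProgress(4, null, 900); // estimates are null
```

Linear extrapolation is crude for patterns whose later steps carry more context, so treat any such estimate as a rough guide rather than a budget.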

Progress UI Example

function renderProgress(state: PatternCheckpointState) {
  const progress = getCheckpointProgress(state);

  return `
    <div class="progress-bar" style="width: ${progress.percentage}%"></div>
    <span>${progress.stepsCompleted} / ${progress.stepsTotal ?? '?'} steps</span>
    <span>${progress.tokensConsumed.toLocaleString()} tokens</span>
  `;
}

getPatternStep

Get the current step number from any checkpoint state:

import { getPatternStep } from '@directive-run/ai';

const step = getPatternStep(checkpointState);
// Returns: step (sequential/goal), round (supervisor/debate),
//          iteration (reflect), completedCount (dag)
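
The mapping in the comment above can be pictured as a switch over a discriminated union. This is only a sketch: `StateSketch` and `patternStep` are hypothetical, and the real state types carry many more fields than the single counter shown here.

```typescript
// Simplified stand-in for PatternCheckpointState: one counter per pattern type,
// using the field names from the comment above.
type StateSketch =
  | { type: 'sequential' | 'goal'; step: number }
  | { type: 'supervisor' | 'debate'; round: number }
  | { type: 'reflect'; iteration: number }
  | { type: 'dag'; completedCount: number };

// Hypothetical equivalent of getPatternStep for the simplified union.
function patternStep(state: StateSketch): number {
  switch (state.type) {
    case 'sequential':
    case 'goal':
      return state.step;
    case 'supervisor':
    case 'debate':
      return state.round;
    case 'reflect':
      return state.iteration;
    case 'dag':
      return state.completedCount;
  }
}

const dagStep = patternStep({ type: 'dag', completedCount: 3 });     // 3
const debateStep = patternStep({ type: 'debate', round: 2 });        // 2
```

Normalising to a single number lets logging and progress code treat all six patterns uniformly.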

Diff

Compare two checkpoint states to see what changed between them:

import { diffCheckpoints } from '@directive-run/ai';

const diff = diffCheckpoints(checkpointA, checkpointB);

console.log(diff.patternType);    // Pattern type
console.log(diff.stepDelta);      // Steps advanced
console.log(diff.tokensDelta);    // Tokens consumed between checkpoints

// Goal-specific: fact changes
if (diff.facts) {
  console.log(diff.facts.added);   // New fact keys
  console.log(diff.facts.removed); // Removed fact keys
  console.log(diff.facts.changed); // Changed values: [{ key, before, after }]
}

// DAG/goal: nodes completed between checkpoints
if (diff.nodesCompleted) {
  console.log(diff.nodesCompleted); // Node IDs completed between A and B
}

Both checkpoints must be from the same pattern type — diffCheckpoints throws if types differ.
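
For intuition about the `facts` portion of the diff, the following sketch computes added, removed, and changed keys between two fact snapshots. `diffFacts` is a hypothetical helper, and comparing values by their JSON serialization is an assumption; the library may compare values differently.

```typescript
type Facts = Record<string, unknown>;

interface FactsDiff {
  added: string[];
  removed: string[];
  changed: { key: string; before: unknown; after: unknown }[];
}

function hasKey(facts: Facts, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(facts, key);
}

// Hypothetical helper, not the library's diffCheckpoints.
// Values are compared by JSON serialization, so object key order matters.
function diffFacts(before: Facts, after: Facts): FactsDiff {
  const added = Object.keys(after).filter((key) => !hasKey(before, key));
  const removed = Object.keys(before).filter((key) => !hasKey(after, key));
  const changed = Object.keys(before)
    .filter(
      (key) =>
        hasKey(after, key) &&
        JSON.stringify(before[key]) !== JSON.stringify(after[key]),
    )
    .map((key) => ({ key, before: before[key], after: after[key] }));

  return { added, removed, changed };
}

const factsDiff = diffFacts(
  { query: 'market trends', data: [1, 2] },
  { query: 'market trends', data: [1, 2, 3], analysis: 'up' },
);
// factsDiff.added   -> ['analysis']
// factsDiff.removed -> []
// factsDiff.changed -> one entry, for the key 'data'
```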


Conditional Checkpointing

Use the when predicate to save checkpoints only when conditions are met:

// Only checkpoint when satisfaction exceeds 50%
checkpoint: {
  everyN: 1,
  when: (context) => (context.satisfaction ?? 0) > 0.5,
}

// Only checkpoint after step 2
checkpoint: {
  everyN: 1,
  when: (context) => context.step > 2,
}

// Combine with everyN: check every 3 steps, but only save if condition met
checkpoint: {
  everyN: 3,
  when: (context) => context.patternType === 'goal',
}
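
The interaction between `everyN` and `when` can be sketched as a single decision function. This is an illustration of the behaviour described above, not the orchestrator's code: `shouldCheckpoint` is hypothetical, the interfaces are local copies trimmed to what the sketch needs, and 1-based step counting is an assumption.

```typescript
// Local copies of the documented shapes, so the sketch runs on its own.
interface CheckpointContext {
  step: number;
  patternType: string;
  facts?: Record<string, unknown>;
  satisfaction?: number;
}

interface PatternCheckpointConfig {
  everyN?: number;
  when?: (context: CheckpointContext) => boolean;
}

// Hypothetical helper, not part of @directive-run/ai.
// Assumes steps are counted from 1 and `when` is consulted only on everyN boundaries.
function shouldCheckpoint(
  config: PatternCheckpointConfig,
  context: CheckpointContext,
): boolean {
  const everyN = config.everyN ?? 5; // documented default
  if (everyN < 1 || context.step < 1 || context.step % everyN !== 0) {
    return false;
  }

  // No predicate means every boundary is saved.
  return config.when ? config.when(context) : true;
}

const config: PatternCheckpointConfig = {
  everyN: 3,
  when: (context) => (context.satisfaction ?? 0) > 0.5,
};

const decisions = [
  shouldCheckpoint(config, { step: 2, patternType: 'goal', satisfaction: 0.9 }), // false: not a multiple of 3
  shouldCheckpoint(config, { step: 3, patternType: 'goal', satisfaction: 0.2 }), // false: predicate rejects
  shouldCheckpoint(config, { step: 3, patternType: 'goal', satisfaction: 0.9 }), // true
];
```

Under this reading, a restrictive `when` combined with a large `everyN` can leave long stretches without any checkpoint, which is worth keeping in mind when the goal is crash recovery.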

Lifecycle Hooks

The multi-agent orchestrator fires hooks when checkpoints are saved or fail:

const orchestrator = createMultiAgentOrchestrator({
  runner,
  agents: { /* ... */ },
  hooks: {
    onCheckpointSave: ({ checkpointId, patternType, step, timestamp }) => {
      console.log(`Checkpoint saved: ${checkpointId} at step ${step}`);
    },
    onCheckpointError: ({ patternType, step, error, timestamp }) => {
      console.error(`Checkpoint failed at step ${step}:`, error);
    },
  },
});

Timeline Events

Checkpoints emit timeline events for debugging:

| Event | Fields |
| --- | --- |
| checkpoint_save | checkpointId, patternType, step |
| checkpoint_restore | checkpointId, patternType, step |

const saves = timeline.getEventsByType('checkpoint_save');
const restores = timeline.getEventsByType('checkpoint_restore');

API Reference

Functions

| Function | Description |
| --- | --- |
| getPatternStep(state) | Get current step/round/iteration from any checkpoint state |
| getCheckpointProgress(state) | Compute progress (percentage, tokens, estimates) |
| diffCheckpoints(a, b) | Compare two checkpoint states |
| forkFromCheckpoint(options, store, id) | Create independent orchestrator from checkpoint |

Resume Methods (on MultiAgentOrchestrator)

| Method | Pattern |
| --- | --- |
| resumeSequential(state, pattern) | Sequential |
| resumeSupervisor(state, pattern, options?) | Supervisor |
| resumeReflect(state, pattern, options?) | Reflect |
| resumeDebate(state, pattern) | Debate |
| resumeDag(state, pattern, options?) | DAG |
| resumeGoal(state, pattern) | Goal |
| replay(checkpointId, pattern, options?) | Auto-detect |

Types

| Type | Description |
| --- | --- |
| PatternCheckpointConfig | Checkpoint configuration |
| CheckpointContext | Context passed to when predicate |
| PatternCheckpointState | Union of all per-pattern states |
| SequentialCheckpointState | Sequential checkpoint |
| SupervisorCheckpointState | Supervisor checkpoint |
| ReflectCheckpointState | Reflect checkpoint |
| DebateCheckpointState | Debate checkpoint |
| DagCheckpointState | DAG checkpoint |
| GoalCheckpointState | Goal checkpoint |
| CheckpointProgress | Progress computed from state |
| CheckpointDiff | Diff between two states |
| CheckpointStore | Store interface |
| InMemoryCheckpointStore | Built-in in-memory store |
