Multi-Agent Orchestrator

Communication

Message bus, agent network, and structured communication patterns for decentralized agent coordination.

For centralized orchestration, use Execution Patterns. For decentralized coordination where agents communicate directly, use the message bus and agent network.


Message Bus

The low-level pub/sub transport for agent-to-agent messaging:

import { createMessageBus } from '@directive-run/ai';
import type { MessageBus, TypedAgentMessage } from '@directive-run/ai';

const bus = createMessageBus({
  maxHistory: 1000,
  defaultTtlMs: 3600000,       // 1 hour message TTL
  maxPendingPerAgent: 100,
  onDelivery: (message, recipients) => {
    console.log(`Delivered ${message.type} to ${recipients.join(', ')}`);
  },
  onDeliveryError: (message, error) => {
    console.error(`Failed to deliver ${message.id}:`, error);
  },
});

Publishing and Subscribing

// Subscribe with filters
const sub = bus.subscribe('writer', (message) => {
  console.log(`Writer received: ${message.type} from ${message.from}`);
}, {
  types: ['DELEGATION', 'REQUEST'],
  from: ['researcher'],
  priority: ['high', 'urgent'],
});

// Publish a message
const messageId = bus.publish({
  type: 'DELEGATION',
  from: 'researcher',
  to: 'writer',
  task: 'Write a summary',
  context: { data: '...' },
  priority: 'high',
});

// Query history
const history = bus.getHistory({ types: ['DELEGATION'] }, 50);
const specific = bus.getMessage(messageId);
const pending = bus.getPending('offline-agent');

// Cleanup
sub.unsubscribe();
bus.dispose();
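
The filter passed to subscribe narrows delivery by message type, sender, and priority. How the library combines these fields is not spelled out here, so the sketch below assumes that every field you provide must match and that an omitted field matches anything. `SketchMessage`, `SketchFilter`, and `matchesFilter` are hypothetical names for illustration, not part of `@directive-run/ai`.

```typescript
// Hypothetical shapes for illustration only; not the library's types.
interface SketchMessage {
  type: string;
  from: string;
  priority?: string;
}

interface SketchFilter {
  types?: string[];
  from?: string[];
  priority?: string[];
}

// Assumed semantics: an omitted field matches everything; a provided field
// must list the message's value for the message to pass.
function matchesFilter(message: SketchMessage, filter: SketchFilter = {}): boolean {
  if (filter.types && !filter.types.includes(message.type)) return false;
  if (filter.from && !filter.from.includes(message.from)) return false;
  if (filter.priority) {
    if (message.priority === undefined) return false;
    if (!filter.priority.includes(message.priority)) return false;
  }
  return true;
}

const writerFilter: SketchFilter = {
  types: ['DELEGATION', 'REQUEST'],
  from: ['researcher'],
  priority: ['high', 'urgent'],
};

// Passes: type, sender, and priority are all listed in the filter.
const accepted = matchesFilter(
  { type: 'DELEGATION', from: 'researcher', priority: 'high' },
  writerFilter,
);

// Fails: INFORM is not one of the filtered types.
const rejected = matchesFilter(
  { type: 'INFORM', from: 'researcher', priority: 'high' },
  writerFilter,
);
```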

Configuration

| Option | Type | Default | Description |
|---|---|---|---|
| maxHistory | number | | Messages to retain in history |
| defaultTtlMs | number | | Default message time-to-live (ms) |
| maxPendingPerAgent | number | | Queue cap for offline agents |
| persistence | MessagePersistence | | Storage backend for durability |
| onDelivery | (message, recipients) => void | | Delivery confirmation callback |
| onDeliveryError | (message, error) => void | | Delivery error callback |

Message Queuing

When a recipient has no active subscription, messages are queued (up to maxPendingPerAgent). Queued messages are delivered immediately when the agent subscribes. Expired messages (past ttlMs) are skipped during delivery.
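
The queuing rules above can be sketched as a small per-agent queue. This is an illustration, not the library's implementation: `PendingQueue` and `QueuedMessage` are hypothetical names, the sketch assumes the TTL is measured from the message timestamp, and it assumes the oldest entry is dropped once the cap is reached (the actual overflow behavior is not stated here).

```typescript
// Hypothetical message shape; only the fields the queue needs.
interface QueuedMessage {
  id: string;
  timestamp: number; // ms, when the message was published
  ttlMs?: number;    // optional time-to-live in ms
}

class PendingQueue {
  private readonly queues = new Map<string, QueuedMessage[]>();
  private readonly maxPendingPerAgent: number;

  constructor(maxPendingPerAgent: number) {
    this.maxPendingPerAgent = maxPendingPerAgent;
  }

  // Queue a message for an agent that has no active subscription.
  // Assumption: on overflow the oldest queued message is dropped.
  enqueue(agentId: string, message: QueuedMessage): void {
    const pending = this.queues.get(agentId) ?? [];
    pending.push(message);
    if (pending.length > this.maxPendingPerAgent) pending.shift();
    this.queues.set(agentId, pending);
  }

  // Called when the agent subscribes: empties the queue and returns the
  // messages that are still alive, skipping any that are past their TTL.
  drain(agentId: string, now: number): QueuedMessage[] {
    const pending = this.queues.get(agentId) ?? [];
    this.queues.delete(agentId);
    return pending.filter(
      (m) => m.ttlMs === undefined || now - m.timestamp <= m.ttlMs,
    );
  }
}

const offlineQueue = new PendingQueue(2);
offlineQueue.enqueue('writer', { id: 'm1', timestamp: 0, ttlMs: 1000 }); // dropped by the cap
offlineQueue.enqueue('writer', { id: 'm2', timestamp: 0, ttlMs: 10 });   // expires before delivery
offlineQueue.enqueue('writer', { id: 'm3', timestamp: 0 });              // no TTL, always delivered

const deliveredIds = offlineQueue.drain('writer', 500).map((m) => m.id);
```

Passing `now` explicitly keeps the sketch deterministic; a real bus would presumably read the clock itself.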

Persistence

Plug in your own persistence layer to survive restarts:

const bus = createMessageBus({
  persistence: {
    save: async (message) => { await db.insert('messages', message); },
    load: async (agentId, since) => { return db.query('messages', { to: agentId, after: since }); },
    delete: async (messageId) => { await db.delete('messages', messageId); },
    clear: async (agentId) => { await db.deleteAll('messages', agentId); },
  },
});
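
For tests or local development, the same four-method shape can be backed by a `Map` instead of a database. The sketch below mirrors the method names used above but is not the library's `MessagePersistence` type: `StoredMessage` and `createInMemoryPersistence` are hypothetical, `since` is assumed to be an exclusive timestamp in milliseconds, and broadcast (`"*"`) recipients are left out for brevity.

```typescript
// Hypothetical stored shape; the real message type carries more fields.
interface StoredMessage {
  id: string;
  to: string | string[];
  timestamp: number;
}

function createInMemoryPersistence() {
  const store = new Map<string, StoredMessage>();

  // True when the message is addressed to the agent, directly or via a list.
  const isFor = (message: StoredMessage, agentId: string): boolean =>
    Array.isArray(message.to) ? message.to.includes(agentId) : message.to === agentId;

  return {
    save: async (message: StoredMessage): Promise<void> => {
      store.set(message.id, message);
    },
    // Assumption: `since` is exclusive, so only strictly newer messages load.
    load: async (agentId: string, since?: number): Promise<StoredMessage[]> =>
      Array.from(store.values()).filter(
        (m) => isFor(m, agentId) && (since === undefined || m.timestamp > since),
      ),
    delete: async (messageId: string): Promise<void> => {
      store.delete(messageId);
    },
    clear: async (agentId: string): Promise<void> => {
      store.forEach((m, id) => {
        if (isFor(m, agentId)) store.delete(id);
      });
    },
  };
}

// Saves three messages, then loads what the writer received after t=100.
async function loadRecentForWriter(): Promise<string[]> {
  const persistence = createInMemoryPersistence();
  await persistence.save({ id: 'a', to: 'writer', timestamp: 100 });
  await persistence.save({ id: 'b', to: ['writer', 'reviewer'], timestamp: 200 });
  await persistence.save({ id: 'c', to: 'reviewer', timestamp: 300 });

  const recent = await persistence.load('writer', 100);
  return recent.map((m) => m.id);
}
```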

Message Types

Every message has id, from, to (single agent, array, or "*" for broadcast), timestamp, optional correlationId for request-response matching, optional priority, and optional ttlMs.

| Type | Description |
|---|---|
| REQUEST | Ask another agent to perform an action |
| RESPONSE | Reply to a request |
| DELEGATION | Delegate a task with context and constraints |
| DELEGATION_RESULT | Result of a delegated task with metrics |
| QUERY | Ask for information |
| INFORM | Share information (fire-and-forget) |
| SUBSCRIBE | Subscribe to topic updates |
| UNSUBSCRIBE | Unsubscribe from topics |
| UPDATE | Push update to subscribers |
| ACK / NACK | Acknowledgment / rejection |
| PING / PONG | Health check |
| CUSTOM | Custom message type |
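
As a rough illustration of the shared fields listed above, the sketch below models them as an interface and shows how the three forms of `to` could be resolved for a given agent. `Envelope` and `isAddressedTo` are hypothetical names; the library's own `TypedAgentMessage` type may differ in naming and detail.

```typescript
// Hypothetical model of the fields every message shares.
interface Envelope {
  id: string;
  from: string;
  to: string | string[]; // single agent, list of agents, or "*" for broadcast
  timestamp: number;
  correlationId?: string; // pairs a response with its request
  priority?: string;
  ttlMs?: number;
}

// Returns true when the agent is a recipient of the message.
function isAddressedTo(message: Envelope, agentId: string): boolean {
  if (message.to === '*') return true;
  return Array.isArray(message.to)
    ? message.to.includes(agentId)
    : message.to === agentId;
}

const broadcastMessage: Envelope = { id: 'm1', from: 'system', to: '*', timestamp: 0 };
const groupMessage: Envelope = {
  id: 'm2',
  from: 'researcher',
  to: ['writer', 'reviewer'],
  timestamp: 0,
};

const writerGetsBroadcast = isAddressedTo(broadcastMessage, 'writer');
const editorGetsGroup = isAddressedTo(groupMessage, 'editor');
```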

Agent Network

Higher-level coordination built on the message bus with structured request-response, delegation, and capability-based discovery:

import { createAgentNetwork, createMessageBus } from '@directive-run/ai';
import type { AgentNetwork } from '@directive-run/ai';

const network = createAgentNetwork({
  bus: createMessageBus(),
  agents: {
    researcher: { capabilities: ['search', 'summarize'] },
    writer: { capabilities: ['draft', 'edit'] },
    reviewer: { capabilities: ['review', 'approve'] },
  },
  defaultTimeout: 30000,
  onAgentOnline: (agentId) => console.log(`${agentId} connected`),
  onAgentOffline: (agentId) => console.log(`${agentId} disconnected`),
});

Request-Response

const answer = await network.request(
  'writer', 'reviewer',                         // from, to
  'check-accuracy',                             // action
  { paragraph: 'WebAssembly compiles to...' },  // payload
  15000                                         // timeout (optional)
);
console.log(answer.success, answer.result);
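
Request-response over a message bus typically works by tagging the request with a `correlationId` and routing the matching reply back to the waiting caller. The sketch below shows that bookkeeping in isolation. It is an assumption about the general technique, not a description of how `network.request` is implemented, and `PendingRequests` and `SketchResponse` are hypothetical names. Time is passed in explicitly rather than read from a timer so the behavior stays deterministic.

```typescript
interface SketchResponse {
  correlationId: string;
  success: boolean;
  result?: unknown;
}

interface PendingEntry {
  deadline: number;
  resolve: (response: SketchResponse) => void;
  reject: (error: Error) => void;
}

class PendingRequests {
  private readonly pending = new Map<string, PendingEntry>();
  private nextId = 0;

  // Registers a request; returns its correlationId and a promise for the reply.
  create(now: number, timeoutMs: number): { correlationId: string; reply: Promise<SketchResponse> } {
    const correlationId = `req-${++this.nextId}`;
    const reply = new Promise<SketchResponse>((resolve, reject) => {
      this.pending.set(correlationId, { deadline: now + timeoutMs, resolve, reject });
    });
    return { correlationId, reply };
  }

  // Routes an incoming response to its waiting request.
  // Returns false when no request is waiting for that correlationId.
  settle(response: SketchResponse): boolean {
    const entry = this.pending.get(response.correlationId);
    if (!entry) return false;
    this.pending.delete(response.correlationId);
    entry.resolve(response);
    return true;
  }

  // Rejects every request whose deadline has passed; returns how many timed out.
  expire(now: number): number {
    let timedOut = 0;
    this.pending.forEach((entry, id) => {
      if (now >= entry.deadline) {
        this.pending.delete(id);
        entry.reject(new Error(`Request ${id} timed out`));
        timedOut++;
      }
    });
    return timedOut;
  }
}

const inFlight = new PendingRequests();
const firstRequest = inFlight.create(0, 15000);
const secondRequest = inFlight.create(0, 15000);
secondRequest.reply.catch(() => undefined); // a timeout surfaces as a rejection

const matched = inFlight.settle({
  correlationId: firstRequest.correlationId,
  success: true,
  result: 'looks accurate',
});
const unmatched = inFlight.settle({ correlationId: 'unknown', success: true });
const timedOutCount = inFlight.expire(20000); // only the second request is still waiting
```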

Delegation

const result = await network.delegate(
  'researcher', 'writer',
  'Write an article about AI safety',
  { research: findingsData }
);
console.log(result.success, result.metrics?.durationMs);

Query

const info = await network.query(
  'writer', 'reviewer',
  'Is this paragraph technically accurate?',
  { text: '...' }
);

Fire-and-Forget

network.send('researcher', 'writer', {
  type: 'INFORM',
  topic: 'research-complete',
  content: { documentId: 'doc-123' },
});

Broadcast

network.broadcast('system', {
  type: 'INFORM',
  topic: 'shutdown',
  content: { reason: 'maintenance' },
});

Capability Discovery

const writers = network.findByCapability('draft');
console.log(writers.map((a) => a.id));

Dynamic Registration

network.register('editor', { capabilities: ['proofread', 'format'] });
network.unregister('editor');
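
Conceptually, registration and capability discovery amount to a lookup over a registry of agents. The sketch below is a minimal in-memory version with hypothetical names (`CapabilityRegistry`, `SketchAgentInfo`); the library's `AgentInfo` likely carries more than an id and a capability list.

```typescript
// Hypothetical, trimmed-down agent record.
interface SketchAgentInfo {
  id: string;
  capabilities: string[];
}

class CapabilityRegistry {
  private readonly agents = new Map<string, SketchAgentInfo>();

  register(id: string, info: { capabilities: string[] }): void {
    // Copy the list so later changes by the caller do not leak in.
    this.agents.set(id, { id, capabilities: info.capabilities.slice() });
  }

  unregister(id: string): void {
    this.agents.delete(id);
  }

  // Returns every registered agent that lists the capability,
  // in registration order.
  findByCapability(capability: string): SketchAgentInfo[] {
    return Array.from(this.agents.values()).filter((agent) =>
      agent.capabilities.includes(capability),
    );
  }
}

const registry = new CapabilityRegistry();
registry.register('researcher', { capabilities: ['search', 'summarize'] });
registry.register('writer', { capabilities: ['draft', 'edit'] });
registry.register('editor', { capabilities: ['proofread', 'edit'] });

const canEdit = registry.findByCapability('edit').map((a) => a.id);

registry.unregister('editor');
const canEditAfterRemoval = registry.findByCapability('edit').map((a) => a.id);
```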

Network API

| Method | Returns | Description |
|---|---|---|
| register(id, info) | void | Register an agent |
| unregister(id) | void | Remove an agent |
| getAgent(id) | AgentInfo | Get agent info |
| getAgents() | AgentInfo[] | List all agents |
| findByCapability(cap) | AgentInfo[] | Find by capability |
| send(from, to, msg) | string | Fire-and-forget message |
| request(from, to, action, payload, timeout?) | Promise<ResponseMessage> | Request-response |
| delegate(from, to, task, context) | Promise<DelegationResultMessage> | Delegation with metrics |
| query(from, to, question, context?) | Promise<ResponseMessage> | Query shorthand |
| broadcast(from, msg) | string | Broadcast to all |
| listen(agentId, handler, filter?) | Subscription | Listen for messages |
| getBus() | MessageBus | Access underlying bus |
| dispose() | void | Cleanup |

Communication Patterns

Three pre-built patterns for common coordination strategies.

Responder

Auto-handles incoming REQUEST messages and sends back RESPONSE:

import { createResponder } from '@directive-run/ai';

const responder = createResponder(network, 'writer');

responder.onRequest('draft', async (payload) => {
  const draft = await generateDraft(payload.topic as string);

  return { success: true, result: draft };
});

responder.onRequest('edit', async (payload) => {
  const edited = await editDocument(payload.content as string);

  return { success: true, result: edited };
});

responder.offRequest('edit');
responder.dispose();

Delegator

Auto-handles incoming DELEGATION messages and sends back DELEGATION_RESULT with metrics:

import { createDelegator } from '@directive-run/ai';

const delegator = createDelegator(network, 'writer');

delegator.onDelegation(async (task, context) => {
  const start = Date.now();
  const result = await executeTask(task, context);

  return {
    success: true,
    result,
    metrics: {
      durationMs: Date.now() - start,
      tokensUsed: 500,   // placeholder; report the actual token count
      cost: 0.003,       // placeholder; report the actual cost
    },
  };
});

delegator.offDelegation();
delegator.dispose();

Pub/Sub

Topic-based publish/subscribe using SUBSCRIBE and UPDATE messages:

import { createPubSub } from '@directive-run/ai';

const pubsub = createPubSub(network, 'analyst');

const unsub = pubsub.subscribe(
  ['market-updates', 'alerts'],
  (topic, content) => {
    console.log(`[${topic}]`, content);
  }
);

pubsub.publish('market-updates', { price: 100, change: 5 });

unsub();
pubsub.dispose();
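
In the library this pattern rides on SUBSCRIBE and UPDATE messages sent through the network. The sketch below leaves the transport out and shows only the topic-to-handler bookkeeping that such a pattern needs, using hypothetical names (`TopicHub`, `TopicHandler`) and a single in-process hub.

```typescript
type TopicHandler = (topic: string, content: unknown) => void;

class TopicHub {
  private readonly subscribers = new Map<string, Set<TopicHandler>>();

  // Registers one handler for several topics and returns an unsubscribe function.
  subscribe(topics: string[], handler: TopicHandler): () => void {
    for (const topic of topics) {
      const handlers = this.subscribers.get(topic) ?? new Set<TopicHandler>();
      handlers.add(handler);
      this.subscribers.set(topic, handlers);
    }
    return () => {
      for (const topic of topics) {
        this.subscribers.get(topic)?.delete(handler);
      }
    };
  }

  // Delivers content to every handler on the topic; returns the handler count.
  publish(topic: string, content: unknown): number {
    const handlers = this.subscribers.get(topic);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(topic, content));
    return handlers.size;
  }
}

const hub = new TopicHub();
const receivedTopics: string[] = [];

const stopListening = hub.subscribe(['market-updates', 'alerts'], (topic) => {
  receivedTopics.push(topic);
});

const reached = hub.publish('market-updates', { price: 100, change: 5 });
const reachedNobody = hub.publish('news', {}); // no subscribers on this topic

stopListening();
const reachedAfterUnsubscribe = hub.publish('alerts', { level: 'high' });
```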

Handoffs

Transfer work between agents in a multi-agent orchestrator with tracking:

const research = await orchestrator.runAgent('researcher', 'What is Directive?');

const draft = await orchestrator.handoff(
  'researcher', 'writer',
  `Write an article based on this:\n\n${research.output}`,
  { sourceTokens: research.totalTokens }
);

const review = await orchestrator.handoff(
  'writer', 'reviewer',
  `Review this article:\n\n${draft.output}`
);

Each handoff gets a unique ID and fires the onHandoff and onHandoffComplete hooks. To inspect handoffs that are still pending:

const pending = orchestrator.getPendingHandoffs();

Configuration

| Option | Type | Default | Description |
|---|---|---|---|
| onHandoff | (request: HandoffRequest) => void | | Called when a handoff starts |
| onHandoffComplete | (result: HandoffResult) => void | | Called when a handoff finishes |
| maxHandoffHistory | number | 1000 | Max completed handoff results to retain |

Types

interface HandoffRequest {
  id: string;
  fromAgent: string;
  toAgent: string;
  input: string;
  context?: Record<string, unknown>;
  requestedAt: number;
}

interface HandoffResult {
  request: HandoffRequest;
  result: RunResult<unknown>;
  completedAt: number;
}
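
To make the lifecycle concrete, the sketch below tracks handoffs from start to completion, fires both hooks, and trims history to `maxHandoffHistory`. It is a standalone illustration and not the orchestrator's code: `HandoffTracker` and `SketchHandoffResult` are hypothetical, the ID format is invented, the result is typed as `unknown` in place of `RunResult<unknown>`, and dropping the oldest entries when history overflows is an assumption.

```typescript
interface HandoffRequest {
  id: string;
  fromAgent: string;
  toAgent: string;
  input: string;
  context?: Record<string, unknown>;
  requestedAt: number;
}

// Simplified stand-in: the documented HandoffResult carries a RunResult<unknown>.
interface SketchHandoffResult {
  request: HandoffRequest;
  result: unknown;
  completedAt: number;
}

interface TrackerOptions {
  onHandoff?: (request: HandoffRequest) => void;
  onHandoffComplete?: (result: SketchHandoffResult) => void;
  maxHandoffHistory?: number;
}

class HandoffTracker {
  private readonly pending = new Map<string, HandoffRequest>();
  private readonly completed: SketchHandoffResult[] = [];
  private readonly options: TrackerOptions;
  private counter = 0;

  constructor(options: TrackerOptions = {}) {
    this.options = options;
  }

  // Records a new handoff, fires onHandoff, and returns the request.
  start(fromAgent: string, toAgent: string, input: string, now: number): HandoffRequest {
    const request: HandoffRequest = {
      id: `handoff-${++this.counter}`, // invented ID format
      fromAgent,
      toAgent,
      input,
      requestedAt: now,
    };
    this.pending.set(request.id, request);
    this.options.onHandoff?.(request);
    return request;
  }

  // Moves a handoff from pending to history and fires onHandoffComplete.
  complete(id: string, result: unknown, now: number): SketchHandoffResult | undefined {
    const request = this.pending.get(id);
    if (!request) return undefined;
    this.pending.delete(id);
    const entry: SketchHandoffResult = { request, result, completedAt: now };
    this.completed.push(entry);
    const max = this.options.maxHandoffHistory ?? 1000;
    while (this.completed.length > max) this.completed.shift(); // assumed: oldest first
    this.options.onHandoffComplete?.(entry);
    return entry;
  }

  getPendingHandoffs(): HandoffRequest[] {
    return Array.from(this.pending.values());
  }
}

const hookLog: string[] = [];
const tracker = new HandoffTracker({
  onHandoff: (request) => { hookLog.push(`start ${request.id}`); },
  onHandoffComplete: (done) => { hookLog.push(`done ${done.request.id}`); },
});

const toWriter = tracker.start('researcher', 'writer', 'Write an article', 0);
tracker.start('writer', 'reviewer', 'Review this article', 5);
tracker.complete(toWriter.id, 'draft text', 10);

const stillPending = tracker.getPendingHandoffs().map((r) => r.id);
```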
