Multi-agent systems let you break complex tasks into specialized sub-tasks, each handled by a focused AI agent. This guide covers the architecture patterns and implementation for building reliable multi-agent systems with Claude.


When Multi-Agent Makes Sense

Use multi-agent systems when:

  • Long tasks exceed a single context window
  • Parallelization is possible and would speed things up
  • Specialized roles benefit from dedicated context/instructions
  • Verification requires an independent agent checking work

A single agent with a good system prompt often suffices. Add multi-agent complexity only when the benefits outweigh the coordination overhead.


Core Architecture Patterns

Orchestrator-Worker Pattern

The most common pattern: an orchestrator agent plans and delegates; worker agents execute specific tasks.

import anthropic
from typing import Callable

client = anthropic.Anthropic()

def create_agent(name: str, system_prompt: str) -> Callable:
    """Factory for creating specialized agents."""
    def agent(task: str, context: str = "") -> str:
        messages = [{"role": "user", "content": f"{context}\n\nTask: {task}" if context else f"Task: {task}"}]
        
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            system=system_prompt,
            messages=messages
        )
        return response.content[0].text
    
    agent.__name__ = name
    return agent

# Create specialized agents
researcher = create_agent(
    "researcher",
    "You are a research specialist. Find and synthesize information on given topics. Be thorough and cite sources when possible."
)

writer = create_agent(
    "writer",
    "You are a professional writer. Transform research and outlines into polished, engaging content."
)

editor = create_agent(
    "editor",
    "You are an editor. Review content for clarity, accuracy, and quality. Provide specific improvements."
)

orchestrator = create_agent(
    "orchestrator",
    """You are a project orchestrator. Given a goal, create a step-by-step plan.
    Available agents: researcher, writer, editor.
    Output your plan as JSON with format:
    {"steps": [{"agent": "name", "task": "description", "depends_on": []}]}"""
)
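
The orchestrator above only produces a plan; something still has to run it. The following is a minimal sketch of one way to do that, not a definitive implementation. It assumes the model returns bare JSON and that `depends_on` holds zero-based indices of earlier steps, which the prompt above does not specify. `execute_plan` and the stub agents are hypothetical names introduced for illustration.

```python
import json
from typing import Callable


def execute_plan(plan_json: str, agents: dict[str, Callable[[str, str], str]]) -> list[str]:
    """Run plan steps in order, passing each step the outputs it depends on.

    Assumption: "depends_on" is a list of zero-based indices of earlier steps.
    """
    plan = json.loads(plan_json)
    outputs: list[str] = []

    for index, step in enumerate(plan["steps"]):
        agent_name = step["agent"]
        if agent_name not in agents:
            raise KeyError(f"Step {index} names unknown agent: {agent_name}")

        deps = step.get("depends_on", [])
        if any(dep < 0 or dep >= index for dep in deps):
            raise ValueError(f"Step {index} depends on a step that has not run yet")

        # Only the declared dependencies are passed along as context.
        context = "\n\n".join(outputs[dep] for dep in deps)
        outputs.append(agents[agent_name](step["task"], context))

    return outputs


# Stub agents for illustration; real agents would call the API.
stub_agents = {
    "researcher": lambda task, context="": f"notes({task})",
    "writer": lambda task, context="": f"draft({task}) using [{context}]",
}

plan = json.dumps({"steps": [
    {"agent": "researcher", "task": "collect facts", "depends_on": []},
    {"agent": "writer", "task": "write summary", "depends_on": [0]},
]})

print(execute_plan(plan, stub_agents))
```

In practice the plan string would come from the orchestrator, and model output may need validation or cleanup before `json.loads` accepts it.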

Pipeline Pattern

A linear chain of agents, where each stage receives the previous stage's output as its input:

def run_pipeline(initial_input: str, stages: list[tuple[str, Callable]]) -> dict:
    """Run a sequential pipeline of agents."""
    results = {}
    current_input = initial_input
    
    for stage_name, agent in stages:
        print(f"Running stage: {stage_name}")
        result = agent(current_input)
        results[stage_name] = result
        current_input = result
    
    return results

# Example: blog post pipeline
results = run_pipeline(
    "Write about the impact of AI on software development in 2026",
    [
        ("research", researcher),
        ("outline", lambda x: orchestrator(f"Create detailed outline based on: {x}")),
        ("draft", writer),
        ("edit", editor),
    ]
)

Tool-Using Agents

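Agents become far more useful when they can call tools. The loop below sends a request, runs any tools the model asks for, and feeds the results back until the model produces a final answer: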

def research_agent_with_tools(query: str) -> str:
    """Agent that can search and read web pages."""
    
    tools = [
        {
            "name": "web_search",
            "description": "Search the web for current information",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        },
        {
            "name": "read_url",
            "description": "Read the content of a URL",
            "input_schema": {
                "type": "object",
                "properties": {
                    "url": {"type": "string"}
                },
                "required": ["url"]
            }
        }
    ]
    
    messages = [{"role": "user", "content": query}]
    
    while True:
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4096,
            system="You are a research agent. Use tools to find accurate, current information.",
            tools=tools,
            messages=messages
        )
        
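        # Any stop reason other than "tool_use" (end_turn, max_tokens, ...) ends the loop;
        # otherwise the next request would carry an empty tool_results message.
        if response.stop_reason != "tool_use":
            return "".join(block.text for block in response.content if block.type == "text")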
        
        # Handle tool calls
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        
        for block in response.content:
            if block.type == "tool_use":
                # Execute the tool
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result
                })
        
        messages.append({"role": "user", "content": tool_results})

def execute_tool(name: str, inputs: dict) -> str:
    """Execute tool calls — replace with real implementations."""
    if name == "web_search":
        # Replace with actual search implementation
        return f"Search results for: {inputs['query']}"
    elif name == "read_url":
        # Replace with actual URL reading
        return f"Content from: {inputs['url']}"
    return "Tool not found"
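
A tool that raises would crash the loop above. One option, sketched here under assumptions, is to catch the failure and report it back to the model through the `is_error` field of the `tool_result` block. `run_tool_safely` and `registry` are hypothetical names, and the registered handler is a placeholder rather than a real search implementation.

```python
from typing import Callable


def run_tool_safely(
    tool_use_id: str,
    name: str,
    inputs: dict,
    registry: dict[str, Callable[[dict], str]],
) -> dict:
    """Build a tool_result block, flagging failures instead of raising."""
    handler = registry.get(name)
    if handler is None:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": f"Unknown tool: {name}",
            "is_error": True,
        }
    try:
        content = handler(inputs)
    except Exception as exc:
        # Report the failure to the model so it can retry or change approach.
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": f"{type(exc).__name__}: {exc}",
            "is_error": True,
        }
    return {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}


# Placeholder handler for illustration only.
registry = {
    "web_search": lambda inputs: f"Search results for: {inputs['query']}",
}

print(run_tool_safely("toolu_1", "web_search", {"query": "agents"}, registry))
print(run_tool_safely("toolu_2", "web_search", {}, registry))
```

Inside the agent loop, each `tool_use` block would be passed through a helper like this instead of calling the tool directly.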

State Management

Agents in a multi-agent system need a shared record of the goal, the tasks already completed, and the artifacts produced so far:

from dataclasses import dataclass, field
from typing import Any, Callable
import json

@dataclass
class AgentState:
    """Shared state across agents."""
    goal: str
    completed_tasks: list[str] = field(default_factory=list)
    artifacts: dict[str, Any] = field(default_factory=dict)
    messages: list[dict] = field(default_factory=list)
    
    def add_artifact(self, key: str, value: Any):
        self.artifacts[key] = value
        self.completed_tasks.append(key)
    
    def get_context_summary(self) -> str:
        return f"""
Goal: {self.goal}
Completed: {', '.join(self.completed_tasks)}
Available artifacts: {list(self.artifacts.keys())}
"""

class MultiAgentOrchestrator:
    def __init__(self, goal: str):
        self.state = AgentState(goal=goal)
        self.agents = {}
    
    def register_agent(self, name: str, agent: Callable):
        self.agents[name] = agent
    
    def run_agent(self, agent_name: str, task: str) -> str:
        agent = self.agents[agent_name]
        context = self.state.get_context_summary()
        
        # Include relevant artifacts in context
        relevant_artifacts = "\n".join([
            f"{k}: {str(v)[:500]}"
            for k, v in self.state.artifacts.items()
        ])
        
        full_context = f"{context}\n\nPrevious work:\n{relevant_artifacts}"
        result = agent(task, full_context)
        
        self.state.add_artifact(f"{agent_name}_{task[:20]}", result)
        return result
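
Because the state is a plain dataclass, it can be checkpointed between agent runs so that a long job can resume after a failure. The sketch below shows one way to do that with `json` and `dataclasses.asdict`. `CheckpointableState`, `dump_state`, and `load_state` are hypothetical names; the class repeats the fields of `AgentState` only so that the example runs on its own.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class CheckpointableState:
    """Stand-in with the same fields as AgentState."""
    goal: str
    completed_tasks: list[str] = field(default_factory=list)
    artifacts: dict[str, Any] = field(default_factory=dict)
    messages: list[dict] = field(default_factory=list)


def dump_state(state: CheckpointableState) -> str:
    # default=str keeps the dump from failing on artifacts that are not JSON types;
    # such values come back as strings when loaded.
    return json.dumps(asdict(state), default=str)


def load_state(raw: str) -> CheckpointableState:
    return CheckpointableState(**json.loads(raw))


state = CheckpointableState(goal="Write a blog post")
state.artifacts["research"] = "key findings"
state.completed_tasks.append("research")

restored = load_state(dump_state(state))
print(restored == state)
```

The serialized string could be written to a file or a database row after each step; where it is stored is left open here.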

Parallel Agent Execution

Run independent tasks concurrently:

import asyncio
import anthropic

async_client = anthropic.AsyncAnthropic()

async def run_agent_async(name: str, system: str, task: str) -> tuple[str, str]:
    """Run a single agent asynchronously."""
    response = await async_client.messages.create(
        model="claude-3-haiku-20240307",  # Use cheaper model for parallel workers
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": task}]
    )
    return name, response.content[0].text

async def run_parallel_research(topics: list[str]) -> dict[str, str]:
    """Research multiple topics simultaneously."""
    tasks = [
        run_agent_async(
            topic,
            "You are a research specialist. Be concise and factual.",
            f"Research and summarize: {topic}"
        )
        for topic in topics
    ]
    
    results = await asyncio.gather(*tasks)
    return dict(results)

# Usage
async def main():
    topics = ["AI in healthcare", "AI in finance", "AI in education"]
    results = await run_parallel_research(topics)
    
    for topic, research in results.items():
        print(f"\n=== {topic} ===")
        print(research[:200])

asyncio.run(main())
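
Launching every task at once can run into rate limits when the topic list is long. A common remedy is to cap the number of requests in flight with `asyncio.Semaphore`. The sketch below is illustrative: `gather_with_limit` and `fake_agent` are hypothetical names, and `fake_agent` stands in for a real API call.

```python
import asyncio
from functools import partial
from typing import Awaitable, Callable


async def gather_with_limit(
    jobs: list[Callable[[], Awaitable[str]]],
    max_concurrent: int = 3,
) -> list[str]:
    """Run zero-argument coroutine factories with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(job: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:
            return await job()

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(job) for job in jobs))


async def fake_agent(topic: str) -> str:
    """Stand-in for an API call."""
    await asyncio.sleep(0.01)
    return f"summary of {topic}"


topics = ["AI in healthcare", "AI in finance", "AI in education"]
# partial binds each topic now; a lambda in a comprehension would bind late.
jobs = [partial(fake_agent, topic) for topic in topics]
summaries = asyncio.run(gather_with_limit(jobs, max_concurrent=2))
print(summaries)
```

A suitable value for `max_concurrent` depends on the account's rate limits and is best found by measurement.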

Error Handling and Retry Logic

import time
from anthropic import RateLimitError, APIError

def resilient_agent_call(agent_fn: Callable, task: str, max_retries: int = 3) -> str:
    """Call an agent with retry logic."""
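    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return agent_fn(task)
        except RateLimitError:
            # RateLimitError is a subclass of APIError, so it must be caught first.
            if is_last_attempt:
                raise
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
            print(f"Rate limited. Waiting {wait}s...")
            time.sleep(wait)
        except APIError as e:
            if is_last_attempt:
                raise
            print(f"API error: {e}. Retrying...")
            time.sleep(1)

    # Reached only when max_retries is zero or negative.
    raise RuntimeError(f"Agent failed after {max_retries} attempts")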

Production Considerations

Token costs: Every agent call is billed separately, so cost grows with the number of agents and steps. Use cheaper models (Haiku) for worker agents and more capable models (Sonnet) for orchestrators.

Latency: Sequential agents compound latency. Parallelize where possible.

Context contamination: Each agent should get only the context it needs. Avoid passing everything to every agent.
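
As a rough illustration of scoping context per agent, the sketch below passes along only the artifacts an agent declares it needs, each truncated to a character budget. `build_context`, its parameters, and the sample artifacts are hypothetical.

```python
def build_context(artifacts: dict[str, str], needed: list[str], max_chars: int = 500) -> str:
    """Return only the requested artifacts, each truncated to max_chars."""
    sections = []
    for key in needed:
        if key in artifacts:
            sections.append(f"{key}:\n{artifacts[key][:max_chars]}")
    return "\n\n".join(sections)


artifacts = {
    "research": "Findings about adoption.",
    "draft": "First draft text.",
    "edit_notes": "Tighten intro.",
}

# The editor only needs the draft, not the raw research.
editor_context = build_context(artifacts, needed=["draft"])
print(editor_context)
```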

Logging: Log all agent calls, inputs, outputs, and token usage. You need observability to debug multi-agent systems.
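
One lightweight way to get this is to wrap each agent in a function that logs around the call. The sketch below records input size, output size, and duration with the standard `logging` module. `with_logging` is a hypothetical helper, and the wrapped lambda stands in for a real agent.

```python
import logging
import time
from typing import Callable

logger = logging.getLogger("agents")


def with_logging(name: str, agent: Callable[[str], str]) -> Callable[[str], str]:
    """Wrap an agent so each call logs input size, output size, and duration."""
    def wrapped(task: str) -> str:
        start = time.perf_counter()
        try:
            result = agent(task)
        except Exception:
            # logger.exception records the traceback before re-raising.
            logger.exception("agent=%s failed", name)
            raise
        elapsed = time.perf_counter() - start
        logger.info(
            "agent=%s input_chars=%d output_chars=%d seconds=%.2f",
            name, len(task), len(result), elapsed,
        )
        return result
    return wrapped


logging.basicConfig(level=logging.INFO)
logged_agent = with_logging("echo", lambda task: task.upper())
print(logged_agent("summarize the findings"))
```

The Messages API response also exposes `response.usage.input_tokens` and `response.usage.output_tokens`. Logging those would require the agent to return the response object rather than only its text, which this sketch does not attempt.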

Timeouts: Set explicit timeouts. Hanging agents will block your entire pipeline.
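
For async agents, `asyncio.wait_for` is one way to enforce a deadline per call. The sketch below returns a fallback value when the deadline passes. `call_with_timeout`, `slow_agent`, and `fast_agent` are hypothetical names, and the agents are stand-ins for real API calls. The Anthropic Python client also accepts a `timeout` argument at construction, which can serve as a second line of defense.

```python
import asyncio
from typing import Awaitable


async def call_with_timeout(call: Awaitable[str], seconds: float, fallback: str = "") -> str:
    """Await call, returning fallback if it does not finish within seconds."""
    try:
        return await asyncio.wait_for(call, timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the underlying call before raising.
        return fallback


async def slow_agent() -> str:
    """Stand-in for an agent that hangs."""
    await asyncio.sleep(1)
    return "done"


async def fast_agent() -> str:
    """Stand-in for an agent that responds promptly."""
    return "done"


print(asyncio.run(call_with_timeout(slow_agent(), 0.05, fallback="timed out")))
print(asyncio.run(call_with_timeout(fast_agent(), 1.0)))
```

Whether to return a fallback, retry, or abort the pipeline on timeout depends on how critical the stage is.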


Model Selection for Multi-Agent

Role                    Model              Reason
----------------------  -----------------  -----------------------------------------
Orchestrator            claude-3-5-sonnet  Complex planning needs best model
Research workers        claude-3-haiku     Fast, cheap, good enough for factual tasks
Writing workers         claude-3-5-sonnet  Writing quality matters
Verification agents     claude-3-5-sonnet  Accuracy is critical
Classification/routing  claude-3-haiku     Simple decision, maximize speed

Using the right model per agent tier significantly reduces cost without sacrificing overall quality.
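
Keeping the role-to-model mapping in one place makes it easy to change tiers later. The sketch below is one possible layout. The role keys, `MODEL_BY_ROLE`, `DEFAULT_MODEL`, and `model_for` are hypothetical names; the model IDs are the ones used in the examples in this guide.

```python
# Role keys are illustrative; model IDs match the earlier examples.
MODEL_BY_ROLE = {
    "orchestrator": "claude-3-5-sonnet-20241022",
    "research_worker": "claude-3-haiku-20240307",
    "writing_worker": "claude-3-5-sonnet-20241022",
    "verifier": "claude-3-5-sonnet-20241022",
    "router": "claude-3-haiku-20240307",
}

# Assumption: unknown roles fall back to the more capable model.
DEFAULT_MODEL = "claude-3-5-sonnet-20241022"


def model_for(role: str) -> str:
    """Look up the model for a role, falling back to DEFAULT_MODEL."""
    return MODEL_BY_ROLE.get(role, DEFAULT_MODEL)


print(model_for("router"))
print(model_for("summarizer"))
```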