- הבנה מעמיקה של 5 דפוסי תזמור -- Sequential, Parallel, Supervisor, Swarm, Debate -- ומתי להשתמש בכל אחד
- מימוש עובד של כל 5 הדפוסים ב-Python ו-TypeScript, עם השוואת תוצאות ועלויות
- הבנה של 4 דפוסי תקשורת בין סוכנים -- Shared State, Message Passing, Blackboard, Event-Driven
- מערכת עובדת עם Shared State Management -- conflict resolution, locking, state schema
- מימוש Delegation ו-Escalation -- ניתוב חכם של משימות בין סוכנים מתמחים
- ידע מעשי ב-multi-agent ב-6 SDKs -- Claude Agent SDK, Vercel AI SDK, OpenAI SDK, Google ADK, LangGraph, CrewAI
- Multi-Agent Cost Estimator -- חיזוי עלויות לפני הרצה
- Deliverable סופי: מערכת multi-agent מלאה עם 4+ סוכנים, shared state, delegation, tracing
- תוכלו לנתח משימה ולהחליט האם צריך סוכן אחד או מערכת multi-agent -- ולבחור את דפוס התזמור הנכון
- תוכלו לבנות מערכת multi-agent עם כל אחד מ-5 דפוסי התזמור -- Sequential, Parallel, Supervisor, Swarm, Debate
- תוכלו לתכנן תקשורת בין סוכנים -- shared state, message passing, או event-driven -- ולנהל conflicts
- תוכלו ליישם delegation ו-escalation patterns שמנתבים משימות לסוכן המתאים ביותר
- תוכלו לעקוב ולתקן באגים במערכות multi-agent עם distributed tracing, correlation IDs, וויזואליזציה
- פרקים קודמים: פרק 1-2 (מה זה סוכן AI וארכיטקטורה), פרקים 5-11 (SDKs ו-tool use), פרק 12 (Memory & State -- multi-agent בנוי על זיכרון משותף)
- מה תצטרכו: Python 3.11+ ו/או Node.js 18+, מפתח API (Anthropic / OpenAI / Google), עורך קוד
- ידע נדרש: Python או TypeScript ברמה בינונית, הכרת agent loops ו-tool calling, הבנה בסיסית של memory patterns
- זמן משוער: 4-5 שעות (כולל תרגילים)
- עלות API משוערת: $10-20 (ריבוי סוכנים = ריבוי קריאות API)
בפרק 12 הוספתם זיכרון לסוכן שלכם -- conversation memory, RAG, ו-persistent state. עכשיו אתם משדרגים מסוכן בודד לצוות של סוכנים. בפרק הזה תפרקו את המשימות של הפרויקט שלכם לתת-משימות, תקצו סוכן מתמחה לכל אחת, ותבנו orchestrator שמתאם ביניהם. הזיכרון מפרק 12 ישמש כ-shared context שמאפשר לסוכנים לשתף מידע. בפרק 14 תוסיפו Human-in-the-Loop -- בקרת אדם על ההחלטות שהסוכנים מקבלים.
| מונח (English) | עברית | הסבר |
|---|---|---|
| Multi-Agent System (MAS) | מערכת רב-סוכנית | מערכת שבה מספר סוכני AI עובדים יחד כדי לפתור משימה מורכבת. כל סוכן מתמחה בתפקיד ספציפי |
| Orchestrator | מתזמר | סוכן (או קוד) שמנהל את העבודה של סוכנים אחרים -- מחלק משימות, אוסף תוצאות, מתאם |
| Sequential Pipeline | צינור רציף | דפוס שבו סוכן A מעביר פלט לסוכן B, שמעביר לסוכן C. כמו פס ייצור. הפשוט ביותר |
| Parallel Fan-Out | פיזור מקבילי | דפוס שבו orchestrator שולח משימות לכמה סוכנים במקביל ואוסף את כל התוצאות |
| Supervisor Pattern | דפוס מפקח | סוכן-מנהל שמחליט דינמית איזה worker agent מטפל בכל משימה, מבוסס על תוכן הבקשה |
| Swarm / Handoff | נחיל / העברה | סוכנים מעבירים שליטה ביניהם ללא orchestrator מרכזי. דצנטרלי. דומה לניתוב שירות לקוחות |
| Debate / Adversarial | דיון / יריבות | סוכנים טוענים עמדות שונות, מבקרים זה את זה, ומייצרים פלט טוב יותר דרך ויכוח מובנה |
| Shared State | מצב משותף | אובייקט state שכל הסוכנים קוראים וכותבים אליו. פשוט אבל דורש ניהול conflicts |
| Message Passing | העברת הודעות | סוכנים שולחים הודעות מובנות זה לזה. Decoupled אבל מורכב יותר. דומה למיקרו-שירותים |
| Delegation | האצלה | סוכן A מבקש מסוכן B לטפל בתת-משימה ומחכה לתוצאה. דומה למנהל שמאציל לעובד |
| Escalation | הסלמה | סוכן מזהה שהוא לא יכול לטפל במשימה ומעביר אותה לסוכן יותר מסוגל (או לאדם) |
| Correlation ID | מזהה מעקב | מזהה ייחודי שעובר בין כל הסוכנים ב-run אחד, מאפשר לעקוב ולקשר לוגים |
| Handoff | העברת שליטה | רגע שבו סוכן אחד מעביר את השליטה לסוכן אחר, כולל context רלוונטי |
| Triage Agent | סוכן מיון | סוכן שתפקידו לנתח את הבקשה ולנתב אותה לסוכן המתמחה הנכון. עלות נמוכה, דיוק גבוה |
| Worker Agent | סוכן עובד | סוכן שמתמחה בתחום ספציפי ומבצע משימות שהוקצו לו ע"י ה-orchestrator או supervisor |
| Blackboard Pattern | דפוס לוח | knowledge base משותף שסוכנים קוראים ממנו וכותבים אליו. גמיש אך דורש תיאום |
למה Multi-Agent -- ומתי סוכן אחד מספיק
חשבו על המשימה הבאה: לקוח שולח מייל בעברית עם שאלה על החשבון שלו. המערכת צריכה לתרגם את המייל (אם צריך), לנתב אותו למחלקה הנכונה, לבדוק את פרטי החשבון ב-database, לנסח תשובה מקצועית, לוודא שהתשובה מדויקת, ולשלוח אותה.
סוכן בודד יכול לעשות את כל זה -- עם prompt ארוך, הרבה tools, ו-context window מלא. אבל יש בעיה: ככל שמוסיפים אחריויות לסוכן אחד, הביצועים יורדים. ה-prompt ארוך מדי, ה-tools רבים מדי, והמודל מתבלבל.
Multi-Agent Systems פותרים את זה בדיוק כמו שאנחנו פותרים בעיות מורכבות בעולם האמיתי: צוות של מומחים, כל אחד מתמחה במשהו אחד.
מתי ללכת על Multi-Agent?
המבחן הפשוט: "האם הייתם שוכרים אדם אחד או צוות לזה?"
| סימן | משמעות | דוגמה |
|---|---|---|
| יותר מ-5 tools | סוכן אחד עם יותר מ-5 tools מתחיל לטעות בבחירת tool | סוכן עם 12 tools לשירות לקוחות |
| כישורים שונים | המשימה דורשת "מומחים" שונים | כתיבה + עריכה + SEO + עיצוב |
| שלבים עצמאיים | אפשר לעשות חלקים במקביל | חיפוש ב-5 מקורות שונים |
| בקרת איכות | צריך "עין נוספת" לבדוק את הפלט | כותב + מבקר + fact-checker |
| Models שונים | חלקים מסוימים צריכים model חזק, אחרים זול | Haiku לניתוב, Opus לייצור |
Framework: "The Multi-Agent Decision Matrix"
לפני שקופצים ל-multi-agent, בדקו 10 use cases נפוצים -- רק חלקם דורשים מערכת רב-סוכנית:
| # | Use Case | Single Agent | Multi-Agent | המלצה |
|---|---|---|---|---|
| 1 | Chatbot פשוט לשאלות ותשובות | מספיק | Overkill | Single |
| 2 | RAG על מסמכי חברה | מספיק | אפשרי | Single |
| 3 | שירות לקוחות עם 8 מחלקות | מתקשה | טבעי | Multi (triage + specialists) |
| 4 | ניתוח קוד (lint + review + test) | מתקשה | טבעי | Multi (pipeline) |
| 5 | כתיבת תוכן (research + write + edit) | מספיק | עדיף | Multi (pipeline) |
| 6 | חיפוש ב-10 מקורות במקביל | איטי | מהיר | Multi (fan-out) |
| 7 | סיכום מסמך בודד | מספיק | Overkill | Single |
| 8 | ניתוח פיננסי (טכני + בסיסי + דוח) | מתקשה | טבעי | Multi (supervisor) |
| 9 | תרגום טקסט קצר | מספיק | Overkill | Single |
| 10 | פיתוח תוכנה (PM + arch + dev + review) | לא מספיק | הכרחי | Multi (supervisor) |
הכלל: התחילו תמיד עם סוכן אחד. עברו ל-multi-agent רק כשהסוכן הבודד מגיע לגבולות שלו -- ביצועים ירודים, prompts ארוכים מדי, או צורך בהתמחויות שונות.
חשבו על הפרויקט שאתם בונים לאורך הקורס. רשמו 3 תת-משימות שהסוכן שלכם מבצע. עבור כל אחת שאלו: "האם מומחה נפרד היה עושה את זה טוב יותר?" אם התשובה "כן" ל-2 מתוך 3 -- אתם מועמדים ל-multi-agent.
היתרונות והעלויות
| יתרונות | עלויות | |
|---|---|---|
| Specialization | כל סוכן מתמחה בתחום אחד, prompts קצרים ומדויקים | צריך לכתוב ולתחזק prompts לכל סוכן |
| Parallel Execution | ריצה במקביל = מהירות | אותה עלות כספית (לפעמים פחות בגלל contexts קצרים) |
| Separation of Concerns | קל לבדוק, לשנות, ולהחליף כל סוכן בנפרד | communication overhead בין סוכנים |
| Quality Control | סוכן-מבקר יכול לבדוק פלט של סוכן-יוצר | ריבוי סוכנים = ריבוי עלויות ($) |
| Flexibility | אפשר להשתמש ב-models שונים לסוכנים שונים | complexity בדיבוג ובניטור |
דפוסי תזמור -- 5 ארכיטקטורות
כל מערכת multi-agent מבוססת על אחד (או שילוב) מ-5 דפוסי תזמור. לכל דפוס יש חוזקות, חולשות, ו-use cases שבהם הוא מצטיין.
דפוס 1: Sequential Pipeline (צינור רציף)
סוכן A מעביר את הפלט שלו כקלט לסוכן B, שמעביר לסוכן C. כמו פס ייצור במפעל.
| יתרון | חיסרון | Use Case |
|---|---|---|
| הכי פשוט לבנות ולתקן | איטי -- כל שלב חוסם את הבא | Data collection -> Analysis -> Report |
from anthropic import Anthropic
client = Anthropic()
def run_agent(system_prompt: str, user_input: str, model: str = "claude-sonnet-4-20250514") -> str:
"""Run a single agent and return its response."""
response = client.messages.create(
model=model,
max_tokens=2048,
system=system_prompt,
messages=[{"role": "user", "content": user_input}]
)
return response.content[0].text
def content_pipeline(topic: str) -> dict:
"""Sequential pipeline: Research -> Write -> Edit."""
# Agent 1: Researcher
research = run_agent(
system_prompt="You are a research specialist. Gather key facts, "
"statistics, and insights about the given topic. "
"Output a structured research brief.",
user_input=f"Research the following topic thoroughly: {topic}"
)
print(f"[Researcher] Done: {len(research)} chars")
# Agent 2: Writer
draft = run_agent(
system_prompt="You are a professional content writer. Using the "
"research brief provided, write a compelling article. "
"Use clear structure, engaging tone, and examples.",
user_input=f"Write an article based on this research:\n\n{research}"
)
print(f"[Writer] Done: {len(draft)} chars")
# Agent 3: Editor
final = run_agent(
system_prompt="You are a meticulous editor. Review the article for "
"clarity, accuracy, grammar, and flow. Fix any issues "
"and return the polished final version.",
user_input=f"Edit and improve this article:\n\n{draft}"
)
print(f"[Editor] Done: {len(final)} chars")
return {
"research": research,
"draft": draft,
"final": final
}
# Usage
result = content_pipeline("AI agents in Israeli startups 2026")
print(result["final"])
import { generateText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
async function runAgent(
systemPrompt: string,
userInput: string,
model = 'claude-sonnet-4-20250514'
): Promise<string> {
const { text } = await generateText({
model: anthropic(model),
system: systemPrompt,
prompt: userInput,
maxTokens: 2048,
});
return text;
}
async function contentPipeline(topic: string) {
// Agent 1: Researcher
const research = await runAgent(
'You are a research specialist. Gather key facts and insights.',
`Research the following topic thoroughly: ${topic}`
);
console.log(`[Researcher] Done: ${research.length} chars`);
// Agent 2: Writer
const draft = await runAgent(
'You are a professional content writer. Write a compelling article.',
`Write an article based on this research:\n\n${research}`
);
console.log(`[Writer] Done: ${draft.length} chars`);
// Agent 3: Editor
const final = await runAgent(
'You are a meticulous editor. Polish the article for clarity and accuracy.',
`Edit and improve this article:\n\n${draft}`
);
console.log(`[Editor] Done: ${final.length} chars`);
return { research, draft, final: final };
}
// Usage
const result = await contentPipeline('AI agents in Israeli startups 2026');
הריצו את ה-Sequential Pipeline עם נושא שמעניין אתכם. שנו את ה-prompt של ה-Editor כדי שיוסיף ציון איכות מ-1 עד 10 בתחילת הפלט. האם הציון משתפר כשמוסיפים שלב רביעי -- Fact Checker?
דפוס 2: Parallel Fan-Out (פיזור מקבילי)
ה-orchestrator שולח משימות לכמה סוכנים במקביל ואוסף את כל התוצאות. מצוין כשיש תת-משימות עצמאיות.
import asyncio
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async def run_agent_async(name: str, system_prompt: str, user_input: str) -> dict:
"""Run a single agent asynchronously."""
response = await client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=system_prompt,
messages=[{"role": "user", "content": user_input}]
)
result = response.content[0].text
print(f"[{name}] Done")
return {"agent": name, "result": result}
async def parallel_research(query: str) -> list[dict]:
"""Fan-out: search multiple sources in parallel."""
agents = [
("Market Analyst", "You analyze market trends and competitive landscape."),
("Tech Researcher", "You analyze technology stacks and technical feasibility."),
("Financial Analyst", "You analyze costs, revenue models, and financial viability."),
("User Researcher", "You analyze user needs, pain points, and demand signals."),
]
tasks = [
run_agent_async(name, prompt, f"Analyze: {query}")
for name, prompt in agents
]
# All agents run in parallel
results = await asyncio.gather(*tasks)
return results
async def fan_out_with_aggregation(query: str) -> str:
"""Fan-out + aggregation: parallel research then synthesize."""
# Step 1: Fan-out (parallel)
results = await parallel_research(query)
# Step 2: Aggregate (single agent synthesizes all results)
combined = "\n\n".join(
f"## {r['agent']}\n{r['result']}" for r in results
)
synthesis = await run_agent_async(
"Synthesizer",
"You synthesize multiple research reports into a coherent summary "
"with key insights and recommendations.",
f"Synthesize these reports:\n\n{combined}"
)
return synthesis["result"]
# Usage
result = asyncio.run(fan_out_with_aggregation("Building an AI tutoring platform for Israel"))
print(result)
import { generateText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
interface AgentResult {
agent: string;
result: string;
}
async function runAgentAsync(
name: string, systemPrompt: string, userInput: string
): Promise<AgentResult> {
const { text } = await generateText({
model: anthropic('claude-sonnet-4-20250514'),
system: systemPrompt,
prompt: userInput,
});
console.log(`[${name}] Done`);
return { agent: name, result: text };
}
async function parallelResearch(query: string): Promise<string> {
const agents = [
{ name: 'Market Analyst', prompt: 'You analyze market trends.' },
{ name: 'Tech Researcher', prompt: 'You analyze technology feasibility.' },
{ name: 'Financial Analyst', prompt: 'You analyze costs and revenue.' },
{ name: 'User Researcher', prompt: 'You analyze user needs and demand.' },
];
// All agents run in parallel with Promise.all
const results = await Promise.all(
agents.map(a => runAgentAsync(a.name, a.prompt, `Analyze: ${query}`))
);
// Synthesize
const combined = results.map(r => `## ${r.agent}\n${r.result}`).join('\n\n');
const synthesis = await runAgentAsync(
'Synthesizer',
'Synthesize multiple reports into a coherent summary.',
`Synthesize:\n\n${combined}`
);
return synthesis.result;
}
דפוס 3: Supervisor / Manager (מפקח)
סוכן אחד משמש כמנהל -- הוא מקבל את המשימה, מחליט דינמית איזה worker מטפל בה, מפקח על התוצאות, ומחליט מתי המשימה סיימה. זה הדפוס הגמיש ביותר אבל גם המורכב ביותר.
import json
from anthropic import Anthropic
client = Anthropic()
WORKERS = {
"developer": {
"description": "Writes code, fixes bugs, implements features",
"system": "You are an expert software developer. Write clean, tested code.",
},
"designer": {
"description": "Creates UI/UX designs, wireframes, style guides",
"system": "You are a UI/UX designer. Create clear, user-friendly designs.",
},
"tester": {
"description": "Writes tests, finds bugs, validates functionality",
"system": "You are a QA engineer. Write thorough tests and find edge cases.",
},
"documenter": {
"description": "Writes docs, README files, API documentation",
"system": "You are a technical writer. Write clear, comprehensive docs.",
},
}
def supervisor_loop(task: str, max_steps: int = 6) -> list[dict]:
"""Supervisor dynamically assigns tasks to workers."""
worker_list = "\n".join(
f"- {name}: {w['description']}" for name, w in WORKERS.items()
)
history = []
supervisor_messages = []
for step in range(max_steps):
supervisor_prompt = f"""You are a project manager. Your team:
{worker_list}
Task: {task}
Work done so far:
{json.dumps(history, indent=2) if history else "Nothing yet."}
Decide the next action. Respond with JSON:
{{"action": "delegate", "worker": "", "instruction": ""}}
or if the task is complete:
{{"action": "complete", "summary": ""}}"""
supervisor_messages.append({"role": "user", "content": supervisor_prompt})
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system="You are a project supervisor. Respond with valid JSON only.",
messages=supervisor_messages
)
decision_text = response.content[0].text
supervisor_messages.append({"role": "assistant", "content": decision_text})
try:
decision = json.loads(decision_text)
except json.JSONDecodeError:
break
if decision["action"] == "complete":
print(f"[Supervisor] Task complete: {decision['summary']}")
history.append({"step": step + 1, "action": "complete", "summary": decision["summary"]})
break
worker_name = decision["worker"]
instruction = decision["instruction"]
print(f"[Supervisor] Step {step+1}: Delegating to {worker_name}")
# Run the worker
worker = WORKERS[worker_name]
worker_response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
system=worker["system"],
messages=[{"role": "user", "content": instruction}]
)
result = worker_response.content[0].text
history.append({
"step": step + 1, "worker": worker_name,
"instruction": instruction,
"result": result[:500] # Truncate for context management
})
return history
# Usage
results = supervisor_loop("Build a REST API for a todo app with tests and docs")
דפוס 4: Swarm / Handoff (נחיל / העברה)
בדפוס הזה, אין orchestrator מרכזי. כל סוכן יכול להחליט להעביר שליטה לסוכן אחר. זה דומה לניתוב שירות לקוחות -- "אני מעביר אותך למחלקה הטכנית".
from anthropic import Anthropic
import json
client = Anthropic()
AGENTS = {
"triage": {
"system": """You are a customer service triage agent. Analyze the customer's
request and decide which department handles it best.
Respond with JSON: {"handoff_to": "billing|technical|sales|none", "reason": "..."}
If you can answer directly (simple greetings, general info), use "none".""",
"can_handoff_to": ["billing", "technical", "sales"]
},
"billing": {
"system": """You are a billing specialist. Handle payment, invoice, and
account balance questions. If the issue is technical, hand off.
To hand off, include in your response: HANDOFF:technical or HANDOFF:triage""",
"can_handoff_to": ["technical", "triage"]
},
"technical": {
"system": """You are a technical support specialist. Handle product issues,
bugs, and how-to questions. If it's a billing issue, hand off.
To hand off, include: HANDOFF:billing or HANDOFF:triage""",
"can_handoff_to": ["billing", "triage"]
},
"sales": {
"system": """You are a sales specialist. Handle pricing questions, plan
upgrades, and new feature inquiries. Hand off if needed.
To hand off, include: HANDOFF:billing or HANDOFF:triage""",
"can_handoff_to": ["billing", "triage"]
},
}
def swarm_conversation(user_message: str, max_handoffs: int = 3) -> str:
"""Process a customer request through the swarm."""
current_agent = "triage"
handoff_count = 0
context = f"Customer message: {user_message}"
while handoff_count <= max_handoffs:
agent = AGENTS[current_agent]
print(f"[{current_agent.upper()}] Processing...")
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=agent["system"],
messages=[{"role": "user", "content": context}]
)
result = response.content[0].text
# Check for handoff
if "HANDOFF:" in result:
target = result.split("HANDOFF:")[1].strip().split()[0]
if target in agent["can_handoff_to"]:
print(f"[{current_agent.upper()}] Handing off to {target}")
current_agent = target
context = f"Handed off from previous agent.\nOriginal: {user_message}\nContext: {result}"
handoff_count += 1
continue
# Triage returns JSON
if current_agent == "triage":
try:
decision = json.loads(result)
if decision["handoff_to"] != "none":
current_agent = decision["handoff_to"]
context = f"Routed by triage: {decision['reason']}\nCustomer: {user_message}"
handoff_count += 1
continue
except (json.JSONDecodeError, KeyError):
pass
return result
return "I apologize, let me connect you with a human agent."
# Usage
print(swarm_conversation("I was charged twice for my subscription last month"))
print(swarm_conversation("How do I reset my password?"))
דפוס 5: Debate / Adversarial (דיון / יריבות)
שני סוכנים (או יותר) מתווכחים -- אחד כותב, השני מבקר. הוויכוח מייצר פלט טוב יותר כי כל סוכן "מאתגר" את השני. מצוין להפחתת hallucinations.
from anthropic import Anthropic
client = Anthropic()
def debate(topic: str, rounds: int = 3) -> str:
"""Two agents debate a topic. A judge synthesizes the best answer."""
writer_history = []
critic_history = []
for round_num in range(1, rounds + 1):
print(f"\n--- Round {round_num} ---")
# Writer produces or revises
if round_num == 1:
writer_input = f"Write a thorough analysis of: {topic}"
else:
writer_input = (
f"The critic said:\n{criticism}\n\n"
f"Revise your analysis to address these points."
)
writer_history.append({"role": "user", "content": writer_input})
writer_resp = client.messages.create(
model="claude-sonnet-4-20250514", max_tokens=2048,
system="You are an expert analyst. Write accurate, well-sourced analysis.",
messages=writer_history
)
analysis = writer_resp.content[0].text
writer_history.append({"role": "assistant", "content": analysis})
print(f"[Writer] {len(analysis)} chars")
# Critic reviews
critic_input = (
f"Critically review this analysis. Find errors, gaps, weak arguments, "
f"and unsupported claims:\n\n{analysis}"
)
critic_history.append({"role": "user", "content": critic_input})
critic_resp = client.messages.create(
model="claude-sonnet-4-20250514", max_tokens=1024,
system="You are a rigorous critic and fact-checker. Find every flaw.",
messages=critic_history
)
criticism = critic_resp.content[0].text
critic_history.append({"role": "assistant", "content": criticism})
print(f"[Critic] Found issues")
# Judge synthesizes the final answer
judge_input = (
f"Two experts debated '{topic}' for {rounds} rounds.\n\n"
f"Final analysis:\n{analysis}\n\n"
f"Final criticism:\n{criticism}\n\n"
f"Synthesize the best possible answer, incorporating valid criticisms."
)
judge_resp = client.messages.create(
model="claude-sonnet-4-20250514", max_tokens=2048,
system="You are an impartial judge. Synthesize the strongest answer.",
messages=[{"role": "user", "content": judge_input}]
)
return judge_resp.content[0].text
# Usage
result = debate("Will AI agents replace most SaaS products by 2028?", rounds=3)
print(result)
הריצו את ה-Debate pattern עם שאלה שאתם יודעים את התשובה לה (למשל: "Is Python better than JavaScript for AI agents?"). השוו את התשובה לאחרי 1 round, 2 rounds, ו-3 rounds. האם האיכות באמת משתפרת? מתי הדיון הופך ל-"circular"?
השוואה: מתי להשתמש בכל דפוס
| דפוס | מתי להשתמש | Complexity | Speed | Cost |
|---|---|---|---|---|
| Sequential | שלבים תלויים זה בזה, pipeline ברור | Low | Slow | $ |
| Parallel | תת-משימות עצמאיות, צריך מהירות | Medium | Fast | $$ |
| Supervisor | משימות לא צפויות, צריך גמישות | High | Variable | $$$ |
| Swarm | ניתוב טבעי, שירות לקוחות, workflows | Medium | Fast | $$ |
| Debate | צריך דיוק גבוה, הפחתת hallucinations | Low | Slow | $$$ |
Pro tip: אפשר לשלב דפוסים. לדוגמה: Supervisor שמפעיל Parallel fan-out לחלק מהמשימות, ו-Debate לשלב הביקורת.
תקשורת בין סוכנים
כשסוכנים עובדים יחד, הם צריכים לתקשר. יש 4 דפוסי תקשורת עיקריים, וכל אחד מתאים למצבים שונים.
1. Shared State -- כולם קוראים וכותבים לאותו אובייקט
הדרך הפשוטה ביותר: אובייקט state אחד שכל הסוכנים ניגשים אליו. זה הגישה של LangGraph.
| יתרון | חיסרון |
|---|---|
| פשוט להבנה ולמימוש | Race conditions כשסוכנים כותבים במקביל |
| כל סוכן רואה את התמונה המלאה | Coupling -- שינוי ב-schema משפיע על כולם |
from dataclasses import dataclass, field
from typing import Any
@dataclass
class PipelineState:
"""Shared state accessible by all agents."""
task: str = ""
research: str = ""
draft: str = ""
review: str = ""
final: str = ""
metadata: dict = field(default_factory=dict)
errors: list[str] = field(default_factory=list)
status: str = "pending" # pending, in_progress, complete, failed
def log(self, agent: str, message: str):
"""Log an action to shared state."""
if "log" not in self.metadata:
self.metadata["log"] = []
self.metadata["log"].append({"agent": agent, "message": message})
# Each agent reads from and writes to the same state
def researcher(state: PipelineState) -> PipelineState:
state.status = "in_progress"
state.log("researcher", "Starting research")
# ... call LLM, write to state.research ...
state.research = "Research results here..."
state.log("researcher", "Research complete")
return state
def writer(state: PipelineState) -> PipelineState:
state.log("writer", "Starting draft")
# ... use state.research to write draft ...
state.draft = f"Article based on: {state.research[:100]}..."
state.log("writer", "Draft complete")
return state
2. Message Passing -- סוכנים שולחים הודעות זה לזה
כל סוכן שולח הודעות מובנות לסוכנים אחרים. Decoupled -- סוכנים לא צריכים לדעת על ה-internal state של סוכנים אחרים.
interface AgentMessage {
from: string;
to: string;
type: 'task' | 'result' | 'error' | 'handoff';
payload: any;
correlationId: string;
timestamp: Date;
}
class MessageBus {
private queues: Map<string, AgentMessage[]> = new Map();
private listeners: Map<string, ((msg: AgentMessage) => void)[]> = new Map();
send(message: AgentMessage): void {
const queue = this.queues.get(message.to) || [];
queue.push(message);
this.queues.set(message.to, queue);
// Notify listeners
const handlers = this.listeners.get(message.to) || [];
handlers.forEach(handler => handler(message));
}
receive(agentName: string): AgentMessage | undefined {
const queue = this.queues.get(agentName) || [];
return queue.shift();
}
subscribe(agentName: string, handler: (msg: AgentMessage) => void): void {
const handlers = this.listeners.get(agentName) || [];
handlers.push(handler);
this.listeners.set(agentName, handlers);
}
}
// Usage
const bus = new MessageBus();
const correlationId = crypto.randomUUID();
bus.send({
from: 'orchestrator',
to: 'researcher',
type: 'task',
payload: { topic: 'AI agents market in Israel' },
correlationId,
timestamp: new Date(),
});
3. Blackboard Pattern -- לוח משותף
כל הסוכנים ניגשים לknowledge base משותף. כל סוכן קורא מה שרלוונטי לו, מעבד, וכותב את התוצאה חזרה. הסוכנים לא מדברים זה עם זה ישירות.
4. Event-Driven -- סוכנים מפרסמים אירועים, אחרים נרשמים
דומה ל-pub/sub. סוכן A מפרסם אירוע ("research_complete"), סוכנים שנרשמו לאירוע הזה מופעלים אוטומטית. הכי decoupled, אבל קשה לדבג.
לכל אחד מ-4 דפוסי התקשורת, כתבו משפט אחד שמתאר מתי תשתמשו בו בפרויקט שלכם. למשל: "Shared State -- כשכל הסוכנים צריכים לגשת לנתוני הלקוח".
בחירת דפוס תקשורת
| דפוס | Coupling | Debugging | Best For |
|---|---|---|---|
| Shared State | High | Easy | Pipeline, LangGraph, פרויקטים קטנים |
| Message Passing | Low | Medium | מיקרו-שירותים, מערכות גדולות |
| Blackboard | Medium | Medium | ניתוח מורכב, סוכנים שעובדים בקצב שונה |
| Event-Driven | Very Low | Hard | מערכות real-time, scalable architectures |
ניהול Shared State
כששני סוכנים כותבים ל-state באותו זמן -- מי מנצח? זו בעיית ה-consistency, והיא חריפה במערכות multi-agent.
הבעיה: Race Conditions
דמיינו שני סוכנים שרצים במקביל. שניהם קוראים את state.summary, שניהם מעדכנים אותו. הסוכן שסיים אחרון דורס את העדכון של הראשון. נתונים אבדו.
אסטרטגיות פתרון
| אסטרטגיה | איך עובד | מתי להשתמש |
|---|---|---|
| Last-Write-Wins | העדכון האחרון דורס הכל | כשלא אכפת מאיבוד עדכונים (לוגים, metrics) |
| Append-Only | כל סוכן מוסיף לרשימה, לא דורס | הודעות, לוגים, תוצאות -- הגישה הנפוצה ביותר |
| Lock | סוכן נועל את ה-state, מעדכן, משחרר | עדכונים קריטיים שלא צריך לאבד |
| Per-Agent Namespace | כל סוכן כותב רק לשדות שלו | הגישה הפשוטה ביותר -- אין conflicts כלל |
from dataclasses import dataclass, field
from typing import Any
import threading
@dataclass
class MultiAgentState:
"""Thread-safe shared state with per-agent namespaces."""
# Per-agent namespaces -- no conflicts
agent_outputs: dict[str, Any] = field(default_factory=dict)
# Shared append-only log
messages: list[dict] = field(default_factory=list)
# Lock for thread-safe writes
_lock: threading.Lock = field(default_factory=threading.Lock)
def set_output(self, agent_name: str, key: str, value: Any):
"""Each agent writes to its own namespace."""
with self._lock:
if agent_name not in self.agent_outputs:
self.agent_outputs[agent_name] = {}
self.agent_outputs[agent_name][key] = value
def get_output(self, agent_name: str, key: str) -> Any:
"""Read any agent's output."""
return self.agent_outputs.get(agent_name, {}).get(key)
def add_message(self, from_agent: str, content: str):
"""Append-only message log -- never loses data."""
with self._lock:
self.messages.append({
"from": from_agent,
"content": content,
"index": len(self.messages)
})
def get_all_outputs(self) -> dict:
"""Get a snapshot of all agent outputs."""
with self._lock:
return dict(self.agent_outputs)
# Usage
state = MultiAgentState()
state.set_output("researcher", "findings", "Found 3 key trends...")
state.set_output("analyst", "recommendation", "Invest in vertical AI...")
state.add_message("researcher", "Research phase complete")
state.add_message("analyst", "Analysis phase complete")
# No conflicts -- each agent has its own namespace
print(state.get_output("researcher", "findings"))
print(state.get_all_outputs())
LangGraph פותר את בעיית ה-state בצורה אלגנטית עם reducers. במקום ש-state.messages יידרס, ה-reducer add_messages מוסיף הודעות חדשות לרשימה הקיימת. זה מונע את רוב ה-race conditions:
from langgraph.graph import StateGraph
from typing import Annotated
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # Append-only reducer
research: str
draft: str
כל node שמחזיר {"messages": [new_msg]} -- ההודעה מתווספת לרשימה, לא דורסת.
קחו את ה-Sequential Pipeline מ-Section 2 ושנו אותו כך שכל הסוכנים כותבים ל-MultiAgentState משותף (עם per-agent namespaces). הוסיפו add_message בכל שלב כדי ליצור log מלא של העבודה. הדפיסו את ה-log בסוף.
Delegation ו-Escalation
Delegation ו-Escalation הם שני דפוסים משלימים שמנהלים את זרימת העבודה בין סוכנים.
Delegation: "תעשה את זה בשבילי"
סוכן A מזהה שהוא צריך עזרה בתת-משימה, שולח אותה לסוכן B, ומחכה לתוצאה. כמו מנהל שמאציל משימה לעובד.
Escalation: "אני לא יכול לטפל בזה"
סוכן מזהה שהמשימה מעבר ליכולותיו ומעביר אותה לסוכן "חזק" יותר (model גדול יותר, יותר tools, או אדם).
from anthropic import Anthropic
client = Anthropic()
MAX_DELEGATION_DEPTH = 3 # Prevent infinite delegation chains
def specialist_agent(
name: str,
system_prompt: str,
task: str,
available_specialists: dict[str, str],
depth: int = 0
) -> str:
"""Agent that can delegate to specialists, with depth limit."""
if depth >= MAX_DELEGATION_DEPTH:
# Escalate to human instead of delegating further
return f"[ESCALATION] Task too complex after {depth} delegations. Need human review: {task}"
specialist_list = "\n".join(
f"- {k}: {v}" for k, v in available_specialists.items()
)
full_prompt = f"""{system_prompt}
You can delegate subtasks to specialists:
{specialist_list}
To delegate, respond with: DELEGATE:specialist_name:instruction
To answer directly, just respond normally.
Current delegation depth: {depth}/{MAX_DELEGATION_DEPTH}"""
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
system=full_prompt,
messages=[{"role": "user", "content": task}]
)
result = response.content[0].text
# Check for delegation
if "DELEGATE:" in result:
parts = result.split("DELEGATE:")[1].strip().split(":", 1)
target = parts[0].strip()
instruction = parts[1].strip() if len(parts) > 1 else task
if target in available_specialists:
print(f"[{name}] Delegating to {target} (depth {depth+1})")
sub_result = specialist_agent(
name=target,
system_prompt=available_specialists[target],
task=instruction,
available_specialists=available_specialists,
depth=depth + 1
)
# Agent can use the sub-result
return f"[{name}] used [{target}]'s work:\n{sub_result}"
return f"[{name}]: {result}"
# Usage
specialists = {
"coder": "You write Python code. Clean, tested, documented.",
"reviewer": "You review code for bugs, security, and best practices.",
"documenter": "You write clear technical documentation.",
}
result = specialist_agent(
name="project_lead",
system_prompt="You are a project lead. Break tasks into subtasks and delegate.",
task="Create a function to validate Israeli phone numbers with tests and docs",
available_specialists=specialists
)
print(result)
בלי depth limit, סוכן A יכול להאציל לסוכן B, שמאציל חזרה ל-A, שמאציל ל-B... לולאה אינסופית שעולה כסף. תמיד הגדירו MAX_DELEGATION_DEPTH (מומלץ: 3-5). כשמגיעים לגבול -- escalate לאדם, לא ממשיכים להאציל.
Specialist Routing Pattern
הדפוס הנפוץ ביותר: triage agent שמנתח את הבקשה ומנתב לסוכן המתמחה הנכון. ה-triage רץ על model זול (Haiku / GPT-4o-mini), והמומחים על model חזק (Sonnet / Opus). זה חוסך עלויות משמעותיות.
import { generateText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
interface Specialist {
name: string;
description: string;
systemPrompt: string;
model: string; // Different models per specialist
}
const specialists: Specialist[] = [
{
name: 'code_expert',
description: 'Programming questions, debugging, code review',
systemPrompt: 'You are an expert programmer. Write clean, tested code.',
model: 'claude-sonnet-4-20250514',
},
{
name: 'data_analyst',
description: 'Data analysis, SQL, visualization, statistics',
systemPrompt: 'You are a data analyst. Provide clear, data-driven insights.',
model: 'claude-sonnet-4-20250514',
},
{
name: 'writer',
description: 'Content writing, editing, copywriting, translation',
systemPrompt: 'You are a professional writer. Create compelling content.',
model: 'claude-haiku-4-20250514', // Cheaper model for writing
},
];
async function triageAndRoute(userQuery: string): Promise<string> {
// Step 1: Triage with cheap model
const specialistList = specialists
.map(s => `- ${s.name}: ${s.description}`)
.join('\n');
const { text: routing } = await generateText({
model: anthropic('claude-haiku-4-20250514'), // Cheap triage
system: `Route the user's request to the right specialist.
Available specialists:\n${specialistList}
Respond with just the specialist name, nothing else.`,
prompt: userQuery,
});
const selected = specialists.find(s => s.name === routing.trim());
if (!selected) {
// Fallback to first specialist
return triageAndRoute(userQuery);
}
console.log(`[Triage] Routed to: ${selected.name}`);
// Step 2: Run specialist with appropriate model
const { text: answer } = await generateText({
model: anthropic(selected.model),
system: selected.systemPrompt,
prompt: userQuery,
});
return answer;
}
כתבו 3 כללי escalation לפרויקט שלכם. למשל: "אם הסוכן נתקל בשגיאת database -- escalate ל-senior agent. אם אין תשובה אחרי 3 ניסיונות -- escalate לאדם." חשבו מה הסימנים שהסוכן צריך לזהות.
Multi-Agent בכל SDK
כל SDK מטפל ב-multi-agent אחרת. הנה איך כל אחד מממש את הדפוסים, ולמה כל אחד מצטיין:
Claude Agent SDK: Sub-Agents ו-Orchestrator
ה-Claude Agent SDK תומך ב-sub-agents דרך supportedAgents -- סוכן ראשי שיכול "לקרוא" לסוכנים אחרים כ-tools.
from claude_agent_sdk import Agent, Tool
# Define specialist agents
researcher = Agent(
name="researcher",
model="claude-sonnet-4-20250514",
system="You are a research specialist. Find facts and data.",
tools=[web_search_tool, database_tool]
)
writer = Agent(
name="writer",
model="claude-sonnet-4-20250514",
system="You are a professional writer.",
tools=[] # No tools needed -- pure generation
)
# Orchestrator agent that can delegate to sub-agents
orchestrator = Agent(
name="orchestrator",
model="claude-sonnet-4-20250514",
system="""You coordinate a team. Available sub-agents:
- researcher: finds facts and data
- writer: writes polished content
Delegate tasks as needed.""",
supported_agents=[researcher, writer]
)
# The orchestrator decides which sub-agent to call
result = orchestrator.run("Write an article about AI adoption in Israeli healthcare")
OpenAI SDK: Handoffs ו-Triage
ה-OpenAI Agents SDK מציע handoff כ-primitive מובנה. סוכן יכול להעביר שליטה לסוכן אחר עם handoff().
from agents import Agent, handoff, Runner
billing_agent = Agent(
name="Billing Specialist",
instructions="Handle billing questions. If technical, hand off to tech.",
handoffs=["technical_agent"]
)
technical_agent = Agent(
name="Technical Specialist",
instructions="Handle technical issues. If billing, hand off to billing.",
handoffs=["billing_agent"]
)
triage_agent = Agent(
name="Triage",
instructions="Route to the right specialist based on the question.",
handoffs=[billing_agent, technical_agent]
)
# Run -- the triage agent routes automatically
result = await Runner.run(triage_agent, "I was charged twice for my subscription")
print(result.final_output) # Handled by billing_agent
Google ADK: SequentialAgent, ParallelAgent, LoopAgent
Google's Agent Development Kit מציע 3 built-in multi-agent patterns כ-classes:
from google.adk.agents import (
LlmAgent, SequentialAgent, ParallelAgent, LoopAgent
)
# Define individual agents
researcher = LlmAgent(name="researcher", model="gemini-2.5-pro",
instruction="Research the given topic.")
writer = LlmAgent(name="writer", model="gemini-2.5-pro",
instruction="Write an article from the research.")
reviewer = LlmAgent(name="reviewer", model="gemini-2.5-pro",
instruction="Review and score the article 1-10.")
# Sequential: researcher -> writer -> reviewer
pipeline = SequentialAgent(
name="content_pipeline",
sub_agents=[researcher, writer, reviewer]
)
# Parallel: run multiple researchers at once
multi_research = ParallelAgent(
name="parallel_research",
sub_agents=[market_researcher, tech_researcher, user_researcher]
)
# Loop: keep refining until quality score >= 8
refinement_loop = LoopAgent(
name="refinement",
sub_agents=[writer, reviewer],
max_iterations=3,
stop_condition=lambda state: state.get("score", 0) >= 8
)
LangGraph: Sub-Graphs ו-Supervisor
LangGraph מאפשר לבנות sub-graphs -- כל סוכן הוא graph בפני עצמו, וה-orchestrator הוא graph שמפעיל sub-graphs כ-nodes.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
class TeamState(TypedDict):
messages: Annotated[list, add_messages]
next_agent: str
def supervisor_node(state: TeamState) -> dict:
"""Supervisor decides which worker to call next."""
# Call LLM to decide routing
response = llm.invoke(
f"Given the conversation, who should act next? "
f"Options: researcher, writer, reviewer, FINISH"
)
return {"next_agent": response.content}
def researcher_node(state: TeamState) -> dict:
result = llm.invoke("Research: " + state["messages"][-1].content)
return {"messages": [{"role": "assistant", "content": f"[Research] {result.content}"}]}
def writer_node(state: TeamState) -> dict:
result = llm.invoke("Write article from: " + state["messages"][-1].content)
return {"messages": [{"role": "assistant", "content": f"[Writer] {result.content}"}]}
def route(state: TeamState) -> Literal["researcher", "writer", "reviewer", "__end__"]:
if state["next_agent"] == "FINISH":
return "__end__"
return state["next_agent"]
# Build the graph
graph = StateGraph(TeamState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)
graph.set_entry_point("supervisor")
graph.add_conditional_edges("supervisor", route)
# Workers always return to supervisor
for worker in ["researcher", "writer", "reviewer"]:
graph.add_edge(worker, "supervisor")
app = graph.compile()
CrewAI: Crews עם Process Types
CrewAI מציע את ה-API הכי "מוכן" ל-multi-agent. מגדירים agents, tasks, ו-crew עם process type.
from crewai import Agent, Task, Crew, Process
researcher = Agent(
role="Senior Researcher",
goal="Find comprehensive data about the topic",
backstory="Expert researcher with 10 years of experience",
llm="claude-sonnet-4-20250514",
allow_delegation=True
)
writer = Agent(
role="Content Writer",
goal="Write engaging, accurate articles",
backstory="Award-winning journalist",
llm="claude-sonnet-4-20250514"
)
editor = Agent(
role="Editor",
goal="Ensure quality, accuracy, and clarity",
backstory="Veteran editor at a major publication",
llm="claude-sonnet-4-20250514"
)
research_task = Task(
description="Research AI agent adoption in Israeli tech companies",
agent=researcher,
expected_output="Structured research brief with facts and sources"
)
write_task = Task(
description="Write a 1000-word article based on the research",
agent=writer,
expected_output="Polished article ready for review"
)
edit_task = Task(
description="Review and polish the article",
agent=editor,
expected_output="Publication-ready article"
)
# Hierarchical process: a manager agent coordinates
crew = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, write_task, edit_task],
process=Process.hierarchical,
manager_llm="claude-sonnet-4-20250514"
)
result = crew.kickoff()
השוואה: איזה SDK למה
| SDK | חוזקה ב-Multi-Agent | חולשה | הכי טוב ל- |
|---|---|---|---|
| Claude Agent SDK | Sub-agents נקיים, deep Anthropic integration | Ecosystem צעיר | Anthropic-first projects |
| Vercel AI SDK | TypeScript native, multi-step per agent | אין handoff מובנה | Web apps, Next.js |
| OpenAI SDK | Handoff primitive מובנה, guardrails | OpenAI lock-in | Swarm/triage patterns |
| Google ADK | Built-in Sequential/Parallel/Loop | חדש, documentation limited | Google Cloud ecosystem |
| LangGraph | Graphs, sub-graphs, state management | Learning curve תלול | Complex workflows, production |
| CrewAI | Highest abstraction, crew metaphor | Less control, "magic" | Rapid prototyping, teams |
בחרו את ה-SDK שאתם הכי מכירים (מפרקים 5-11). כתבו pseudo-code של 10 שורות שמתאר מערכת multi-agent לפרויקט שלכם באותו SDK. מה ה-orchestration pattern? כמה סוכנים? מה כל אחד עושה?
Debugging מערכות Multi-Agent
דיבוג סוכן בודד קשה. דיבוג 5 סוכנים שעובדים יחד -- זה סדר גודל יותר קשה. הבעיה המרכזית: visibility. מה קרה? מי החליט מה? למה הפלט שגוי?
Distributed Tracing עם Correlation IDs
הכלי הכי חשוב ב-debugging multi-agent: correlation ID. מזהה ייחודי שעובר בין כל הסוכנים ב-run אחד.
import uuid
import time
import json
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TraceEvent:
correlation_id: str
agent_name: str
event_type: str # "start", "llm_call", "tool_call", "handoff", "error", "end"
timestamp: float
data: dict = field(default_factory=dict)
duration_ms: float = 0
class MultiAgentTracer:
"""Distributed tracing for multi-agent systems."""
def __init__(self):
self.events: list[TraceEvent] = []
def start_run(self) -> str:
correlation_id = str(uuid.uuid4())[:8]
self.log(correlation_id, "system", "run_start", {})
return correlation_id
def log(self, correlation_id: str, agent: str, event_type: str, data: dict):
event = TraceEvent(
correlation_id=correlation_id,
agent_name=agent,
event_type=event_type,
timestamp=time.time(),
data=data
)
self.events.append(event)
# Real-time logging
print(f"[{correlation_id}][{agent}] {event_type}: "
f"{json.dumps(data, ensure_ascii=False)[:100]}")
def trace_agent(self, correlation_id: str, agent_name: str):
"""Context manager for tracing an agent's execution."""
class AgentTrace:
def __init__(self, tracer, cid, name):
self.tracer = tracer
self.cid = cid
self.name = name
self.start_time = None
def __enter__(self):
self.start_time = time.time()
self.tracer.log(self.cid, self.name, "start", {})
return self
def __exit__(self, exc_type, exc_val, exc_tb):
duration = (time.time() - self.start_time) * 1000
if exc_type:
self.tracer.log(self.cid, self.name, "error",
{"error": str(exc_val), "duration_ms": duration})
else:
self.tracer.log(self.cid, self.name, "end",
{"duration_ms": duration})
return AgentTrace(self, correlation_id, agent_name)
def get_timeline(self, correlation_id: str) -> list[dict]:
"""Get ordered timeline of events for a run."""
events = [e for e in self.events if e.correlation_id == correlation_id]
events.sort(key=lambda e: e.timestamp)
return [
{"agent": e.agent_name, "event": e.event_type,
"data": e.data, "time": e.timestamp}
for e in events
]
# Usage in a multi-agent pipeline
tracer = MultiAgentTracer()
cid = tracer.start_run()
with tracer.trace_agent(cid, "researcher"):
tracer.log(cid, "researcher", "llm_call", {"model": "sonnet", "tokens": 1500})
research = "..." # actual LLM call
with tracer.trace_agent(cid, "writer"):
tracer.log(cid, "writer", "llm_call", {"model": "sonnet", "tokens": 2000})
draft = "..." # actual LLM call
# View the full timeline
for event in tracer.get_timeline(cid):
print(f" {event['agent']}: {event['event']}")
כשלונות נפוצים
| כשל | סימפטום | פתרון |
|---|---|---|
| Infinite Loop | סוכן A מאציל ל-B, B חזרה ל-A, חוזר ומחזר | Depth limit + cycle detection |
| Wrong Agent Selected | שאלת billing הגיעה ל-technical agent | שפרו את prompt ה-triage + הוסיפו test cases |
| Context Lost in Handoff | הסוכן השני לא יודע מה קרה לפני | העבירו summary + key facts בכל handoff |
| State Corruption | תוצאות לא הגיוניות, נתונים דרוסים | Per-agent namespaces + append-only logs |
| Silent Failure | סוכן נכשל אבל אף אחד לא שם לב | Health checks + error propagation |
הוסיפו את ה-MultiAgentTracer ל-Supervisor Pattern מ-Section 2. הריצו ואז בדקו את ה-timeline. כמה LLM calls נעשו? כמה זמן כל סוכן עבד? האם יש שלב שלקח הרבה יותר מהאחרים?
ניהול עלויות ב-Multi-Agent
הבעיה המיידית ביותר עם multi-agent: N סוכנים = בערך N כפול העלות. Supervisor שמפעיל 4 workers עם 3 iterations = 13 LLM calls ב-run אחד. בלי ניהול עלויות, הפרויקט ישרוף תקציב תוך ימים.
אסטרטגיות לחיסכון
| אסטרטגיה | חיסכון צפוי | איך |
|---|---|---|
| Model tiering | 50-80% | Haiku לניתוב ($0.25/1M), Sonnet ליצירה ($3/1M), Opus רק כשחייבים ($15/1M) |
| Context trimming | 20-40% | העבירו רק summary בין סוכנים, לא את כל ה-context |
| Caching | 30-60% | אם סוכן B שואל את אותה שאלה פעמיים -- cache את התשובה |
| Early termination | 10-30% | אם ה-supervisor רואה תשובה טובה -- עצרו, לא ממשיכים iterations |
| Per-run budget | 100% (cap) | הגדירו תקציב מקסימלי לכל run. עצרו כשמגיעים לגבול |
Multi-Agent Cost Estimator
from dataclasses import dataclass
@dataclass
class AgentCostConfig:
name: str
model: str
avg_input_tokens: int
avg_output_tokens: int
calls_per_run: float # Average number of calls per run
# Pricing per 1M tokens (March 2026)
MODEL_PRICING = {
"claude-opus-4": {"input": 15.0, "output": 75.0},
"claude-sonnet-4": {"input": 3.0, "output": 15.0},
"claude-haiku-4": {"input": 0.25, "output": 1.25},
"gpt-5": {"input": 2.50, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gemini-2.5-pro": {"input": 1.25, "output": 5.0},
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
}
def estimate_run_cost(agents: list[AgentCostConfig]) -> dict:
"""Estimate total cost for one multi-agent run."""
total = 0
breakdown = []
for agent in agents:
pricing = MODEL_PRICING[agent.model]
input_cost = (agent.avg_input_tokens / 1_000_000) * pricing["input"]
output_cost = (agent.avg_output_tokens / 1_000_000) * pricing["output"]
agent_cost = (input_cost + output_cost) * agent.calls_per_run
breakdown.append({
"agent": agent.name,
"model": agent.model,
"cost_per_call": f"${input_cost + output_cost:.4f}",
"calls": agent.calls_per_run,
"total": f"${agent_cost:.4f}"
})
total += agent_cost
return {
"total_per_run": f"${total:.4f}",
"total_per_1000_runs": f"${total * 1000:.2f}",
"total_per_day_100_runs": f"${total * 100:.2f}",
"breakdown": breakdown
}
# Example: Content Pipeline with 4 agents
pipeline_agents = [
AgentCostConfig("triage", "claude-haiku-4", 500, 100, 1),
AgentCostConfig("researcher", "claude-sonnet-4", 2000, 1500, 1),
AgentCostConfig("writer", "claude-sonnet-4", 3000, 2000, 1),
AgentCostConfig("editor", "claude-sonnet-4", 4000, 2000, 1),
]
estimate = estimate_run_cost(pipeline_agents)
print(f"Cost per run: {estimate['total_per_run']}")
print(f"Cost per 1000 runs: {estimate['total_per_1000_runs']}")
for item in estimate["breakdown"]:
print(f" {item['agent']} ({item['model']}): {item['total']}")
Pipeline של 4 סוכנים על Sonnet 4 עם triage על Haiku: ~$0.04 per run. עם 1,000 runs ביום = ~$40/day. אם מחליפים את ה-triage ל-rule-based (ללא LLM) ומוסיפים cache: ~$25/day. Model tiering (Haiku לכתיבה, Sonnet לניתוח) יכול להוריד ל-~$15/day. הפער בין תכנון חכם לנאיבי הוא 3x-5x בעלות.
Supervisor pattern עם max_steps=10 ו-4 workers יכול להגיע ל-40+ LLM calls ב-run אחד. אם ה-supervisor "מתבלבל" ולא מסיים, אתם משלמים על loops חסרי תועלת. תמיד הגדירו max_steps נמוך (3-5 לרוב ה-use cases) ו-per-run budget כ-safety net.
ארכיטקטורות מהעולם האמיתי
איך חברות אמיתיות בונות מערכות multi-agent? הנה 4 ארכיטקטורות מוכחות:
1. Customer Support Pipeline
הארכיטקטורה הנפוצה ביותר -- Swarm + Escalation:
| שלב | סוכן | Model | תפקיד |
|---|---|---|---|
| 1 | Triage Agent | Haiku (זול) | מנתח את הבקשה, מנתב למחלקה |
| 2 | Specialist Agent | Sonnet | מטפל בבקשה עם tools ספציפיים (billing, tech, sales) |
| 3 | Quality Agent | Haiku | בודק שהתשובה מדויקת ומלאה |
| 4 | Escalation Agent | Opus / Human | מטפל בבקשות מורכבות שה-specialist לא פתר |
2. Content Pipeline
Sequential + Parallel -- research ב-parallel, כתיבה ועריכה ב-sequence:
| שלב | דפוס | סוכנים |
|---|---|---|
| 1. Research | Parallel fan-out | 3 researchers (market, tech, user) במקביל |
| 2. Outline | Sequential | Synthesizer יוצר outline מ-3 המחקרים |
| 3. Write | Sequential | Writer כותב לפי ה-outline |
| 4. Edit + SEO | Parallel | Editor + SEO optimizer במקביל |
| 5. Publish | Sequential | Publisher agent מפרסם את הגרסה הסופית |
3. Software Development Team
Supervisor pattern -- PM כ-supervisor, developers כ-workers:
| סוכן | תפקיד | Tools |
|---|---|---|
| PM (Supervisor) | מפרק requirements לtasks, מקצה, עוקב | task_manager, status_tracker |
| Architect | מתכנן design, API schemas, DB schemas | design_doc_writer |
| Developer | כותב קוד לפי ה-design | code_writer, file_manager |
| Reviewer | בודק קוד, מוצא באגים, מציע שיפורים | code_reader, lint_runner |
| Tester | כותב tests, מריץ, מדווח על כשלונות | test_runner, coverage_checker |
4. Financial Analysis
Parallel + Debate -- אנליסטים במקביל, מתווכחים על המסקנות:
| שלב | דפוס | סוכנים |
|---|---|---|
| 1. Data | Parallel | 3 data collectors (market data, filings, news) במקביל |
| 2. Analysis | Parallel | Fundamental analyst + Technical analyst במקביל |
| 3. Debate | Debate | Bull case vs Bear case -- 3 rounds |
| 4. Report | Sequential | Report writer מסנתז הכל לדוח |
הקשר ישראלי: Multi-Agent בהייטק ובסייבר
התעשייה הישראלית אימצה multi-agent patterns בצורה מהירה, במיוחד בשני תחומים:
Cybersecurity (סייבר): חברות כמו SentinelOne ו-Wiz משתמשות בארכיטקטורות multi-agent לאיתור איומים. סוכן אחד מנטר לוגים, שני מנתח anomalies, שלישי מוודא false positives, רביעי מייצר דוח threat. הדפוס הנפוץ: Parallel fan-out לניטור + Escalation כשמתגלה איום אמיתי.
Defense Tech: מערכות C4I (Command, Control, Communications, Computers, Intelligence) בתעשייה הביטחונית הישראלית משתמשות בגישות multi-agent לשילוב מידע ממקורות רבים. כל "sensor" הוא סוכן שמנתח את הדאטה שלו, ו-fusion agent מאחד את התמונה. הדפוס: Blackboard pattern עם shared situational awareness.
בחרו אחת מ-4 הארכיטקטורות למעלה ותכננו גרסה מותאמת לפרויקט שלכם. ציירו דיאגרמה (על נייר או בכלי ציור) שמראה: אילו סוכנים, מה כל אחד עושה, מה דפוס התזמור, ואיפה ה-handoffs.
טעויות נפוצות -- ואיך להימנע מהן
מה קורה: מפתח קורא על multi-agent, מתלהב, ובונה מערכת עם 6 סוכנים למשימה שסוכן אחד היה מסתדר איתה.
למה זה בעיה: Complexity, עלות, ודיבוג מיותרים. Single-agent פותר 80% מה-use cases.
הפתרון: התחילו תמיד עם סוכן אחד. עברו ל-multi-agent רק כשהסוכן הבודד מגיע לגבולות שלו -- prompts ארוכים מדי, יותר מ-5 tools, או כישורים שונים מהותית.
מה קורה: כל סוכן מעביר את כל ה-context (כל ההודעות, כל ה-tool results) לסוכן הבא.
למה זה בעיה: Context windows מתמלאים, עלויות מתנפחות, והסוכן מתבלבל ב-noise.
הפתרון: העבירו summary ממוקד -- מה נעשה, מה התוצאה, מה נשאר. 200-500 tokens מספיקים ברוב המקרים.
מה קורה: סוכן A מאציל ל-B, שמאציל ל-C, שמאציל חזרה ל-A. לולאה אינסופית.
למה זה בעיה: עלויות לא מבוקרות, latency, ולפעמים crash.
הפתרון: תמיד הגדירו MAX_DELEGATION_DEPTH (3-5). הוסיפו cycle detection. כשמגיעים לגבול -- escalate לאדם.
מה קורה: כל 5 הסוכנים רצים על Opus/Sonnet, כולל triage פשוט וlogging.
למה זה בעיה: עלות מופרזת. Triage agent לא צריך את הכוח של Opus.
הפתרון: Model tiering: Haiku/Flash לניתוב ו-QA, Sonnet ליצירה, Opus רק לניתוח מורכב. זה חוסך 50-80%.
מה קורה: המערכת מייצרת פלט שגוי ואין לכם מושג איפה הבעיה.
למה זה בעיה: בלי tracing, דיבוג multi-agent הוא guesswork. כל agent בודד נראה "בסדר" אבל התמונה הכוללת שבורה.
הפתרון: הוסיפו Correlation ID מהיום הראשון. לוגו כל decision, כל handoff, כל LLM call. השקיעו 30 דקות בtracing עכשיו, תחסכו שעות של דיבוג אחר כך.
מה קורה: בונים את כל המערכת, מריצים end-to-end, ומגלים שמשהו שבור. אבל מה?
למה זה בעיה: כשכל הסוכנים רצים יחד, קשה לבודד את הבעיה.
הפתרון: בדקו כל סוכן בנפרד עם test cases ייעודיים לפני שמחברים אותם. Unit tests לסוכנים, integration tests למערכת.
| תדירות | משימה | זמן |
|---|---|---|
| יומי | בדקו traces -- האם יש runs עם יותר מ-max_steps צפוי? Infinite loops? Agent שנבחר לא נכון? | 5 דק' |
| יומי | בדקו עלויות -- כמה עלה כל run בממוצע? יש חריגות? | 3 דק' |
| שבועי | סקרו routing accuracy -- כמה אחוז מהבקשות הגיעו ל-agent הנכון? | 10 דק' |
| שבועי | בדקו handoff quality -- האם ה-context שעובר בין agents מספיק? חסר מידע? | 10 דק' |
| חודשי | Re-evaluate architecture -- האם הדפוס הנוכחי עדיין מתאים? צריך להוסיף/להסיר agents? | 30 דק' |
| חודשי | Model tiering review -- האם יש agents שאפשר להעביר ל-model זול יותר ללא ירידה באיכות? | 15 דק' |
בנו Sequential Pipeline עם 3 סוכנים: Researcher, Writer, Editor. הריצו על נושא שמעניין אתכם. ואז הוסיפו Debate -- שימו סוכן Critic לפני ה-Editor שמבקר את הטיוטה. השוו את הפלט עם ובלי ה-Critic. ההבדל באיכות יראה לכם למה multi-agent שווה את ה-complexity.
תרגילים
בנו את אותה משימה (כתיבת מאמר על נושא) ב-3 דפוסי תזמור שונים:
- גרסה A: Sequential Pipeline (research -> write -> edit)
- גרסה B: Supervisor pattern (supervisor מנהל 3 workers)
- גרסה C: Debate pattern (writer + critic, 3 rounds + judge)
לכל גרסה מדדו:
- איכות הפלט (דרגו 1-10)
- מספר LLM calls
- עלות משוערת (בעזרת ה-Cost Estimator)
- Latency (זמן כולל)
Bonus: בנו גרסה D שמשלבת Parallel fan-out (3 researchers) + Sequential (write + edit).
בנו מערכת שירות לקוחות עם Swarm pattern:
- צרו Triage agent שמנתב לפי סוג הבקשה
- צרו 3 specialist agents לפחות (billing, technical, general)
- הוסיפו Escalation לאדם כשהסוכן לא בטוח
- הוסיפו Tracing עם Correlation ID
- בדקו עם 10 בקשות שונות -- כמה נותבו נכון?
Advanced: הוסיפו shared state שזוכר היסטוריית לקוח (מפרק 12).
קחו את ה-Supervisor Pattern מ-Section 2 והכניסו 3 באגים מכוונים:
- שנו את prompt ה-supervisor כך שלפעמים הוא מאציל ל-worker שלא קיים
- הוסיפו "infinite delegation" -- worker שמאציל חזרה ל-supervisor ללא תנאי עצירה
- הסירו את truncation של תוצאות (שלחו full context בין agents)
ואז תקנו -- בעזרת ה-Tracer. כמה זמן לוקח לכם לזהות כל באג?
בנו את ה-Deliverable הסופי של הפרק -- מערכת multi-agent מלאה:
- 4+ סוכנים עם תפקידים ברורים
- Orchestration pattern -- בחרו את המתאים (sequential, supervisor, swarm, או hybrid)
- Shared state -- כל הסוכנים ניגשים ל-state משותף עם per-agent namespaces
- Delegation -- סוכן אחד לפחות שמאציל תת-משימות, עם depth limit
- Tracing -- Correlation ID, timeline, cost tracking
- Model tiering -- לפחות 2 models שונים (Haiku + Sonnet, או דומה)
הריצו את המערכת עם 3 בקשות שונות. הציגו: timeline, cost breakdown, ותוצאות.
זה הבסיס של הפרויקט הסופי. בפרקים הבאים תוסיפו Human-in-the-Loop (14), Guardrails (14), ו-Deployment (15).
- מה ההבדל בין Sequential Pipeline ל-Supervisor Pattern? מתי תבחרו כל אחד? (רמז: deterministic vs dynamic routing)
- תארו את 4 דפוסי התקשורת בין סוכנים. מה ה-tradeoff בין coupling ל-debugging ease? (רמז: shared state = easy debug, high coupling)
- למה חשוב לשים depth limit על delegation? מה קורה בלעדיו? תנו דוגמה מספרית לעלות. (רמז: 10 delegations x $0.01 per call = ...)
- מה model tiering ואיך הוא חוסך 50-80% בעלויות? תנו דוגמה עם 3 סוכנים. (רמז: Haiku לניתוב, Sonnet ליצירה, Opus לניתוח)
- בנו בראש ארכיטקטורה multi-agent למערכת שמנתחת חדשות: כמה סוכנים, מה כל אחד, מה הדפוס? (רמז: parallel collection + debate on analysis + sequential report)
עברתם 4 מתוך 5? מצוין -- אתם מוכנים לפרק 14.
בפרק הזה עברתם מסוכן בודד למערכת של סוכנים שעובדים יחד. התחלתם עם ה-Multi-Agent Decision Matrix -- שעוזר להחליט מתי באמת צריך multi-agent ומתי סוכן אחד מספיק. למדתם 5 דפוסי תזמור: Sequential Pipeline (פשוט, צפוי), Parallel Fan-Out (מהיר, עצמאי), Supervisor (גמיש, דינמי), Swarm/Handoff (דצנטרלי, ניתוב), ו-Debate (מדויק, anti-hallucination). הבנתם 4 דפוסי תקשורת בין סוכנים -- Shared State, Message Passing, Blackboard, ו-Event-Driven -- ומתי להשתמש בכל אחד. בניתם Shared State Management עם per-agent namespaces, append-only logs, ו-thread safety. למדתם Delegation ו-Escalation עם depth limits ו-specialist routing. ראיתם איך 6 SDKs שונים מממשים multi-agent -- כל אחד עם הגישה שלו. בניתם Distributed Tracing עם Correlation IDs לדיבוג. תכננתם עלויות עם ה-Multi-Agent Cost Estimator ולמדתם model tiering שחוסך 50-80%. וראיתם 4 ארכיטקטורות מהעולם האמיתי -- customer support, content pipeline, software dev, financial analysis -- כולל ההקשר הישראלי בסייבר ו-defense tech.
הנקודה המרכזית: multi-agent הוא כלי עוצמתי, אבל לא תמיד הכלי הנכון. התחילו פשוט, עברו ל-multi-agent רק כשצריך, ותמיד מדדו -- cost, quality, latency.
בפרק הבא (פרק 14) תוסיפו את השכבה הקריטית: Human-in-the-Loop -- איך מוודאים שאדם שומר על בקרה כשהסוכנים מקבלים החלטות.
צ'קליסט -- סיכום פרק 13
- מבין/ה מתי להשתמש ב-multi-agent ומתי סוכן אחד מספיק -- ה-Decision Matrix
- יודע/ת לבנות Sequential Pipeline -- פשוט, צפוי, קל לדיבוג
- יודע/ת לבנות Parallel Fan-Out -- async/await, Promise.all, aggregation
- יודע/ת לבנות Supervisor Pattern -- manager דינמי שמאציל ל-workers
- יודע/ת לבנות Swarm / Handoff -- ניתוב דצנטרלי בין סוכנים
- יודע/ת לבנות Debate Pattern -- writer + critic + judge להפחתת hallucinations
- מבין/ה 4 דפוסי תקשורת -- Shared State, Message Passing, Blackboard, Event-Driven
- יודע/ת לנהל Shared State -- per-agent namespaces, append-only logs, thread safety
- מבין/ה Delegation ו-Escalation -- depth limits, specialist routing, cycle detection
- מכיר/ה multi-agent ב-6 SDKs -- Claude, OpenAI, Vercel, Google ADK, LangGraph, CrewAI
- יודע/ת לעשות Distributed Tracing -- Correlation IDs, timeline, error tracking
- יודע/ת לחשב ולנהל עלויות multi-agent -- Cost Estimator, model tiering, budgets
- מכיר/ה 4 ארכיטקטורות מהעולם האמיתי -- support, content, dev, finance
- מבין/ה את ההקשר הישראלי -- multi-agent בסייבר ו-defense tech
- בנית מערכת multi-agent מלאה עם 4+ סוכנים, shared state, delegation, ו-tracing