#!/usr/bin/env python3
"""
ORCHESTRATOR MCP SERVER v2.1
=============================
Model Context Protocol server for Claude Code Orchestrator Plugin.
Provides automatic orchestration capabilities - ALWAYS ON, like Serena.
This server exposes orchestrator tools that are automatically available
in every Claude Code session without requiring explicit activation.
Author: LeoDg
Version: 2.1.0-MCP
"""
import asyncio
import json
import logging
import os
import sys
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Sequence
from dataclasses import dataclass, asdict
# MCP imports
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
# =============================================================================
# CONFIGURATION
# =============================================================================
PLUGIN_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONFIG_DIR = os.path.join(PLUGIN_DIR, "config")
AGENTS_REGISTRY = os.path.join(CONFIG_DIR, "agent-registry.json")
KEYWORD_MAPPINGS = os.path.join(CONFIG_DIR, "keyword-mappings.json")
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("orchestrator-mcp")
# =============================================================================
# TYPES & ENUMS
# =============================================================================
class ModelType(str, Enum):
HAIKU = "haiku"
SONNET = "sonnet"
OPUS = "opus"
AUTO = "auto"
class TaskPriority(str, Enum):
CRITICAL = "CRITICA"
HIGH = "ALTA"
MEDIUM = "MEDIA"
LOW = "BASSA"
class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class AgentTask:
"""Single agent task definition"""
id: str
description: str
agent_expert_file: str
model: str
specialization: str
dependencies: List[str]
priority: str
level: int
estimated_time: float
estimated_cost: float
@dataclass
class ExecutionPlan:
"""Complete execution plan for orchestration"""
session_id: str
tasks: List[AgentTask]
parallel_batches: List[List[str]]
total_agents: int
estimated_time: float
estimated_cost: float
complexity: str
domains: List[str]
@dataclass
class OrchestrationSession:
"""Active orchestration session"""
session_id: str
user_request: str
status: TaskStatus
plan: Optional[ExecutionPlan]
started_at: datetime
completed_at: Optional[datetime]
results: List[Dict[str, Any]]
# =============================================================================
# KEYWORD MAPPINGS (from orchestrator-core.ts)
# =============================================================================
KEYWORD_TO_EXPERT_MAPPING = {
# GUI Domain
'gui': 'experts/gui-super-expert.md',
'pyqt5': 'experts/gui-super-expert.md',
'qt': 'experts/gui-super-expert.md',
'tab': 'experts/gui-super-expert.md',
'widget': 'experts/gui-super-expert.md',
'dialog': 'experts/gui-super-expert.md',
'layout': 'experts/gui-super-expert.md',
'ui': 'experts/gui-super-expert.md',
'interface': 'experts/gui-super-expert.md',
# Database Domain
'database': 'experts/database_expert.md',
'sql': 'experts/database_expert.md',
'sqlite': 'experts/database_expert.md',
'postgresql': 'experts/database_expert.md',
'query': 'experts/database_expert.md',
'schema': 'experts/database_expert.md',
'migration': 'experts/database_expert.md',
# Security Domain
'security': 'experts/security_unified_expert.md',
'auth': 'experts/security_unified_expert.md',
'authentication': 'experts/security_unified_expert.md',
'encryption': 'experts/security_unified_expert.md',
'jwt': 'experts/security_unified_expert.md',
'password': 'experts/security_unified_expert.md',
# API Integration
'api': 'experts/integration_expert.md',
'telegram': 'experts/integration_expert.md',
'ctrader': 'experts/integration_expert.md',
'rest': 'experts/integration_expert.md',
'webhook': 'experts/integration_expert.md',
'integration': 'experts/integration_expert.md',
# MQL Domain
'mql': 'experts/mql_expert.md',
'mql5': 'experts/mql_expert.md',
'mql4': 'experts/mql_expert.md',
'ea': 'experts/mql_expert.md',
'metatrader': 'experts/mql_expert.md',
# Trading Domain
'trading': 'experts/trading_strategy_expert.md',
# Architecture Domain
'architettura': 'experts/architect_expert.md',
'architecture': 'experts/architect_expert.md',
'design pattern': 'experts/architect_expert.md',
'refactor': 'experts/architect_expert.md',
# Testing & Debug
'test': 'experts/tester_expert.md',
'debug': 'experts/tester_expert.md',
'bug': 'experts/tester_expert.md',
'fix': 'experts/tester_expert.md',
# DevOps
'devops': 'experts/devops_expert.md',
'deploy': 'experts/devops_expert.md',
'docker': 'experts/devops_expert.md',
'git': 'experts/devops_expert.md',
# Languages
'python': 'experts/languages_expert.md',
'javascript': 'experts/languages_expert.md',
'c#': 'experts/languages_expert.md',
'coding': 'experts/languages_expert.md',
# Core Functions
'cerca': 'core/analyzer.md',
'trova': 'core/analyzer.md',
'esplora': 'core/analyzer.md',
'analizza': 'core/analyzer.md',
'implementa': 'core/coder.md',
'feature': 'core/coder.md',
'codifica': 'core/coder.md',
'review': 'core/reviewer.md',
'valida': 'core/reviewer.md',
'documenta': 'core/documenter.md',
'docs': 'core/documenter.md',
}
EXPERT_TO_MODEL_MAPPING = {
'experts/gui-super-expert.md': 'sonnet',
'experts/database_expert.md': 'sonnet',
'experts/security_unified_expert.md': 'sonnet',
'experts/integration_expert.md': 'sonnet',
'experts/mql_expert.md': 'sonnet',
'experts/trading_strategy_expert.md': 'sonnet',
'experts/architect_expert.md': 'opus',
'experts/tester_expert.md': 'sonnet',
'experts/devops_expert.md': 'haiku',
'experts/languages_expert.md': 'sonnet',
'core/analyzer.md': 'haiku',
'core/coder.md': 'sonnet',
'core/reviewer.md': 'sonnet',
'core/documenter.md': 'haiku',
}
EXPERT_TO_PRIORITY_MAPPING = {
'experts/security_unified_expert.md': 'CRITICA',
'core/documenter.md': 'CRITICA',
'experts/gui-super-expert.md': 'ALTA',
'experts/database_expert.md': 'ALTA',
'experts/integration_expert.md': 'ALTA',
'experts/mql_expert.md': 'ALTA',
'experts/architect_expert.md': 'ALTA',
'experts/tester_expert.md': 'ALTA',
'core/analyzer.md': 'ALTA',
'core/coder.md': 'MEDIA',
'core/reviewer.md': 'MEDIA',
'experts/devops_expert.md': 'MEDIA',
}
SPECIALIZATION_DESCRIPTIONS = {
'experts/gui-super-expert.md': 'PyQt5, Qt, UI, Widget, Tab, Dialog, Layout',
'experts/database_expert.md': 'SQLite, PostgreSQL, Schema, Query, Migration',
'experts/security_unified_expert.md': 'Security, Encryption, Auth, JWT, OWASP',
'experts/integration_expert.md': 'API, Telegram, cTrader, REST, Webhook',
'experts/mql_expert.md': 'MQL5, MQL4, MetaTrader, EA',
'experts/trading_strategy_expert.md': 'Trading, Risk Management, Position Sizing',
'experts/architect_expert.md': 'Architettura, Design Pattern, Microservizi',
'experts/tester_expert.md': 'Testing, QA, Debug, Performance',
'experts/devops_expert.md': 'DevOps, CI/CD, Deploy, Docker',
'core/coder.md': 'Coding generale, implementazione feature',
'core/analyzer.md': 'Analisi codice, ricerca, esplorazione',
'core/reviewer.md': 'Code review, validazione, quality check',
'core/documenter.md': 'Documentation, technical writing, README',
}
# =============================================================================
# ORCHESTRATOR ENGINE
# =============================================================================
class OrchestratorEngine:
"""Core orchestration engine - ported from TypeScript"""
def __init__(self):
self.sessions: Dict[str, OrchestrationSession] = {}
logger.info("Orchestrator Engine initialized")
def analyze_request(self, user_request: str) -> Dict[str, Any]:
"""Analyze user request and extract keywords/domains"""
request_lower = user_request.lower()
found_keywords = []
found_domains = set()
for keyword, expert_file in KEYWORD_TO_EXPERT_MAPPING.items():
if keyword in request_lower:
found_keywords.append(keyword)
if 'gui' in expert_file:
found_domains.add('GUI')
elif 'database' in expert_file:
found_domains.add('Database')
elif 'security' in expert_file:
found_domains.add('Security')
elif 'integration' in expert_file:
found_domains.add('API')
elif 'mql' in expert_file:
found_domains.add('MQL')
elif 'trading' in expert_file:
found_domains.add('Trading')
elif 'architect' in expert_file:
found_domains.add('Architecture')
elif 'tester' in expert_file:
found_domains.add('Testing')
elif 'devops' in expert_file:
found_domains.add('DevOps')
# Determine complexity
domain_count = len(found_domains)
word_count = len(user_request.split())
if domain_count <= 1 and word_count <= 10:
complexity = "bassa"
elif domain_count <= 2 and word_count <= 20:
complexity = "media"
else:
complexity = "alta"
return {
"keywords": found_keywords,
"domains": list(found_domains),
"complexity": complexity,
"is_multi_domain": domain_count > 1,
"word_count": word_count
}
def generate_execution_plan(self, user_request: str) -> ExecutionPlan:
"""Generate complete execution plan for orchestration"""
session_id = str(uuid.uuid4())[:8]
analysis = self.analyze_request(user_request)
# Generate tasks from keywords
tasks = []
used_experts = set()
task_counter = 1
for keyword in analysis["keywords"]:
expert_file = KEYWORD_TO_EXPERT_MAPPING.get(keyword)
if expert_file and expert_file not in used_experts:
used_experts.add(expert_file)
model = EXPERT_TO_MODEL_MAPPING.get(expert_file, 'sonnet')
priority = EXPERT_TO_PRIORITY_MAPPING.get(expert_file, 'MEDIA')
specialization = SPECIALIZATION_DESCRIPTIONS.get(
expert_file, 'Specializzazione generale'
)
task = AgentTask(
id=f"T{task_counter}",
description=f"Work on {keyword} for: {user_request}",
agent_expert_file=expert_file,
model=model,
specialization=specialization,
dependencies=[],
priority=priority,
level=1,
estimated_time=2.5,
estimated_cost=0.25 if model == 'opus' else 0.08 if model == 'sonnet' else 0.02
)
tasks.append(task)
task_counter += 1
# Fallback if no tasks
if not tasks:
tasks.append(AgentTask(
id="T1",
description=f"Implement: {user_request}",
agent_expert_file="core/coder.md",
model="sonnet",
specialization="Coding generale",
dependencies=[],
priority="MEDIA",
level=1,
estimated_time=2.5,
estimated_cost=0.08
))
task_counter = 2
# Always add documenter as final task (RULE #5)
documenter_deps = [t.id for t in tasks]
tasks.append(AgentTask(
id=f"T{task_counter}",
description="Document all changes (RULE #5)",
agent_expert_file="core/documenter.md",
model="haiku",
specialization="Documentation, technical writing",
dependencies=documenter_deps,
priority="CRITICA",
level=1,
estimated_time=1.0,
estimated_cost=0.02
))
# Calculate parallel batches
work_tasks = [t for t in tasks if "documenter" not in t.agent_expert_file]
parallel_batches = [[t.id for t in work_tasks]]
total_time = sum(t.estimated_time for t in work_tasks) / len(work_tasks) + 1.0
total_cost = sum(t.estimated_cost for t in tasks)
plan = ExecutionPlan(
session_id=session_id,
tasks=tasks,
parallel_batches=parallel_batches,
total_agents=len(tasks),
estimated_time=total_time,
estimated_cost=total_cost,
complexity=analysis["complexity"],
domains=analysis["domains"]
)
# Create session
self.sessions[session_id] = OrchestrationSession(
session_id=session_id,
user_request=user_request,
status=TaskStatus.PENDING,
plan=plan,
started_at=datetime.now(),
completed_at=None,
results=[]
)
return plan
def format_plan_table(self, plan: ExecutionPlan) -> str:
"""Format execution plan as table"""
lines = [
"π― ORCHESTRATOR v2.1 - MCP MODE (ALWAYS ON)",
"",
"π EXECUTION PLAN",
f"ββ Session ID: {plan.session_id}",
f"ββ Domains: {', '.join(plan.domains) if plan.domains else 'General'}",
f"ββ Complexity: {plan.complexity}",
f"ββ Total Agents: {plan.total_agents}",
f"ββ Est. Time: {plan.estimated_time:.1f} min",
f"ββ Est. Cost: ${plan.estimated_cost:.2f}",
"",
"π€ AGENT TABLE",
"| # | Task | Expert File | Model | Priority | Status |",
"|---|------|-------------|-------|----------|--------|"
]
for task in plan.tasks:
deps = ", ".join(task.dependencies) if task.dependencies else "-"
status = "β³ PENDING"
lines.append(
f"| {task.id} | {task.description[:30]} | {task.agent_expert_file} | "
f"{task.model} | {task.priority} | {status} |"
)
lines.append("")
lines.append("β‘ EXECUTION STRATEGY:")
lines.append(f"ββ Parallel execution: {len(plan.parallel_batches)} batch(es)")
lines.append(f"ββ Max concurrent agents: {max(len(b) for b in plan.parallel_batches)}")
lines.append(f"ββ Documenter task: T{len(plan.tasks)} (always last - RULE #5)")
return "\n".join(lines)
def get_session(self, session_id: str) -> Optional[OrchestrationSession]:
"""Get session by ID"""
return self.sessions.get(session_id)
def list_sessions(self, limit: int = 10) -> List[Dict[str, Any]]:
"""List recent sessions"""
sessions = list(self.sessions.values())
sessions.sort(key=lambda s: s.started_at, reverse=True)
return [
{
"session_id": s.session_id,
"user_request": s.user_request,
"status": s.status.value,
"started_at": s.started_at.isoformat(),
"tasks_count": len(s.plan.tasks) if s.plan else 0
}
for s in sessions[:limit]
]
def get_available_agents(self) -> List[Dict[str, Any]]:
"""Get list of all available expert agents"""
agents = []
seen_experts = set()
for keyword, expert_file in KEYWORD_TO_EXPERT_MAPPING.items():
if expert_file not in seen_experts:
seen_experts.add(expert_file)
agents.append({
"keyword": keyword,
"expert_file": expert_file,
"model": EXPERT_TO_MODEL_MAPPING.get(expert_file, "sonnet"),
"priority": EXPERT_TO_PRIORITY_MAPPING.get(expert_file, "MEDIA"),
"specialization": SPECIALIZATION_DESCRIPTIONS.get(
expert_file, "General"
)
})
return agents
# Global engine instance
engine = OrchestratorEngine()
# =============================================================================
# MCP SERVER SETUP
# =============================================================================
server = Server("orchestrator-mcp")
@server.list_resources()
async def handle_list_resources() -> list[str]:
"""List available resources"""
return [
"orchestrator://sessions",
"orchestrator://agents",
"orchestrator://config"
]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read a resource"""
if uri == "orchestrator://sessions":
sessions = engine.list_sessions()
return json.dumps(sessions, indent=2)
elif uri == "orchestrator://agents":
agents = engine.get_available_agents()
return json.dumps(agents, indent=2)
elif uri == "orchestrator://config":
return json.dumps({
"version": "2.1.0-MCP",
"max_parallel_agents": 64,
"default_model": "auto",
"auto_orchestrate": True
}, indent=2)
else:
raise ValueError(f"Unknown resource: {uri}")
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""List available MCP tools"""
return [
Tool(
name="orchestrator_analyze",
description="Analyze a request and generate execution plan without executing",
inputSchema={
"type": "object",
"properties": {
"request": {
"type": "string",
"description": "The user request to analyze"
},
"show_table": {
"type": "boolean",
"description": "Show execution plan table",
"default": True
}
},
"required": ["request"]
}
),
Tool(
name="orchestrator_execute",
description="Execute orchestration plan (generates plan for Task tool execution)",
inputSchema={
"type": "object",
"properties": {
"request": {
"type": "string",
"description": "The user request to orchestrate"
},
"parallel": {
"type": "number",
"description": "Max parallel agents (1-64)",
"default": 6,
"minimum": 1,
"maximum": 64
},
"model": {
"type": "string",
"description": "Force specific model",
"enum": ["auto", "haiku", "sonnet", "opus"],
"default": "auto"
}
},
"required": ["request"]
}
),
Tool(
name="orchestrator_status",
description="Get status of an orchestration session",
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "Session ID to check (leave empty for latest)"
}
}
}
),
Tool(
name="orchestrator_agents",
description="List all available expert agents",
inputSchema={
"type": "object",
"properties": {
"filter": {
"type": "string",
"description": "Filter by domain or keyword (optional)"
}
}
}
),
Tool(
name="orchestrator_list",
description="List recent orchestration sessions",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "number",
"description": "Max sessions to return",
"default": 10,
"minimum": 1,
"maximum": 50
}
}
}
),
Tool(
name="orchestrator_preview",
description="Preview orchestration with detailed task breakdown",
inputSchema={
"type": "object",
"properties": {
"request": {
"type": "string",
"description": "Request to preview"
}
},
"required": ["request"]
}
),
Tool(
name="orchestrator_cancel",
description="Cancel an active orchestration session",
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "Session ID to cancel"
}
},
"required": ["session_id"]
}
),
]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> List[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls"""
try:
if name == "orchestrator_analyze":
request = arguments.get("request", "")
show_table = arguments.get("show_table", True)
if not request:
return [TextContent(
type="text",
text="β Error: 'request' parameter is required"
)]
plan = engine.generate_execution_plan(request)
output = f"""π― ORCHESTRATOR ANALYSIS COMPLETE
π ANALYSIS SUMMARY
ββ Request: {request}
ββ Session ID: {plan.session_id}
ββ Domains: {', '.join(plan.domains) if plan.domains else 'General'}
ββ Complexity: {plan.complexity}
ββ Total Tasks: {plan.total_agents}
ββ Est. Time: {plan.estimated_time:.1f} min
ββ Est. Cost: ${plan.estimated_cost:.2f}
ββ Parallel Batches: {len(plan.parallel_batches)}
"""
if show_table:
output += "\n" + engine.format_plan_table(plan)
return [TextContent(type="text", text=output)]
elif name == "orchestrator_execute":
request = arguments.get("request", "")
parallel = arguments.get("parallel", 6)
model = arguments.get("model", "auto")
if not request:
return [TextContent(
type="text",
text="β Error: 'request' parameter is required"
)]
plan = engine.generate_execution_plan(request)
output = f"""π ORCHESTRATOR v2.1 - EXECUTION MODE
β‘ ALWAYS ON - Like Serena MCP
π EXECUTION PREPARED
ββ Session ID: {plan.session_id}
ββ Parallelism: {parallel} agents max
ββ Model Override: {model}
ββ Total Tasks: {plan.total_agents}
{engine.format_plan_table(plan)}
π NEXT STEP: Use Task tool to launch agents with this plan:
The following agents should be launched in parallel:
"""
for task in plan.tasks:
if "documenter" not in task.agent_expert_file:
output += f"\n [{task.id}] {task.description}\n"
output += f" β Expert: {task.agent_expert_file}\n"
output += f" β Model: {task.model}\n"
output += f"\n Finally (after all complete):\n"
doc_task = plan.tasks[-1]
output += f" [{doc_task.id}] {doc_task.description}\n"
output += f" β Expert: {doc_task.agent_expert_file}\n"
return [TextContent(type="text", text=output)]
elif name == "orchestrator_status":
session_id = arguments.get("session_id", "")
if session_id:
session = engine.get_session(session_id)
if not session:
return [TextContent(
type="text",
text=f"β Session '{session_id}' not found"
)]
output = f"""π SESSION STATUS: {session.session_id}
ββ Request: {session.user_request}
ββ Status: {session.status.value}
ββ Started: {session.started_at.isoformat()}
ββ Tasks: {len(session.plan.tasks) if session.plan else 0}
ββ Est. Cost: ${session.plan.estimated_cost:.2f if session.plan else 0.00}
"""
else:
sessions = engine.list_sessions(5)
if not sessions:
return [TextContent(
type="text",
text="π No recent sessions found"
)
output = "π RECENT SESSIONS\n\n"
for s in sessions:
output += f"ββ {s['session_id']}: {s['user_request'][:40]}...\n"
output += f" β Status: {s['status']} | Tasks: {s['tasks_count']}\n"
return [TextContent(type="text", text=output)]
elif name == "orchestrator_agents":
filter_kw = arguments.get("filter", "").lower()
agents = engine.get_available_agents()
if filter_kw:
agents = [
a for a in agents
if filter_kw in a["expert_file"].lower() or
filter_kw in a["keyword"].lower() or
filter_kw in a["specialization"].lower()
]
output = f"π€ AVAILABLE EXPERT AGENTS ({len(agents)} total)\n\n"
output += "| Keyword | Expert File | Model | Priority | Specialization |\n"
output += "|---------|-------------|-------|----------|----------------|\n"
for agent in agents:
output += f"| {agent['keyword']} | {agent['expert_file']} | {agent['model']} | {agent['priority']} | {agent['specialization'][:40]}... |\n"
return [TextContent(type="text", text=output)]
elif name == "orchestrator_list":
limit = min(arguments.get("limit", 10), 50)
sessions = engine.list_sessions(limit)
output = f"π RECENT ORCHESTRATION SESSIONS (max {limit})\n\n"
if not sessions:
output += "No sessions found yet. Use orchestrator_analyze or orchestrator_execute first."
else:
for s in sessions:
output += f"ββ [{s['session_id']}] {s['user_request'][:50]}\n"
output += f"β ββ Status: {s['status']} | Tasks: {s['tasks_count']} | {s['started_at']}\n"
return [TextContent(type="text", text=output)]
elif name == "orchestrator_preview":
request = arguments.get("request", "")
if not request:
return [TextContent(
type="text",
text="β Error: 'request' parameter is required"
)]
plan = engine.generate_execution_plan(request)
analysis = engine.analyze_request(request)
output = f"""π ORCHESTRATOR PREVIEW MODE
{'=' * 50}
π REQUEST ANALYSIS
ββ Input: "{request}"
ββ Keywords Found: {', '.join(analysis['keywords']) if analysis['keywords'] else 'None - will use fallback'}
ββ Domains: {', '.join(analysis['domains']) if analysis['domains'] else 'General'}
ββ Complexity: {analysis['complexity']}
ββ Multi-Domain: {'Yes' if analysis['is_multi_domain'] else 'No'}
π€ TASK BREAKDOWN
{'=' * 50}
Work Tasks (Parallel):
"""
work_tasks = [t for t in plan.tasks if "documenter" not in t.agent_expert_file]
for i, task in enumerate(work_tasks, 1):
output += f"""
[{task.id}] {task.description}
ββ Expert: {task.agent_expert_file}
ββ Model: {task.model}
ββ Priority: {task.priority}
ββ Specialization: {task.specialization}
ββ Est: {task.estimated_time}m / ${task.estimated_cost:.2f}
"""
doc_task = plan.tasks[-1]
output += f"""
Final Task (Sequential):
[{doc_task.id}] {doc_task.description}
ββ Expert: {doc_task.agent_expert_file}
ββ Model: {doc_task.model}
ββ Est: {doc_task.estimated_time}m / ${doc_task.estimated_cost:.2f}
π SUMMARY
ββ Total Agents: {plan.total_agents}
ββ Parallel Tasks: {len(work_tasks)}
ββ Est. Total Time: {plan.estimated_time:.1f} min
ββ Est. Total Cost: ${plan.estimated_cost:.2f}
ββ Session ID: {plan.session_id}
"""
return [TextContent(type="text", text=output)]
elif name == "orchestrator_cancel":
session_id = arguments.get("session_id", "")
if not session_id:
return [TextContent(
type="text",
text="β Error: 'session_id' parameter is required"
)]
session = engine.get_session(session_id)
if not session:
return [TextContent(
type="text",
text=f"β Session '{session_id}' not found"
)]
session.status = TaskStatus.CANCELLED
session.completed_at = datetime.now()
return [TextContent(
type="text",
text=f"β
Session {session_id} cancelled successfully"
)]
else:
return [TextContent(
type="text",
text=f"β Unknown tool: {name}"
)]
except Exception as e:
logger.exception(f"Error executing tool {name}")
return [TextContent(
type="text",
text=f"β Error executing {name}: {str(e)}"
)]
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
async def main():
"""Main entry point for MCP server"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="orchestrator-mcp",
server_version="2.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
)
if __name__ == "__main__":
asyncio.run(main())