#!/usr/bin/env python3
# flake8: noqa
"""
Council MCP Server - Single File Bundle
MCP server that enables Claude to collaborate with multiple AI models via OpenRouter.
This version combines all modular components into a single deployable file.
Generated by bundler.
"""
import asyncio
import collections
import hashlib
import importlib
import inspect
import json
import logging
import os
import sys
import time
from abc import ABC, abstractmethod
from collections import OrderedDict, deque
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type
# OpenRouter uses OpenAI-compatible API
try:
from openai import OpenAI
except ImportError:
OpenAI = None
# Keep google SDK as optional fallback
try:
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
except ImportError:
genai = None
google_exceptions = None
# HTTP client for model discovery
try:
import httpx
except ImportError:
httpx = None
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
# Create logger without configuring (main() will configure)
logger = logging.getLogger("council-mcp")
__version__ = "4.0.0"
# Global model manager instance (will be set by server)
model_manager = None
# Create council namespace for bundled mode
class _Council:
_server_instance = None
council = _Council()
# ========== Standalone JSON-RPC 2.0 implementation for MCP servers. Based on Gemini's rec... ==========
from typing import Any, Callable, Dict, Optional
# JSON-RPC 2.0 constants
JSONRPC_VERSION = "2.0"
ERROR_PARSE = -32700
ERROR_INVALID_REQUEST = -32600
ERROR_METHOD_NOT_FOUND = -32601
ERROR_INVALID_PARAMS = -32602
ERROR_INTERNAL = -32603
class JsonRpcRequest:
"""JSON-RPC 2.0 Request"""
def __init__(self, data: dict):
self.jsonrpc = data.get("jsonrpc", JSONRPC_VERSION)
self.method = data.get("method")
self.params = data.get("params", {})
self.id = data.get("id")
# Validate
if self.jsonrpc != JSONRPC_VERSION:
raise ValueError(f"Invalid JSON-RPC version: {self.jsonrpc}")
if not self.method:
raise ValueError("Missing method")
class JsonRpcResponse:
"""JSON-RPC 2.0 Response"""
def __init__(self, result: Any = None, error: Optional[Dict[str, Any]] = None, id: Any = None):
self.jsonrpc = JSONRPC_VERSION
self.id = id
if error is not None:
self.error = error
else:
self.result = result
def to_dict(self) -> dict:
d = {"jsonrpc": self.jsonrpc, "id": self.id}
if hasattr(self, "error"):
d["error"] = self.error
else:
d["result"] = self.result if hasattr(self, "result") else None
return d
class JsonRpcError:
"""JSON-RPC 2.0 Error"""
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
def to_dict(self) -> dict:
d = {"code": self.code, "message": self.message}
if self.data is not None:
d["data"] = self.data
return d
class JsonRpcServer:
"""
A synchronous JSON-RPC 2.0 server over stdio.
Compatible with MCP protocol expectations.
"""
def __init__(self, server_name: str):
self.server_name = server_name
self._handlers: Dict[str, Callable] = {}
self._running = False
def register_handler(self, method: str, handler: Callable):
"""Register a handler for a JSON-RPC method."""
logger.info(f"Registering handler for method: {method}")
self._handlers[method] = handler
def _read_message(self) -> Optional[str]:
"""Read a single line from stdin."""
try:
line = sys.stdin.readline()
if not line:
return None
return line.strip()
except Exception as e:
logger.error(f"Error reading from stdin: {e}")
return None
def _write_message(self, message: dict):
"""Write a JSON message to stdout."""
try:
print(json.dumps(message), flush=True)
except Exception as e:
logger.error(f"Error writing to stdout: {e}")
def _process_request(self, request_str: str) -> Optional[dict]:
"""Process a single JSON-RPC request."""
request_id = None
try:
# Parse JSON
try:
request_data = json.loads(request_str)
except json.JSONDecodeError as e:
return JsonRpcResponse(
error=JsonRpcError(ERROR_PARSE, f"Parse error: {e}").to_dict()
).to_dict()
# Parse request
try:
request = JsonRpcRequest(request_data)
request_id = request.id
except ValueError as e:
return JsonRpcResponse(
                    error=JsonRpcError(ERROR_INVALID_REQUEST, str(e)).to_dict(),
                    id=request_data.get("id"),
).to_dict()
# Find handler
handler = self._handlers.get(request.method) if request.method else None
if not handler:
return JsonRpcResponse(
error=JsonRpcError(
ERROR_METHOD_NOT_FOUND, f"Method not found: {request.method}"
).to_dict(),
id=request_id,
).to_dict()
# Execute handler
try:
result = handler(request_id, request.params)
# Handler returns a complete response dict
return result
except Exception as e:
logger.error(f"Handler error for {request.method}: {e}", exc_info=True)
return JsonRpcResponse(
error=JsonRpcError(ERROR_INTERNAL, f"Internal error: {str(e)}").to_dict(),
id=request_id,
).to_dict()
except Exception as e:
logger.error(f"Unexpected error processing request: {e}", exc_info=True)
return JsonRpcResponse(
error=JsonRpcError(ERROR_INTERNAL, f"Internal error: {str(e)}").to_dict(),
id=request_id,
).to_dict()
def run(self):
"""Run the JSON-RPC server (synchronous)."""
logger.info(f"Starting JSON-RPC server '{self.server_name}'...")
self._running = True
while self._running:
try:
# Read a line
line = self._read_message()
if line is None:
logger.info("EOF reached, shutting down")
break
if not line:
continue
# Process the request
response = self._process_request(line)
# Send response if any
if response:
self._write_message(response)
except KeyboardInterrupt:
logger.info("Keyboard interrupt, shutting down")
break
except Exception as e:
logger.error(f"Server error: {e}", exc_info=True)
continue
logger.info("Server stopped")
def stop(self):
"""Stop the server."""
self._running = False
# MCP-compatible type definitions (simple dicts instead of Pydantic)
def create_text_content(text: str) -> dict:
"""Create a text content object."""
return {"type": "text", "text": text}
def create_tool(name: str, description: str, input_schema: dict) -> dict:
"""Create a tool definition."""
return {"name": name, "description": description, "inputSchema": input_schema}
def create_error_response(request_id: Any, code: int, message: str) -> dict:
"""Create an error response."""
return {
"jsonrpc": JSONRPC_VERSION,
"id": request_id,
"error": {"code": code, "message": message},
}
def create_result_response(request_id: Any, result: Any) -> dict:
"""Create a result response."""
return {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
# ========== Base classes for LLM providers. ==========
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class LLMResponse:
"""Response from an LLM provider."""
content: str
model: str
usage: dict[str, int] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class ModelInfo:
"""Information about an available model."""
id: str
name: str
provider: str
context_length: int = 0
pricing: dict[str, float] = field(default_factory=dict)
capabilities: list[str] = field(default_factory=list)
is_free: bool = False
@classmethod
def from_openrouter(cls, data: dict[str, Any]) -> "ModelInfo":
"""Create ModelInfo from OpenRouter API response."""
model_id = data.get("id", "")
# Extract provider from model ID (e.g., "google/gemini-3-pro-preview" -> "google")
provider = model_id.split("/")[0] if "/" in model_id else "unknown"
# Check if it's a free model (ends with ":free")
is_free = model_id.endswith(":free")
# Extract capabilities from architecture or description
capabilities = []
arch = data.get("architecture", {})
if arch.get("modality") == "text+image->text":
capabilities.append("vision")
if "function" in str(data.get("description", "")).lower():
capabilities.append("function_calling")
if "code" in str(data.get("description", "")).lower():
capabilities.append("code")
return cls(
id=model_id,
name=data.get("name", model_id),
provider=provider,
context_length=data.get("context_length", 0),
pricing={
"prompt": data.get("pricing", {}).get("prompt", 0),
"completion": data.get("pricing", {}).get("completion", 0),
},
capabilities=capabilities,
is_free=is_free,
)
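# Illustrative sketch: how from_openrouter() maps a raw /models entry onto ModelInfo.
# The payload below is a made-up, trimmed example shaped like the OpenRouter response.
def _example_modelinfo_parsing() -> ModelInfo:
    raw = {
        "id": "example/sample-model:free",
        "name": "Sample Model",
        "context_length": 128000,
        "architecture": {"modality": "text+image->text"},
        "pricing": {"prompt": 0, "completion": 0},
        "description": "Supports function calling and code generation.",
    }
    info = ModelInfo.from_openrouter(raw)
    # provider == "example", is_free is True, and capabilities include
    # "vision", "function_calling", and "code" given the fields above.
    return info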
class LLMProvider(ABC):
"""Abstract base class for LLM providers."""
@property
@abstractmethod
def name(self) -> str:
"""Return the provider name."""
...
@abstractmethod
def generate(
self,
prompt: str,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs: Any,
) -> LLMResponse:
"""Generate a response from the LLM.
Args:
prompt: The prompt to send to the model.
model: The model to use. If None, uses the default model.
temperature: Sampling temperature (0.0-2.0).
max_tokens: Maximum tokens to generate.
**kwargs: Additional provider-specific parameters.
Returns:
LLMResponse containing the generated content and metadata.
Raises:
LLMProviderError: If the generation fails.
"""
...
@abstractmethod
def list_models(self) -> list[ModelInfo]:
"""List available models from this provider.
Returns:
List of ModelInfo objects describing available models.
"""
...
@abstractmethod
def is_available(self) -> bool:
"""Check if the provider is available and configured.
Returns:
True if the provider can be used, False otherwise.
"""
...
class LLMProviderError(Exception):
"""Base exception for LLM provider errors."""
def __init__(
self,
message: str,
provider: str = "",
model: str = "",
is_retryable: bool = False,
):
super().__init__(message)
self.provider = provider
self.model = model
self.is_retryable = is_retryable
class RateLimitError(LLMProviderError):
"""Raised when rate limited by the provider."""
def __init__(self, message: str, provider: str = "", model: str = ""):
super().__init__(message, provider, model, is_retryable=True)
class AuthenticationError(LLMProviderError):
"""Raised when authentication fails."""
def __init__(self, message: str, provider: str = "", model: str = ""):
super().__init__(message, provider, model, is_retryable=False)
class ModelNotFoundError(LLMProviderError):
"""Raised when the requested model is not found."""
def __init__(self, message: str, provider: str = "", model: str = ""):
super().__init__(message, provider, model, is_retryable=False)
# ========== OpenRouter LLM provider implementation. ==========
import os
from typing import Any, Optional
# httpx and OpenAI are imported (with ImportError fallbacks) at the top of this bundle.
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
OPENROUTER_MODELS_URL = "https://openrouter.ai/api/v1/models"
class OpenRouterProvider(LLMProvider):
"""LLM provider using OpenRouter API."""
def __init__(
self,
api_key: Optional[str] = None,
default_model: str = "google/gemini-3-pro-preview",
timeout: float = 600.0,
app_name: str = "council-mcp",
):
"""Initialize the OpenRouter provider.
Args:
api_key: OpenRouter API key. If None, reads from OPENROUTER_API_KEY env var.
default_model: Default model to use for generation.
timeout: Request timeout in seconds.
app_name: Application name for OpenRouter headers.
"""
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
self.default_model = default_model
self.timeout = timeout
self.app_name = app_name
self._client: Optional[OpenAI] = None
self._models_cache: Optional[list[ModelInfo]] = None
@property
def name(self) -> str:
"""Return the provider name."""
return "openrouter"
@property
def client(self) -> OpenAI:
"""Get or create the OpenAI client configured for OpenRouter."""
if self._client is None:
if not self.api_key:
raise AuthenticationError(
"OpenRouter API key not configured. "
"Set OPENROUTER_API_KEY environment variable.",
provider=self.name,
)
self._client = OpenAI(
base_url=OPENROUTER_BASE_URL,
api_key=self.api_key,
timeout=self.timeout,
default_headers={
"HTTP-Referer": f"https://github.com/lbds137/{self.app_name}",
"X-Title": self.app_name,
},
)
return self._client
def generate(
self,
prompt: str,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs: Any,
) -> LLMResponse:
"""Generate a response using OpenRouter.
Args:
prompt: The prompt to send to the model.
model: The model to use. If None, uses the default model.
temperature: Sampling temperature (0.0-2.0).
max_tokens: Maximum tokens to generate.
**kwargs: Additional parameters passed to the API.
Returns:
LLMResponse containing the generated content and metadata.
Raises:
LLMProviderError: If the generation fails.
"""
model_id = model or self.default_model
logger.info(f"Generating with OpenRouter model: {model_id}")
try:
response = self.client.chat.completions.create(
model=model_id,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens,
**kwargs,
)
content = response.choices[0].message.content or ""
usage = {}
if response.usage:
usage = {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
}
logger.info(f"OpenRouter response received from {model_id}")
return LLMResponse(
content=content,
model=model_id,
usage=usage,
metadata={"id": response.id, "created": response.created},
)
except Exception as e:
error_msg = str(e)
logger.error(f"OpenRouter error: {error_msg}")
# Parse error type and raise appropriate exception
if "rate" in error_msg.lower() or "429" in error_msg:
raise RateLimitError(error_msg, provider=self.name, model=model_id)
elif "auth" in error_msg.lower() or "401" in error_msg or "403" in error_msg:
raise AuthenticationError(error_msg, provider=self.name, model=model_id)
elif "not found" in error_msg.lower() or "404" in error_msg:
raise ModelNotFoundError(error_msg, provider=self.name, model=model_id)
else:
raise LLMProviderError(
error_msg,
provider=self.name,
model=model_id,
is_retryable="timeout" in error_msg.lower(),
)
def list_models(self, force_refresh: bool = False) -> list[ModelInfo]:
"""List available models from OpenRouter.
Args:
force_refresh: If True, bypass the cache and fetch fresh data.
Returns:
List of ModelInfo objects describing available models.
"""
if self._models_cache is not None and not force_refresh:
return self._models_cache
logger.info("Fetching models from OpenRouter")
try:
response = httpx.get(
OPENROUTER_MODELS_URL,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
models = [ModelInfo.from_openrouter(m) for m in data.get("data", [])]
self._models_cache = models
logger.info(f"Fetched {len(models)} models from OpenRouter")
return models
except Exception as e:
logger.error(f"Failed to fetch models from OpenRouter: {e}")
return self._models_cache or []
def is_available(self) -> bool:
"""Check if the OpenRouter provider is available.
Returns:
True if the API key is configured, False otherwise.
"""
return bool(self.api_key)
def get_model_info(self, model_id: str) -> Optional[ModelInfo]:
"""Get information about a specific model.
Args:
model_id: The model ID to look up.
Returns:
ModelInfo for the model, or None if not found.
"""
models = self.list_models()
for model in models:
if model.id == model_id:
return model
return None
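# Illustrative sketch: using OpenRouterProvider directly, outside the MCP server.
# Assumes OPENROUTER_API_KEY is set; the prompt and model override are placeholders.
def _example_openrouter_generate() -> None:
    provider = OpenRouterProvider()  # reads OPENROUTER_API_KEY from the environment
    if not provider.is_available():
        return
    response = provider.generate(
        "Summarize the MCP protocol in one sentence.",
        model="google/gemini-3-pro-preview",
        temperature=0.2,
        max_tokens=200,
    )
    logger.info(f"{response.model} returned {len(response.content)} chars, usage={response.usage}")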
# ========== Model caching with TTL for Council MCP server. ==========
import os
import time
from typing import Optional
# httpx is imported (with an ImportError fallback) at the top of this bundle.
OPENROUTER_MODELS_URL = "https://openrouter.ai/api/v1/models"
class ModelCache:
"""Cache for OpenRouter model information with TTL.
This class provides:
- In-memory caching of model information
- Configurable TTL (time-to-live)
- Automatic refresh when cache expires
    - Best-effort thread safety (no explicit locking; relies on simple attribute updates)
"""
def __init__(
self,
api_key: Optional[str] = None,
ttl_seconds: float = 3600.0, # 1 hour default
timeout: float = 30.0,
):
"""Initialize the model cache.
Args:
api_key: OpenRouter API key. If None, reads from OPENROUTER_API_KEY.
ttl_seconds: Cache TTL in seconds. Default is 1 hour.
timeout: Request timeout in seconds for fetching models.
"""
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
self.ttl_seconds = ttl_seconds or float(os.getenv("COUNCIL_CACHE_TTL", "3600"))
self.timeout = timeout
self._models: list[ModelInfo] = []
self._last_fetch: float = 0
self._fetch_count: int = 0
@property
def is_expired(self) -> bool:
"""Check if the cache has expired."""
if not self._models:
return True
return time.time() - self._last_fetch > self.ttl_seconds
@property
def age_seconds(self) -> float:
"""Get the age of the cache in seconds."""
if self._last_fetch == 0:
return float("inf")
return time.time() - self._last_fetch
def get_models(self, force_refresh: bool = False) -> list[ModelInfo]:
"""Get cached models, fetching if expired or forced.
Args:
force_refresh: If True, bypass cache and fetch fresh data.
Returns:
List of ModelInfo objects.
"""
if force_refresh or self.is_expired:
self._fetch_models()
return self._models
def _fetch_models(self) -> None:
"""Fetch models from OpenRouter API."""
if not self.api_key:
logger.warning("No API key configured, cannot fetch models")
return
logger.info("Fetching models from OpenRouter")
try:
response = httpx.get(
OPENROUTER_MODELS_URL,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout,
)
response.raise_for_status()
data = response.json()
self._models = [ModelInfo.from_openrouter(m) for m in data.get("data", [])]
self._last_fetch = time.time()
self._fetch_count += 1
logger.info(f"Cached {len(self._models)} models from OpenRouter")
except Exception as e:
logger.error(f"Failed to fetch models from OpenRouter: {e}")
# Keep existing cache on failure
def get_model(self, model_id: str) -> Optional[ModelInfo]:
"""Get a specific model by ID.
Args:
model_id: The model ID to look up.
Returns:
ModelInfo if found, None otherwise.
"""
models = self.get_models()
for model in models:
if model.id == model_id:
return model
return None
def clear(self) -> None:
"""Clear the cache."""
self._models = []
self._last_fetch = 0
logger.info("Model cache cleared")
def get_stats(self) -> dict:
"""Get cache statistics.
Returns:
Dictionary with cache statistics.
"""
return {
"cached_models": len(self._models),
"last_fetch": self._last_fetch,
"age_seconds": self.age_seconds if self._last_fetch > 0 else None,
"is_expired": self.is_expired,
"ttl_seconds": self.ttl_seconds,
"fetch_count": self._fetch_count,
}
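# Illustrative sketch: typical ModelCache usage. Fetches are lazy, so nothing hits
# the network until get_models()/get_model() is called with an empty or expired cache.
def _example_model_cache_usage() -> None:
    cache = ModelCache(ttl_seconds=1800.0)  # 30-minute TTL
    free_models = [m for m in cache.get_models() if m.is_free]
    logger.info(f"{len(free_models)} free models cached; stats: {cache.get_stats()}")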
# ========== Model filtering for Council MCP server. ==========
from typing import Optional
class ModelFilter:
"""Filter models by various criteria.
This class provides methods to filter models by:
- Provider (google, anthropic, openai, etc.)
- Capability (vision, code, function_calling, reasoning)
- Free tier availability
- Text search on name/description
- Context length
"""
def __init__(self, models: list[ModelInfo]):
"""Initialize the filter with a list of models.
Args:
models: List of ModelInfo objects to filter.
"""
self.models = models
def by_provider(self, provider: str) -> "ModelFilter":
"""Filter models by provider.
Args:
provider: Provider name (e.g., "google", "anthropic", "openai").
Returns:
New ModelFilter with filtered models.
"""
filtered = [m for m in self.models if m.provider.lower() == provider.lower()]
return ModelFilter(filtered)
def by_capability(self, capability: str) -> "ModelFilter":
"""Filter models by capability.
Args:
capability: Capability name (e.g., "vision", "code", "function_calling").
Returns:
New ModelFilter with filtered models.
"""
filtered = [
m for m in self.models if capability.lower() in [c.lower() for c in m.capabilities]
]
return ModelFilter(filtered)
def free_only(self) -> "ModelFilter":
"""Filter to only include free tier models.
Returns:
New ModelFilter with only free models.
"""
filtered = [m for m in self.models if m.is_free]
return ModelFilter(filtered)
def paid_only(self) -> "ModelFilter":
"""Filter to only include paid models.
Returns:
New ModelFilter with only paid models.
"""
filtered = [m for m in self.models if not m.is_free]
return ModelFilter(filtered)
def search(self, query: str) -> "ModelFilter":
"""Search models by name or ID.
Args:
query: Search query to match against name or ID.
Returns:
New ModelFilter with matching models.
"""
query_lower = query.lower()
filtered = [
m for m in self.models if query_lower in m.name.lower() or query_lower in m.id.lower()
]
return ModelFilter(filtered)
def min_context_length(self, length: int) -> "ModelFilter":
"""Filter models with at least the specified context length.
Args:
length: Minimum context length in tokens.
Returns:
New ModelFilter with models meeting the requirement.
"""
filtered = [m for m in self.models if m.context_length >= length]
return ModelFilter(filtered)
def limit(self, count: int) -> "ModelFilter":
"""Limit the number of models returned.
Args:
count: Maximum number of models to return.
Returns:
New ModelFilter with limited models.
"""
return ModelFilter(self.models[:count])
def sort_by_context_length(self, descending: bool = True) -> "ModelFilter":
"""Sort models by context length.
Args:
descending: If True, sort from highest to lowest.
Returns:
New ModelFilter with sorted models.
"""
sorted_models = sorted(
self.models,
key=lambda m: m.context_length,
reverse=descending,
)
return ModelFilter(sorted_models)
def sort_by_name(self, descending: bool = False) -> "ModelFilter":
"""Sort models by name.
Args:
descending: If True, sort in reverse order.
Returns:
New ModelFilter with sorted models.
"""
sorted_models = sorted(
self.models,
key=lambda m: m.name.lower(),
reverse=descending,
)
return ModelFilter(sorted_models)
def to_list(self) -> list[ModelInfo]:
"""Get the filtered models as a list.
Returns:
List of ModelInfo objects.
"""
return list(self.models)
def count(self) -> int:
"""Get the count of filtered models.
Returns:
Number of models.
"""
return len(self.models)
def first(self) -> Optional[ModelInfo]:
"""Get the first model in the filtered list.
Returns:
First ModelInfo or None if empty.
"""
return self.models[0] if self.models else None
@classmethod
def apply_filters(
cls,
models: list[ModelInfo],
provider: Optional[str] = None,
capability: Optional[str] = None,
free_only: bool = False,
search: Optional[str] = None,
min_context: Optional[int] = None,
limit: Optional[int] = None,
) -> list[ModelInfo]:
"""Apply multiple filters at once.
This is a convenience method that applies all specified filters.
Args:
models: List of models to filter.
provider: Filter by provider name.
capability: Filter by capability.
free_only: If True, only include free models.
search: Search query for name/ID.
min_context: Minimum context length.
limit: Maximum number of results.
Returns:
Filtered list of ModelInfo objects.
"""
result = cls(models)
if provider:
result = result.by_provider(provider)
if capability:
result = result.by_capability(capability)
if free_only:
result = result.free_only()
if search:
result = result.search(search)
if min_context:
result = result.min_context_length(min_context)
if limit:
result = result.limit(limit)
return result.to_list()
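# Illustrative sketch: the fluent ModelFilter chain versus the one-shot apply_filters()
# helper. The criteria below are arbitrary examples, not recommended defaults.
def _example_model_filter_usage(models: list[ModelInfo]) -> list[ModelInfo]:
    # Fluent style: every call returns a new ModelFilter, so chains are side-effect free.
    top_vision = (
        ModelFilter(models)
        .by_capability("vision")
        .min_context_length(100_000)
        .sort_by_context_length()
        .limit(5)
        .to_list()
    )
    # Equivalent one-shot style for simple cases.
    free_picks = ModelFilter.apply_filters(models, free_only=True, limit=5)
    return top_vision + free_picks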
# ========== Curated model registry with task-specific recommendations. This module provi... ==========
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ModelClass(str, Enum):
"""Model class/tier for quick selection."""
FLASH = "flash" # Fast, cheap, good for simple tasks
PRO = "pro" # Balanced, good for most tasks
DEEP = "deep" # Maximum quality, complex reasoning
class TaskType(str, Enum):
"""Task types for model recommendations."""
CODING = "coding"
CODE_REVIEW = "code_review"
REASONING = "reasoning"
CREATIVE = "creative"
VISION = "vision"
LONG_CONTEXT = "long_context"
GENERAL = "general"
@dataclass
class ModelMetadata:
"""Curated metadata for a model."""
model_class: ModelClass
strengths: dict[str, str] = field(default_factory=dict) # TaskType -> S/A/B/C rating
description: str = ""
notes: str = ""
recommended_for: list[str] = field(default_factory=list)
# Curated model registry - updated December 2025
# Based on benchmarks: SWE-bench, WebDev Arena, GPQA, and OpenRouter usage data
MODEL_REGISTRY: dict[str, ModelMetadata] = {
# === Anthropic Models ===
"anthropic/claude-3.5-sonnet": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.CODE_REVIEW: "A",
TaskType.REASONING: "A",
TaskType.CREATIVE: "A",
TaskType.VISION: "A",
TaskType.GENERAL: "A",
},
description="Excellent all-rounder with strong coding abilities",
recommended_for=["code_review", "refactoring", "general"],
),
"anthropic/claude-3.5-haiku": ModelMetadata(
model_class=ModelClass.FLASH,
strengths={
TaskType.CODING: "B",
TaskType.REASONING: "B",
TaskType.GENERAL: "B",
},
description="Fast and cost-effective for simpler tasks",
recommended_for=["quick_questions", "simple_code", "summaries"],
),
"anthropic/claude-sonnet-4": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "S",
TaskType.CODE_REVIEW: "S",
TaskType.REASONING: "A",
TaskType.CREATIVE: "A",
TaskType.VISION: "A",
TaskType.GENERAL: "A",
},
description="State-of-the-art coding (72.5% SWE-bench)",
notes="Leads coding benchmarks as of late 2025",
recommended_for=["coding", "code_review", "debugging", "refactoring"],
),
"anthropic/claude-opus-4": ModelMetadata(
model_class=ModelClass.DEEP,
strengths={
TaskType.CODING: "S",
TaskType.CODE_REVIEW: "S",
TaskType.REASONING: "S",
TaskType.CREATIVE: "A",
TaskType.LONG_CONTEXT: "A",
TaskType.GENERAL: "S",
},
description="Most capable Claude, excellent for complex tasks",
recommended_for=["complex_reasoning", "architecture", "long_documents"],
),
# === Google Models ===
"google/gemini-2.5-pro": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "S",
TaskType.VISION: "S",
TaskType.LONG_CONTEXT: "S",
TaskType.CREATIVE: "A",
TaskType.GENERAL: "A",
},
description="Excellent reasoning, 1M context, leads WebDev Arena",
notes="Best for web development and multimodal tasks",
recommended_for=["web_development", "vision", "long_context", "reasoning"],
),
"google/gemini-2.5-flash": ModelMetadata(
model_class=ModelClass.FLASH,
strengths={
TaskType.CODING: "B",
TaskType.REASONING: "B",
TaskType.VISION: "A",
TaskType.GENERAL: "B",
},
description="Fast multimodal model, good for vision tasks",
recommended_for=["quick_vision", "image_analysis", "fast_responses"],
),
"google/gemini-3-pro-preview": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "S",
TaskType.VISION: "S",
TaskType.LONG_CONTEXT: "S",
TaskType.CREATIVE: "A",
TaskType.GENERAL: "A",
},
description="Latest Gemini with enhanced reasoning (86.4 GPQA)",
notes="Strong multimodal and reasoning capabilities",
recommended_for=["reasoning", "vision", "research", "analysis"],
),
# === OpenAI Models ===
"openai/gpt-4o": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.VISION: "A",
TaskType.CREATIVE: "A",
TaskType.GENERAL: "A",
},
description="Strong all-rounder with good speed",
recommended_for=["general", "creative", "coding"],
),
"openai/gpt-4o-mini": ModelMetadata(
model_class=ModelClass.FLASH,
strengths={
TaskType.CODING: "B",
TaskType.REASONING: "B",
TaskType.GENERAL: "B",
},
description="Cost-effective GPT-4 class model",
recommended_for=["quick_tasks", "simple_coding", "summaries"],
),
"openai/gpt-4-turbo": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.VISION: "A",
TaskType.LONG_CONTEXT: "A",
TaskType.GENERAL: "A",
},
description="128K context, strong overall performance",
recommended_for=["long_documents", "coding", "general"],
),
# === DeepSeek Models ===
"deepseek/deepseek-r1": ModelMetadata(
model_class=ModelClass.DEEP,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "S",
TaskType.GENERAL: "A",
},
description="Specialized reasoning with reinforcement learning",
notes="Excels at math, logic, and complex coding",
recommended_for=["complex_reasoning", "math", "logic_puzzles"],
),
"deepseek/deepseek-chat": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.CREATIVE: "A",
TaskType.GENERAL: "A",
},
description="Strong general-purpose model, cost-effective",
notes="Popular open-source option",
recommended_for=["general", "coding", "creative"],
),
# === Meta Models ===
"meta-llama/llama-3.3-70b-instruct": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.GENERAL: "A",
},
description="Strong open-source model, often free tier",
notes="Great for cost-conscious usage",
recommended_for=["general", "coding", "free_tier"],
),
"meta-llama/llama-3.1-405b-instruct": ModelMetadata(
model_class=ModelClass.DEEP,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.LONG_CONTEXT: "A",
TaskType.GENERAL: "A",
},
description="Largest Llama, 128K context",
recommended_for=["complex_tasks", "long_context"],
),
# === Mistral Models ===
"mistralai/mistral-large": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.GENERAL: "A",
},
description="Strong European model with good coding",
recommended_for=["coding", "general", "multilingual"],
),
"mistralai/mistral-medium-3": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "B",
TaskType.GENERAL: "A",
},
description="90% of premium performance at $0.40/M tokens",
notes="Best value for money",
recommended_for=["cost_effective", "general", "coding"],
),
# === xAI Models ===
"x-ai/grok-2": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.CREATIVE: "A",
TaskType.GENERAL: "A",
},
description="Strong reasoning with real-time web integration",
notes="Has 'Think' mode for step-by-step reasoning",
recommended_for=["reasoning", "current_events", "creative"],
),
# === Qwen Models ===
"qwen/qwen-2.5-72b-instruct": ModelMetadata(
model_class=ModelClass.PRO,
strengths={
TaskType.CODING: "A",
TaskType.REASONING: "A",
TaskType.GENERAL: "A",
},
description="Strong open-source alternative from Alibaba",
notes="Second most used open-source on OpenRouter",
recommended_for=["coding", "general", "multilingual"],
),
}
# Task-to-model recommendations based on our research
TASK_RECOMMENDATIONS: dict[TaskType, list[str]] = {
TaskType.CODING: [
"anthropic/claude-sonnet-4", # SWE-bench leader
"anthropic/claude-3.5-sonnet",
"google/gemini-2.5-pro",
"deepseek/deepseek-chat",
],
TaskType.CODE_REVIEW: [
"anthropic/claude-sonnet-4",
"anthropic/claude-3.5-sonnet",
"google/gemini-3-pro-preview",
],
TaskType.REASONING: [
"deepseek/deepseek-r1", # Specialized reasoning
"google/gemini-3-pro-preview", # 86.4 GPQA
"anthropic/claude-opus-4",
"x-ai/grok-2",
],
TaskType.CREATIVE: [
"anthropic/claude-3.5-sonnet",
"openai/gpt-4o",
"deepseek/deepseek-chat",
],
TaskType.VISION: [
"google/gemini-2.5-pro", # Dominates vision workloads
"google/gemini-2.5-flash",
"openai/gpt-4o",
"anthropic/claude-3.5-sonnet",
],
TaskType.LONG_CONTEXT: [
"google/gemini-2.5-pro", # 1M tokens
"google/gemini-3-pro-preview", # 1M tokens
"anthropic/claude-opus-4",
"meta-llama/llama-3.1-405b-instruct",
],
TaskType.GENERAL: [
"anthropic/claude-3.5-sonnet",
"openai/gpt-4o",
"google/gemini-2.5-pro",
"deepseek/deepseek-chat",
],
}
# Free tier recommendations
FREE_TIER_MODELS = [
"meta-llama/llama-3.3-70b-instruct:free",
"deepseek/deepseek-chat:free",
"qwen/qwen-2.5-72b-instruct:free",
]
def get_model_metadata(model_id: str) -> Optional[ModelMetadata]:
"""Get curated metadata for a model.
Args:
model_id: The model ID (e.g., 'anthropic/claude-3.5-sonnet').
Returns:
ModelMetadata if found, None otherwise.
"""
# Try exact match first
if model_id in MODEL_REGISTRY:
return MODEL_REGISTRY[model_id]
# Try without version suffix (e.g., ':free', ':beta')
base_id = model_id.split(":")[0]
if base_id in MODEL_REGISTRY:
return MODEL_REGISTRY[base_id]
# Try fuzzy match on model name
model_lower = model_id.lower()
for reg_id, metadata in MODEL_REGISTRY.items():
if reg_id.lower() in model_lower or model_lower in reg_id.lower():
return metadata
return None
def get_recommendations_for_task(task: TaskType, limit: int = 3) -> list[str]:
"""Get recommended models for a specific task type.
Args:
task: The type of task.
limit: Maximum number of recommendations.
Returns:
List of recommended model IDs.
"""
recommendations = TASK_RECOMMENDATIONS.get(task, TASK_RECOMMENDATIONS[TaskType.GENERAL])
return recommendations[:limit]
def get_model_class_description(model_class: ModelClass) -> str:
"""Get a description of a model class.
Args:
model_class: The model class.
Returns:
Human-readable description.
"""
descriptions = {
ModelClass.FLASH: "Fast & cost-effective - good for simple tasks, quick responses",
ModelClass.PRO: "Balanced quality/cost - recommended for most tasks",
ModelClass.DEEP: "Maximum quality - complex reasoning, long context, quality-critical",
}
return descriptions.get(model_class, "Unknown class")
def generate_model_guide() -> str:
"""Generate a human-readable model selection guide.
Returns:
Markdown-formatted guide string.
"""
lines = [
"# Model Selection Guide",
"",
"## Quick Reference by Task",
"",
]
for task in TaskType:
task_name = task.value.replace("_", " ").title()
recommendations = get_recommendations_for_task(task, limit=3)
models_str = ", ".join(r.split("/")[1] for r in recommendations)
lines.append(f"**{task_name}**: {models_str}")
lines.extend(
[
"",
"## Model Classes",
"",
f"- **Flash**: {get_model_class_description(ModelClass.FLASH)}",
f"- **Pro**: {get_model_class_description(ModelClass.PRO)}",
f"- **Deep**: {get_model_class_description(ModelClass.DEEP)}",
"",
"## Free Tier Options",
"",
]
)
for model in FREE_TIER_MODELS:
lines.append(f"- {model}")
return "\n".join(lines)
# ========== Model manager for Council MCP server. ==========
import os
from typing import Any, Optional
class ModelManager:
"""Manages LLM interactions for Council MCP server.
This class provides a unified interface for:
- Generating content from LLMs via OpenRouter
- Switching between different models
- Tracking usage statistics
"""
def __init__(
self,
api_key: Optional[str] = None,
default_model: Optional[str] = None,
timeout: Optional[float] = None,
):
"""Initialize the model manager.
Args:
api_key: OpenRouter API key. If None, reads from OPENROUTER_API_KEY.
default_model: Default model to use. If None, reads from COUNCIL_DEFAULT_MODEL
or defaults to "google/gemini-3-pro-preview".
            timeout: Request timeout in seconds. If None, reads COUNCIL_TIMEOUT
                (interpreted as milliseconds) or defaults to 600.0 seconds (10 minutes).
"""
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
# default_model always has a value due to the fallback
self.default_model: str = (
default_model
or os.getenv("COUNCIL_DEFAULT_MODEL", "google/gemini-3-pro-preview")
or "google/gemini-3-pro-preview"
)
self.timeout = timeout or float(os.getenv("COUNCIL_TIMEOUT", "600000")) / 1000
# Initialize the provider
self._provider: Optional[OpenRouterProvider] = None
# Current active model (can be changed with set_model)
self._active_model: str = self.default_model
# Statistics
self.total_calls = 0
self.successful_calls = 0
self.failed_calls = 0
logger.info(f"ModelManager initialized with default model: {self.default_model}")
@property
def provider(self) -> OpenRouterProvider:
"""Get the OpenRouter provider, initializing if needed."""
if self._provider is None:
self._provider = OpenRouterProvider(
api_key=self.api_key,
default_model=self.default_model,
timeout=self.timeout,
)
return self._provider
@property
def active_model(self) -> str:
"""Get the currently active model."""
return self._active_model
def set_model(self, model_id: str) -> bool:
"""Set the active model for subsequent requests.
Args:
model_id: The model ID to use (e.g., "google/gemini-3-pro-preview").
Returns:
True if the model was set successfully.
"""
logger.info(f"Setting active model to: {model_id}")
self._active_model = model_id
return True
def generate_content(
self,
prompt: str,
model: Optional[str] = None,
**kwargs: Any,
) -> tuple[str, str]:
"""Generate content from the LLM.
This method provides backward compatibility with the old DualModelManager
interface by returning a tuple of (content, model_used).
Args:
prompt: The prompt to send to the model.
model: Override model for this request. If None, uses active model.
**kwargs: Additional parameters passed to the provider.
Returns:
Tuple of (response_content, model_used).
Raises:
LLMProviderError: If generation fails.
"""
model_to_use = model or self._active_model
self.total_calls += 1
try:
response = self.provider.generate(prompt, model=model_to_use, **kwargs)
self.successful_calls += 1
return response.content, response.model
except LLMProviderError as e:
self.failed_calls += 1
logger.error(f"Generation failed: {e}")
raise
def generate(
self,
prompt: str,
model: Optional[str] = None,
**kwargs: Any,
) -> LLMResponse:
"""Generate content and return full response object.
Args:
prompt: The prompt to send to the model.
model: Override model for this request. If None, uses active model.
**kwargs: Additional parameters passed to the provider.
Returns:
LLMResponse object with content and metadata.
Raises:
LLMProviderError: If generation fails.
"""
model_to_use = model or self._active_model
self.total_calls += 1
try:
response = self.provider.generate(prompt, model=model_to_use, **kwargs)
self.successful_calls += 1
return response
except LLMProviderError as e:
self.failed_calls += 1
logger.error(f"Generation failed: {e}")
raise
def list_models(self, force_refresh: bool = False) -> list[ModelInfo]:
"""List available models.
Args:
force_refresh: If True, bypass cache and fetch fresh data.
Returns:
List of ModelInfo objects.
"""
return self.provider.list_models(force_refresh=force_refresh)
def get_model_info(self, model_id: str) -> Optional[ModelInfo]:
"""Get information about a specific model.
Args:
model_id: The model ID to look up.
Returns:
ModelInfo for the model, or None if not found.
"""
return self.provider.get_model_info(model_id)
def is_available(self) -> bool:
"""Check if the model manager is properly configured.
Returns:
True if the provider is available.
"""
return self.provider.is_available()
def get_stats(self) -> dict[str, Any]:
"""Get usage statistics.
Returns:
Dictionary with usage statistics.
"""
success_rate = (
(self.successful_calls / self.total_calls * 100) if self.total_calls > 0 else 0
)
return {
"provider": "openrouter",
"active_model": self._active_model,
"default_model": self.default_model,
"total_calls": self.total_calls,
"successful_calls": self.successful_calls,
"failed_calls": self.failed_calls,
"success_rate": f"{success_rate:.1f}%",
}
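# Illustrative sketch: how server code typically drives ModelManager. The prompt
# and model override below are placeholders.
def _example_model_manager_usage() -> None:
    manager = ModelManager()  # picks up OPENROUTER_API_KEY / COUNCIL_DEFAULT_MODEL
    if not manager.is_available():
        return
    manager.set_model("anthropic/claude-3.5-sonnet")
    content, model_used = manager.generate_content("Explain JSON-RPC in two sentences.")
    logger.info(f"{model_used} answered ({len(content)} chars); stats: {manager.get_stats()}")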
# ========== Memory-related data models. ==========
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict
@dataclass
class ConversationTurn:
"""Represents a single turn in a conversation."""
role: str # "user", "assistant", "system"
content: str
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MemoryEntry:
"""Represents an entry in conversation memory."""
key: str
value: Any
category: str = "general"
timestamp: datetime = field(default_factory=datetime.now)
last_accessed: datetime = field(default_factory=datetime.now)
access_count: int = 0
# ========== Caching service for expensive operations. ==========
import hashlib
import time
from collections import OrderedDict
from typing import Any, Dict, Optional
class ResponseCache:
"""Simple LRU cache for tool responses."""
def __init__(self, max_size: int = 100, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
self.hits = 0
self.misses = 0
def create_key(self, tool_name: str, parameters: Dict[str, Any]) -> str:
"""Create a cache key from tool name and parameters."""
# Sort parameters for consistent hashing
params_str = json.dumps(parameters, sort_keys=True)
key_data = f"{tool_name}:{params_str}"
return hashlib.sha256(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""Get a value from cache if it exists and isn't expired."""
if key not in self.cache:
self.misses += 1
return None
entry = self.cache[key]
# Check if expired
if time.time() - entry["timestamp"] > self.ttl_seconds:
del self.cache[key]
self.misses += 1
return None
# Move to end (most recently used)
self.cache.move_to_end(key)
self.hits += 1
return entry["value"]
def set(self, key: str, value: Any) -> None:
"""Set a value in cache."""
# Remove oldest if at capacity
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = {"value": value, "timestamp": time.time()}
def clear(self) -> None:
"""Clear all cache entries."""
self.cache.clear()
self.hits = 0
self.misses = 0
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
total_requests = self.hits + self.misses
return {
"size": len(self.cache),
"max_size": self.max_size,
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total_requests if total_requests > 0 else 0,
"ttl_seconds": self.ttl_seconds,
}
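# Illustrative sketch: the orchestrator's caching pattern in miniature. The tool
# name and parameters are arbitrary examples.
def _example_response_cache_usage() -> None:
    cache = ResponseCache(max_size=10, ttl_seconds=60)
    key = cache.create_key("some_tool", {"prompt": "hello", "model": "openai/gpt-4o"})
    if cache.get(key) is None:  # miss on first lookup
        cache.set(key, "expensive result")
    assert cache.get(key) == "expensive result"  # hit on second lookup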
# ========== Memory service for conversation context. ==========
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional
class ConversationMemory:
"""Enhanced conversation memory with TTL and structured storage."""
def __init__(self, max_turns: int = 50, max_entries: int = 100):
self.max_turns = max_turns
self.max_entries = max_entries
self.turns: deque[ConversationTurn] = deque(maxlen=max_turns)
self.entries: Dict[str, MemoryEntry] = {}
self.created_at = datetime.now()
self.access_count = 0
def add_turn(self, role: str, content: str, metadata: Optional[Dict] = None) -> None:
"""Add a conversation turn."""
turn = ConversationTurn(role=role, content=content, metadata=metadata or {})
self.turns.append(turn)
self.access_count += 1
def set(self, key: str, value: Any, category: str = "general") -> None:
"""Store a value with a key."""
# Remove oldest entries if at capacity
if len(self.entries) >= self.max_entries:
oldest_key = min(self.entries.keys(), key=lambda k: self.entries[k].timestamp)
del self.entries[oldest_key]
self.entries[key] = MemoryEntry(key=key, value=value, category=category, access_count=0)
self.access_count += 1
def get(self, key: str, default: Any = None) -> Any:
"""Retrieve a value by key."""
if key in self.entries:
entry = self.entries[key]
entry.access_count += 1
entry.last_accessed = datetime.now()
self.access_count += 1
return entry.value
return default
def get_turns(self, limit: Optional[int] = None) -> List[ConversationTurn]:
"""Get recent conversation turns."""
if limit:
return list(self.turns)[-limit:]
return list(self.turns)
def search_entries(self, category: Optional[str] = None) -> List[MemoryEntry]:
"""Search entries by category."""
if category:
return [e for e in self.entries.values() if e.category == category]
return list(self.entries.values())
def clear(self) -> None:
"""Clear all memory."""
self.turns.clear()
self.entries.clear()
self.access_count = 0
def get_stats(self) -> Dict[str, Any]:
"""Get memory usage statistics."""
return {
"turns_count": len(self.turns),
"entries_count": len(self.entries),
"max_turns": self.max_turns,
"max_entries": self.max_entries,
"total_accesses": self.access_count,
"created_at": self.created_at.isoformat(),
"categories": list(set(e.category for e in self.entries.values())),
}
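# Illustrative sketch: storing turns and keyed facts in ConversationMemory.
def _example_conversation_memory_usage() -> None:
    memory = ConversationMemory(max_turns=10, max_entries=20)
    memory.add_turn("user", "Which model should review this patch?")
    memory.add_turn("assistant", "claude-sonnet-4 is a strong choice for code review.")
    memory.set("preferred_model", "anthropic/claude-sonnet-4", category="preferences")
    recent = memory.get_turns(limit=2)
    prefs = memory.search_entries(category="preferences")
    logger.info(f"{len(recent)} recent turns, {len(prefs)} preference entries")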
# ========== Session manager for multi-turn AI conversations. ==========
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
# NOTE: this redefinition shadows the memory-service ConversationTurn above; it keeps
# the optional metadata field so ConversationMemory.add_turn() continues to work.
@dataclass
class ConversationTurn:
    """A single turn in a conversation."""
    role: str  # "user", "assistant", or "system"
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ConversationSession:
"""A conversation session with a model."""
session_id: str
model: str
system_prompt: str
turns: List[ConversationTurn] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
last_activity: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
def add_turn(self, role: str, content: str) -> None:
"""Add a turn to the conversation."""
self.turns.append(ConversationTurn(role=role, content=content))
self.last_activity = datetime.now()
def get_message_history(self) -> List[Dict[str, str]]:
"""Get conversation history in OpenAI message format."""
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
for turn in self.turns:
messages.append({"role": turn.role, "content": turn.content})
return messages
def get_summary(self) -> str:
"""Get a brief summary of the session."""
turn_count = len(self.turns)
duration = (self.last_activity - self.created_at).total_seconds()
first_message = self.turns[0].content[:100] if self.turns else "No messages"
return (
f"Session with {self.model}: {turn_count} turns over "
f"{duration:.0f}s. Started with: '{first_message}...'"
)
class SessionManager:
"""Manages multiple conversation sessions."""
def __init__(self, max_sessions: int = 20, max_turns_per_session: int = 50):
self.sessions: Dict[str, ConversationSession] = {}
self.max_sessions = max_sessions
self.max_turns_per_session = max_turns_per_session
def create_session(
self,
model: str,
system_prompt: str = "",
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""Create a new conversation session.
Args:
model: The model to use for this session
system_prompt: Optional system prompt to set context
metadata: Optional metadata for the session
Returns:
The session ID
"""
# Clean up old sessions if at capacity
if len(self.sessions) >= self.max_sessions:
self._cleanup_oldest_session()
session_id = f"sess_{uuid.uuid4().hex[:12]}"
self.sessions[session_id] = ConversationSession(
session_id=session_id,
model=model,
system_prompt=system_prompt,
metadata=metadata or {},
)
logger.info(f"Created session {session_id} with model {model}")
return session_id
def get_session(self, session_id: str) -> Optional[ConversationSession]:
"""Get a session by ID."""
return self.sessions.get(session_id)
def send_message(self, session_id: str, message: str, model_manager: Any) -> tuple[str, str]:
"""Send a message in a session and get a response.
Args:
session_id: The session ID
message: The user's message
model_manager: The model manager to use for generation
Returns:
Tuple of (response_text, model_used)
Raises:
ValueError: If session not found or turn limit exceeded
"""
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
if len(session.turns) >= self.max_turns_per_session:
raise ValueError(
f"Session {session_id} has reached the maximum of "
f"{self.max_turns_per_session} turns"
)
# Add user message
session.add_turn("user", message)
# Build prompt with history
history = session.get_message_history()
prompt = self._format_prompt_with_history(history)
# Generate response
response_text, model_used = model_manager.generate_content(prompt, model=session.model)
# Add assistant response
session.add_turn("assistant", response_text)
logger.info(f"Session {session_id}: Turn {len(session.turns)//2} completed")
return response_text, model_used
def _format_prompt_with_history(self, messages: List[Dict[str, str]]) -> str:
"""Format message history into a prompt string."""
parts = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if role == "system":
parts.append(f"System: {content}")
elif role == "user":
parts.append(f"User: {content}")
elif role == "assistant":
parts.append(f"Assistant: {content}")
parts.append("Assistant:")
return "\n\n".join(parts)
def list_sessions(self) -> List[Dict[str, Any]]:
"""List all active sessions with summaries."""
return [
{
"session_id": s.session_id,
"model": s.model,
"turns": len(s.turns),
"created_at": s.created_at.isoformat(),
"last_activity": s.last_activity.isoformat(),
"preview": s.turns[0].content[:50] if s.turns else "Empty",
}
for s in sorted(
self.sessions.values(),
key=lambda x: x.last_activity,
reverse=True,
)
]
def get_history(self, session_id: str, limit: Optional[int] = None) -> List[Dict[str, str]]:
"""Get conversation history for a session.
Args:
session_id: The session ID
limit: Optional limit on number of turns to return
Returns:
List of message dicts with role and content
"""
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
turns = session.turns
if limit:
turns = turns[-limit:]
return [{"role": t.role, "content": t.content} for t in turns]
def end_session(self, session_id: str, summarize: bool = False) -> str:
"""End a session and optionally return a summary.
Args:
session_id: The session ID
summarize: Whether to return a summary
Returns:
Summary string or confirmation message
"""
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
summary = session.get_summary() if summarize else f"Session {session_id} ended"
del self.sessions[session_id]
logger.info(f"Ended session {session_id}")
return summary
def _cleanup_oldest_session(self) -> None:
"""Remove the oldest session to make room for new ones."""
if not self.sessions:
return
oldest_id = min(
self.sessions.keys(),
key=lambda k: self.sessions[k].last_activity,
)
logger.warning(f"Cleaning up oldest session {oldest_id} to make room")
del self.sessions[oldest_id]
def get_stats(self) -> Dict[str, Any]:
"""Get session manager statistics."""
total_turns = sum(len(s.turns) for s in self.sessions.values())
return {
"active_sessions": len(self.sessions),
"max_sessions": self.max_sessions,
"total_turns": total_turns,
"max_turns_per_session": self.max_turns_per_session,
}
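# Illustrative sketch: a single round trip through SessionManager. Requires a
# configured ModelManager; the system prompt and message are placeholders.
def _example_session_usage(manager: ModelManager) -> None:
    sessions = SessionManager(max_sessions=5)
    session_id = sessions.create_session(
        model="openai/gpt-4o", system_prompt="You are a terse code reviewer."
    )
    reply, model_used = sessions.send_message(
        session_id, "Review: def add(a, b): return a + b", manager
    )
    logger.info(f"{model_used} replied: {reply[:60]}")
    logger.info(sessions.end_session(session_id, summarize=True))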
# ========== Base class for all tools. ==========
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
# Simplified ToolOutput for bundled tools
class ToolOutput:
"""Standard output format for tool execution."""
def __init__(self, success: bool, result: Optional[str] = None, error: Optional[str] = None):
self.success = success
self.result = result
self.error = error
self.metadata: Dict[str, Any] = {}
# Add missing attributes for compatibility with orchestrator
self.tool_name: str = ""
self.execution_time_ms: Optional[float] = None
self.model_used: Optional[str] = None
self.timestamp = None
class MCPTool(ABC):
"""Abstract base class for all tools using simplified property-based approach."""
@property
@abstractmethod
def name(self) -> str:
"""Return the tool name."""
pass
@property
@abstractmethod
def description(self) -> str:
"""Return the tool description."""
pass
@property
@abstractmethod
def input_schema(self) -> Dict[str, Any]:
"""Return the JSON schema for tool inputs."""
pass
@abstractmethod
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
pass
def get_mcp_definition(self) -> Dict[str, Any]:
"""Get the MCP tool definition."""
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
# Keep the original BaseTool for backwards compatibility during migration
class BaseTool(MCPTool):
"""Legacy base class that wraps MCPTool for backwards compatibility."""
def __init__(self):
# No-op for legacy compatibility
pass
@property
def name(self) -> str:
"""Default to empty string for legacy tools."""
return ""
@property
def description(self) -> str:
"""Default to empty string for legacy tools."""
return ""
@property
def input_schema(self) -> Dict[str, Any]:
"""Default to empty schema for legacy tools."""
return {"type": "object", "properties": {}, "required": []}
# ========== Tool registry for dynamic tool discovery and management. ==========
import importlib
import inspect
from pathlib import Path
from typing import Dict, List, Optional, Type
class ToolRegistry:
"""Registry for discovering and managing tools."""
def __init__(self):
self._tools: Dict[str, BaseTool] = {}
self._tool_classes: Dict[str, Type[BaseTool]] = {}
def discover_tools(self, tools_path: Optional[Path] = None) -> None:
"""Discover and register all tools in the tools directory."""
if tools_path is None:
# Default to the tools package
tools_path = Path(__file__).parent.parent / "tools"
logger.info(f"Discovering tools in {tools_path}")
# Get all Python files in the tools directory
tool_files = list(tools_path.glob("*.py"))
logger.debug(f"Found {len(tool_files)} Python files in {tools_path}")
for tool_file in tool_files:
if tool_file.name.startswith("_") or tool_file.name == "base.py":
continue
# Try both import paths
module_names = [
f"council.tools.{tool_file.stem}",
f"src.council.tools.{tool_file.stem}",
]
module = None
for module_name in module_names:
try:
logger.debug(f"Attempting to import {module_name}")
module = importlib.import_module(module_name)
break
except ImportError:
continue
if module is None:
logger.error(f"Failed to import tool from {tool_file}")
continue
try:
# Find all classes that inherit from BaseTool
for name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, BaseTool) and obj != BaseTool:
logger.debug(f"Found tool class: {name}")
self._register_tool_class(obj)
except Exception as e:
logger.error(f"Failed to import tool from {tool_file}: {e}")
def _register_tool_class(self, tool_class: Type[BaseTool]) -> None:
"""Register a tool class."""
try:
# Instantiate the tool to get its metadata
tool_instance = tool_class()
# Use property-based access for compatibility
tool_name = tool_instance.name
if tool_name in self._tools:
logger.warning(f"Tool {tool_name} already registered, skipping")
return
self._tools[tool_name] = tool_instance
self._tool_classes[tool_name] = tool_class
logger.info(f"Registered tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to register tool {tool_class.__name__}: {e}")
def get_tool(self, name: str) -> Optional[BaseTool]:
"""Get a tool instance by name."""
return self._tools.get(name)
def get_tool_class(self, name: str) -> Optional[Type[BaseTool]]:
"""Get a tool class by name."""
return self._tool_classes.get(name)
def list_tools(self) -> List[str]:
"""List all registered tool names."""
return list(self._tools.keys())
def get_all_tools(self) -> Dict[str, BaseTool]:
"""Get all registered tools."""
return self._tools.copy()
def get_mcp_tool_definitions(self) -> List[Dict]:
"""Get MCP tool definitions for all registered tools."""
definitions = []
for tool in self._tools.values():
try:
definitions.append(tool.get_mcp_definition())
except Exception as e:
# Use property-based access for error logging
tool_name = getattr(tool, "name", tool.__class__.__name__)
logger.error(f"Failed to get MCP definition for {tool_name}: {e}")
return definitions
# ========== Orchestrator for managing tool execution and conversation flow. ==========
from typing import Any, Dict, List, Optional
class ConversationOrchestrator:
"""Orchestrates tool execution and manages conversation flow."""
def __init__(
self,
tool_registry: ToolRegistry,
        model_manager: Any,  # ModelManager (formerly DualModelManager)
memory: Optional[ConversationMemory] = None,
cache: Optional[ResponseCache] = None,
):
self.tool_registry = tool_registry
self.model_manager = model_manager
self.memory = memory or ConversationMemory()
self.cache = cache or ResponseCache()
self.execution_history: List[ToolOutput] = []
async def execute_tool(
self, tool_name: str, parameters: Dict[str, Any], request_id: Optional[str] = None
) -> ToolOutput:
"""Execute a single tool with proper context injection."""
# Check cache first
cache_key = self.cache.create_key(tool_name, parameters)
cached_result = self.cache.get(cache_key)
if cached_result:
logger.info(f"Cache hit for {tool_name}")
return cached_result
# Get the tool
tool = self.tool_registry.get_tool(tool_name)
if not tool:
return ToolOutput(success=False, error=f"Unknown tool: {tool_name}")
# For bundled operation, set global model manager
global model_manager
model_manager = self.model_manager
# Execute the tool
try:
output = await tool.execute(parameters)
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
output = ToolOutput(success=False, error=str(e))
# Cache successful results
if output.success:
self.cache.set(cache_key, output)
# Store in execution history
self.execution_history.append(output)
return output
# Create tool input with context (kept for reference, though not used in new API)
# tool_input = ToolInput(
# tool_name=tool_name,
# parameters=parameters,
# context={
# "model_manager": self.model_manager,
# "memory": self.memory,
# "orchestrator": self,
# },
# request_id=request_id,
# )
async def execute_protocol(
self, protocol_name: str, initial_input: Dict[str, Any]
) -> List[ToolOutput]:
"""Execute a multi-step protocol (e.g., debate, synthesis)."""
logger.info(f"Executing protocol: {protocol_name}")
# Example: Simple sequential execution
if protocol_name == "simple":
tool_name = initial_input.get("tool_name", "")
parameters = initial_input.get("parameters", {})
if tool_name:
return [await self.execute_tool(tool_name, parameters)]
return []
# Debate protocol
elif protocol_name == "debate":
topic = initial_input.get("topic", "")
positions = initial_input.get("positions", [])
if not topic or not positions:
output = ToolOutput(
success=False,
error="Debate protocol requires 'topic' and 'positions' parameters",
)
output.tool_name = "debate_protocol"
return [output]
debate = DebateProtocol(self, topic, positions)
try:
result = await debate.run()
output = ToolOutput(
success=True,
result=str(result), # Convert to string
)
output.tool_name = "debate_protocol"
output.metadata = {"protocol": "debate", "rounds": len(result.get("rounds", []))}
return [output]
except Exception as e:
logger.error(f"Debate protocol error: {e}")
output = ToolOutput(success=False, error=str(e))
output.tool_name = "debate_protocol"
return [output]
# Synthesis protocol (simple wrapper around synthesize tool)
elif protocol_name == "synthesis":
return [
await self.execute_tool(
"synthesize_perspectives", initial_input.get("parameters", {})
)
]
# Protocol not implemented
raise NotImplementedError(f"Protocol {protocol_name} not implemented")
def get_execution_stats(self) -> Dict[str, Any]:
"""Get statistics about tool executions."""
total = len(self.execution_history)
successful = sum(1 for output in self.execution_history if output.success)
failed = total - successful
avg_time: float = 0
if total > 0:
times = [o.execution_time_ms for o in self.execution_history if o.execution_time_ms]
avg_time = sum(times) / len(times) if times else 0
return {
"total_executions": total,
"successful": successful,
"failed": failed,
"success_rate": successful / total if total > 0 else 0,
"average_execution_time_ms": avg_time,
"cache_stats": self.cache.get_stats(),
}
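# Illustrative usage (comments only, not executed by the server): driving the
# orchestrator directly from async code, assuming a ToolRegistry with discovered
# tools and an initialized model manager already exist (as CouncilMCPServer sets
# up below); the variable names here are placeholders:
#
#   orchestrator = ConversationOrchestrator(tool_registry=registry, model_manager=manager)
#   output = await orchestrator.execute_tool("ask", {"question": "What is JSON-RPC?"})
#   text = output.result if output.success else output.error
#   stats = orchestrator.get_execution_stats()  # totals, success_rate, cache_stats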
# ========== Debate protocol for structured multi-agent discussions. ==========
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class DebatePosition:
"""Represents a position in a debate."""
agent_name: str
stance: str
arguments: List[str] = field(default_factory=list)
rebuttals: Dict[str, str] = field(default_factory=dict)
confidence: float = 0.5
@dataclass
class DebateRound:
"""Represents a round of debate."""
round_number: int
positions: List[DebatePosition]
synthesis: Optional[str] = None
class DebateProtocol:
"""Orchestrates structured debates between multiple agents."""
def __init__(self, orchestrator, topic: str, positions: List[str]):
self.orchestrator = orchestrator
self.topic = topic
self.positions = positions
self.rounds: List[DebateRound] = []
self.max_rounds = 3
async def run(self) -> Dict[str, Any]:
"""Run the debate protocol."""
logger.info(f"Starting debate on topic: {self.topic}")
# Round 1: Opening statements
round1 = await self._opening_statements()
self.rounds.append(round1)
# Round 2: Rebuttals
round2 = await self._rebuttal_round()
self.rounds.append(round2)
# Round 3: Final synthesis
synthesis = await self._synthesis_round()
return {
"topic": self.topic,
"rounds": self.rounds,
"final_synthesis": synthesis,
"positions_explored": len(self.positions),
}
async def _opening_statements(self) -> DebateRound:
"""Generate opening statements for each position."""
logger.info("Debate Round 1: Opening statements")
debate_positions = []
for i, position in enumerate(self.positions):
# Create a persona for this position
prompt = f"""You are participating in a structured debate on the topic: {self.topic}
Your assigned position is: {position}
Please provide:
1. Your main argument (2-3 sentences)
2. Three supporting points
3. Your confidence level (0.0-1.0) in this position
4. Any caveats or limitations you acknowledge
Be concise but persuasive."""
# Execute via orchestrator
result = await self.orchestrator.execute_tool(
"ask_gemini", {"question": prompt, "context": f"Debate agent {i+1}"}
)
if result.success:
# Parse the response (in a real implementation, we'd use structured output)
debate_position = DebatePosition(
agent_name=f"Agent_{i+1}",
stance=position,
arguments=[result.result], # Simplified for now
confidence=0.7, # Would be parsed from response
)
debate_positions.append(debate_position)
return DebateRound(round_number=1, positions=debate_positions)
async def _rebuttal_round(self) -> DebateRound:
"""Generate rebuttals for each position."""
logger.info("Debate Round 2: Rebuttals")
if not self.rounds:
raise ValueError("No opening statements to rebut")
previous_positions = self.rounds[0].positions
updated_positions = []
for i, position in enumerate(previous_positions):
rebuttals = {}
# Generate rebuttals against other positions
for j, other_position in enumerate(previous_positions):
if i == j:
continue
prompt = f"""You previously argued for: {position.stance}
The opposing view argues: {other_position.arguments[0]}
Please provide:
1. A concise rebuttal to their argument
2. Why your position is stronger
3. Any points of agreement or common ground
Keep your response under 100 words."""
result = await self.orchestrator.execute_tool(
"ask_gemini",
{"question": prompt, "context": f"Rebuttal from {position.agent_name}"},
)
if result.success:
rebuttals[other_position.agent_name] = result.result
# Update position with rebuttals
position.rebuttals = rebuttals
updated_positions.append(position)
return DebateRound(round_number=2, positions=updated_positions)
async def _synthesis_round(self) -> str:
"""Synthesize all positions into a final analysis."""
logger.info("Debate Round 3: Synthesis")
# Prepare perspectives for synthesis tool
perspectives = []
for round in self.rounds:
for position in round.positions:
perspectives.append(
{
"source": f"{position.agent_name} ({position.stance})",
"content": " ".join(position.arguments),
}
)
# Use the synthesize_perspectives tool
result = await self.orchestrator.execute_tool(
"synthesize_perspectives", {"topic": self.topic, "perspectives": perspectives}
)
return result.result if result.success else "Failed to synthesize debate"
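# Illustrative sketch (comments only): running the debate protocol through
# ConversationOrchestrator.execute_protocol() above; the topic and positions are
# hypothetical examples:
#
#   outputs = await orchestrator.execute_protocol(
#       "debate",
#       {"topic": "Monolith vs microservices", "positions": ["monolith", "microservices"]},
#   )
#   # Each ToolOutput carries metadata like {"protocol": "debate", "rounds": 2}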
# ========== Main MCP server implementation that orchestrates all modular components. ==========
import os
from logging.handlers import RotatingFileHandler
from os import PathLike
from typing import IO, Any, Dict, Optional, Union
# Try to import dotenv if available
try:
from dotenv import load_dotenv
HAS_DOTENV = True
except ImportError:
HAS_DOTENV = False
def load_dotenv(
dotenv_path: Optional[Union[str, PathLike[str]]] = None,
stream: Optional[IO[str]] = None,
verbose: bool = False,
override: bool = False,
interpolate: bool = True,
encoding: Optional[str] = None,
) -> bool:
"""Dummy function when dotenv is not available."""
return False
__version__ = "4.0.0"
class CouncilMCPServer:
"""Main MCP Server that integrates all modular components."""
def __init__(self):
"""Initialize the server with modular components."""
# Load environment variables at startup
self._load_env_file()
self.model_manager: Optional[ModelManager] = None
self.tool_registry = ToolRegistry()
self.cache = ResponseCache(max_size=100, ttl_seconds=3600)
self.memory = ConversationMemory(max_turns=50, max_entries=100)
self.orchestrator: Optional[ConversationOrchestrator] = None
# Create JSON-RPC server
self.server = JsonRpcServer("council-mcp-server")
self._setup_handlers()
# Make server instance available globally for tools
setattr(council, "_server_instance", self)
# Also set as global for bundled mode
globals()["_server_instance"] = self
def _load_env_file(self) -> None:
"""Load .env file from multiple possible locations."""
# Try multiple locations for .env file
# 1. Directory of the main entry point (works with launcher.py)
main_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
# 2. Parent directory of main (in case we're in a subdirectory)
parent_dir = os.path.dirname(main_dir)
# 3. Current working directory
cwd = os.getcwd()
# 4. Script directory (where this file is)
script_dir = os.path.dirname(os.path.abspath(__file__))
env_locations = [
os.path.join(main_dir, ".env"),
os.path.join(parent_dir, ".env"),
os.path.join(cwd, ".env"),
os.path.join(script_dir, ".env"),
]
# If python-dotenv is available, try to use it first
if HAS_DOTENV:
env_loaded = False
for env_path in env_locations:
if os.path.exists(env_path):
logger.info(f"Loading .env from {env_path}")
load_dotenv(env_path)
env_loaded = True
break
if not env_loaded:
# Try current directory as last fallback
logger.info("No .env file found in expected locations, trying current directory")
load_dotenv()
else:
# Manual .env loading if python-dotenv is not available
for env_path in env_locations:
if os.path.exists(env_path):
logger.info(f"Loading .env from {env_path} (manual mode)")
try:
with open(env_path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
# Strip whitespace from key and value
key = key.strip()
value = value.strip()
# Remove quotes if present
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
os.environ[key] = value
if key == "OPENROUTER_API_KEY":
logger.info(
f"Set OPENROUTER_API_KEY from .env file "
f"(length: {len(value)})"
)
break
except Exception as e:
logger.error(f"Failed to load .env file: {e}")
def _initialize_model_manager(self) -> bool:
"""Initialize the model manager with API key."""
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
logger.error("No OPENROUTER_API_KEY found in environment. Please check your .env file.")
# Log all env vars starting with OPENROUTER or COUNCIL for debugging
relevant_vars = {
k: v
for k, v in os.environ.items()
if k.startswith("OPENROUTER") or k.startswith("COUNCIL")
}
if relevant_vars:
logger.info(f"Found relevant env vars: {list(relevant_vars.keys())}")
else:
logger.warning("No OPENROUTER or COUNCIL environment variables found at all")
return False
try:
logger.info(f"Initializing ModelManager with API key (length: {len(api_key)})")
self.model_manager = ModelManager(api_key)
# Create orchestrator with all components
logger.info("Creating conversation orchestrator...")
self.orchestrator = ConversationOrchestrator(
tool_registry=self.tool_registry,
model_manager=self.model_manager,
memory=self.memory,
cache=self.cache,
)
logger.info("Model manager initialization complete")
return True
except Exception as e:
logger.error(f"Failed to initialize model manager: {e}", exc_info=True)
return False
def _setup_handlers(self):
"""Set up JSON-RPC handlers."""
# Register handlers
self.server.register_handler("initialize", self.handle_initialize)
self.server.register_handler("tools/list", self.handle_tools_list)
self.server.register_handler("tools/call", self.handle_tool_call)
def handle_initialize(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialization request."""
# Reload environment variables in case they changed
self._load_env_file()
# Log the API key status
api_key = os.environ.get("OPENROUTER_API_KEY")
if api_key:
logger.info(f"OPENROUTER_API_KEY found (length: {len(api_key)})")
else:
logger.warning("OPENROUTER_API_KEY not found in environment")
# Discover and register all tools FIRST
self.tool_registry.discover_tools()
logger.info(f"Registered {len(self.tool_registry.list_tools())} tools")
# Initialize model manager AFTER tools are registered
model_initialized = self._initialize_model_manager()
return create_result_response(
request_id,
{
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": "council-mcp-server",
"version": __version__,
"modelsAvailable": model_initialized,
},
"capabilities": {"tools": {}},
},
)
def handle_tools_list(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool list request."""
# Get tool definitions from registry
tool_defs = self.tool_registry.get_mcp_tool_definitions()
# Convert to tool list format
tools = []
for tool_def in tool_defs:
tools.append(
{
"name": tool_def["name"],
"description": tool_def["description"],
"inputSchema": tool_def["inputSchema"],
}
)
return create_result_response(request_id, {"tools": tools})
def handle_tool_call(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool execution request."""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
logger.info(f"Executing tool: {tool_name}")
# Validate tool name
if not tool_name:
return create_result_response(
request_id,
{"content": [{"type": "text", "text": "Error: Tool name is required"}]},
)
# Check if models are initialized
if not self.orchestrator:
result = (
"Error: Models not initialized. "
"Please set OPENROUTER_API_KEY environment variable."
)
return create_result_response(
request_id, {"content": [{"type": "text", "text": result}]}
)
# Execute tool through orchestrator
try:
# Use orchestrator to execute tool (async converted to sync)
import asyncio
# Create event loop if needed, tracking whether this handler owns it
created_loop = False
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
created_loop = True
try:
output = loop.run_until_complete(
self.orchestrator.execute_tool(
tool_name=tool_name, parameters=arguments, request_id=request_id
)
)
if output.success:
result = output.result or ""
else:
result = f"Error: {output.error or 'Unknown error'}"
finally:
# Close the loop only if we created it (never close a pre-existing loop)
if created_loop:
loop.close()
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
result = f"Error executing tool: {str(e)}"
return create_result_response(request_id, {"content": [{"type": "text", "text": result}]})
def run(self):
"""Run the MCP server."""
logger.info(f"Starting Council MCP Server v{__version__} (Modular)")
# Configure unbuffered output for proper MCP communication
sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 1)
sys.stderr = os.fdopen(sys.stderr.fileno(), "w", 1)
# Run the JSON-RPC server
self.server.run()
# Keep GeminiMCPServer as alias for backwards compatibility
GeminiMCPServer = CouncilMCPServer
def main():
"""Main entry point."""
# Create logs directory if it doesn't exist
log_dir = os.path.expanduser("~/.claude-mcp-servers/council/logs")
os.makedirs(log_dir, exist_ok=True)
# Configure logging with both stderr and file output
log_file = os.path.join(log_dir, "council-mcp-server.log")
# Create handlers
handlers: list[logging.Handler] = [
logging.StreamHandler(sys.stderr),
RotatingFileHandler(
log_file,
mode="a",
encoding="utf-8",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5, # Keep 5 backup files
),
]
# Configure logging
# Use DEBUG level if COUNCIL_DEBUG env var is set, otherwise INFO
log_level = logging.DEBUG if os.getenv("COUNCIL_DEBUG") else logging.INFO
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=handlers,
force=True, # Ensure logging is configured even if already configured elsewhere
)
logger.info(f"Logging to file: {log_file}")
try:
server = CouncilMCPServer()
server.run()
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {e}")
sys.exit(1)
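# Illustrative invocation (comments only): the bundle runs over stdio and logs to
# ~/.claude-mcp-servers/council/logs/council-mcp-server.log; the file name below is
# a placeholder for wherever this bundle is saved:
#
#   COUNCIL_DEBUG=1 OPENROUTER_API_KEY=sk-or-... python council_bundle.py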
# ========== Tool for asking general questions via Council. ==========
from typing import Any, Dict
class AskTool(MCPTool):
"""Tool for asking general questions."""
@property
def name(self) -> str:
return "ask"
@property
def description(self) -> str:
return "Ask a general question or for help with a problem"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question or problem to ask",
},
"context": {
"type": "string",
"description": "Optional context to help understand better",
"default": "",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["question"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
# Get parameters
question = parameters.get("question", "")
context = parameters.get("context", "")
model_override = parameters.get("model")
if not question:
return ToolOutput(success=False, error="Question is required")
# Build prompt
prompt = f"Context: {context}\n\n" if context else ""
prompt += f"Question: {question}"
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐ค Response:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
# ========== Brainstorming tool for generating ideas and solutions. ==========
from typing import Any, Dict
class BrainstormTool(MCPTool):
"""Tool for Brainstorm."""
@property
def name(self) -> str:
return "brainstorm"
@property
def description(self) -> str:
return "Brainstorm ideas or solutions"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "The topic or problem to brainstorm about",
},
"constraints": {
"type": "string",
"description": "Any constraints or requirements to consider",
"default": "",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["topic"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for brainstorming")
constraints = parameters.get("constraints", "")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(topic, constraints)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐ก Brainstorming Results:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, constraints: str) -> str:
"""Build the brainstorming prompt."""
constraints_text = f"\nConstraints to consider:\n{constraints}" if constraints else ""
return f"""Let's brainstorm ideas about: {topic}{constraints_text}
Please provide:
1. Creative and innovative ideas
2. Different perspectives and approaches
3. Potential challenges and solutions
4. Actionable next steps
Be creative but practical. Think outside the box while considering feasibility."""
# ========== Code review tool for analyzing code quality and suggesting improvements. ==========
from typing import Any, Dict
class CodeReviewTool(MCPTool):
"""Tool for Code Review."""
@property
def name(self) -> str:
return "code_review"
@property
def description(self) -> str:
return "Review code for issues, improvements, or best practices"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"code": {"type": "string", "description": "The code to review"},
"language": {
"type": "string",
"description": "Programming language (e.g., python, javascript)",
"default": "javascript",
},
"focus": {
"type": "string",
"description": "Specific aspect to focus on "
"(e.g., security, performance, readability)",
"default": "general",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["code"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
code = parameters.get("code")
if not code:
return ToolOutput(success=False, error="Code is required for review")
language = parameters.get("language", "javascript")
focus = parameters.get("focus", "general")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(code, language, focus)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐ Code Review:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, code: str, language: str, focus: str) -> str:
"""Build the code review prompt."""
focus_instructions = {
"security": "Pay special attention to security vulnerabilities, "
"input validation, and potential exploits.",
"performance": "Focus on performance optimizations, "
"algorithmic complexity, and resource usage.",
"readability": "Emphasize code clarity, naming conventions, and maintainability.",
"best_practices": f"Review against {language} best practices and idiomatic patterns.",
"general": "Provide a comprehensive review covering all aspects.",
}
focus_text = focus_instructions.get(focus, focus_instructions["general"])
return f"""Please review the following {language} code:
```{language}
{code}
```
{focus_text}
Provide:
1. Overall assessment
2. Specific issues found (if any)
3. Suggestions for improvement
4. Examples of better implementations where applicable
Be constructive and specific in your feedback."""
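# Illustrative "code_review" arguments (the snippet is hypothetical): review a
# Python function with a security focus, overriding the default language:
#
#   {"code": "def login(user, pwd):\n    return db.query(f\"SELECT ... {pwd}\")",
#    "language": "python",
#    "focus": "security"}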
# ========== Tools for multi-turn conversations with AI models. ==========
from typing import Any, Dict
# Global session manager instance (initialized by server)
_session_manager = None
def get_session_manager():
"""Get or create the session manager instance."""
global _session_manager
if _session_manager is None:
_session_manager = SessionManager()
return _session_manager
class StartConversationTool(MCPTool):
"""Tool to start a new conversation session with a model."""
@property
def name(self) -> str:
return "start_conversation"
@property
def description(self) -> str:
return (
"Start a new multi-turn conversation session with an AI model. "
"Returns a session_id to use for follow-up messages."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"model": {
"type": "string",
"description": (
"The model to converse with (e.g., 'deepseek/deepseek-r1', "
"'anthropic/claude-3-haiku'). Use list_models to see options."
),
},
"system_prompt": {
"type": "string",
"description": (
"Optional system prompt to set the model's role/context "
"(e.g., 'You are a Python expert specializing in async programming')"
),
"default": "",
},
"initial_message": {
"type": "string",
"description": ("Optional first message to send immediately after starting"),
},
},
"required": ["model"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Start a new conversation session."""
try:
model = parameters.get("model")
if not model:
return ToolOutput(success=False, error="Model is required")
system_prompt = parameters.get("system_prompt", "")
initial_message = parameters.get("initial_message")
session_manager = get_session_manager()
session_id = session_manager.create_session(
model=model,
system_prompt=system_prompt,
)
result_lines = [
"โ
**Conversation Started**",
"",
f"**Session ID:** `{session_id}`",
f"**Model:** {model}",
]
if system_prompt:
result_lines.append(f"**System Prompt:** {system_prompt[:100]}...")
# If initial message provided, send it
if initial_message:
try:
if _server_instance and _server_instance.model_manager:
response, model_used = session_manager.send_message(
session_id, initial_message, _server_instance.model_manager
)
result_lines.extend(
[
"",
"---",
f"**You:** {initial_message}",
"",
f"**{model}:** {response}",
]
)
except Exception as e:
result_lines.append(f"\nโ ๏ธ Initial message failed: {e}")
result_lines.extend(
[
"",
f"๐ก Use `continue_conversation` with session_id `{session_id}` to continue.",
]
)
return ToolOutput(success=True, result="\n".join(result_lines))
except Exception as e:
logger.error(f"Error starting conversation: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
class ContinueConversationTool(MCPTool):
"""Tool to continue an existing conversation session."""
@property
def name(self) -> str:
return "continue_conversation"
@property
def description(self) -> str:
return (
"Send a message in an existing conversation session. "
"The model will have full context of previous messages."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "The session ID from start_conversation",
},
"message": {
"type": "string",
"description": "Your message to send",
},
},
"required": ["session_id", "message"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Continue a conversation."""
try:
session_id = parameters.get("session_id")
message = parameters.get("message")
if not session_id:
return ToolOutput(success=False, error="session_id is required")
if not message:
return ToolOutput(success=False, error="message is required")
session_manager = get_session_manager()
session = session_manager.get_session(session_id)
if not session:
return ToolOutput(
success=False,
error=f"Session {session_id} not found. "
"Use list_conversations to see active sessions.",
)
# Get model manager
try:
if not _server_instance or not _server_instance.model_manager:
raise AttributeError("Model manager not available")
model_manager = _server_instance.model_manager
except (ImportError, NameError, AttributeError):
model_manager = globals().get("model_manager")
if not model_manager:
return ToolOutput(success=False, error="Model manager not available")
response, model_used = session_manager.send_message(session_id, message, model_manager)
turn_count = len(session.turns) // 2
result = (
f"**Turn {turn_count}** (Session: `{session_id}`)\n\n"
f"**You:** {message}\n\n"
f"**{session.model}:** {response}\n\n"
f"[Model: {model_used}]"
)
return ToolOutput(success=True, result=result)
except ValueError as e:
return ToolOutput(success=False, error=str(e))
except Exception as e:
logger.error(f"Error in conversation: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
class ListConversationsTool(MCPTool):
"""Tool to list active conversation sessions."""
@property
def name(self) -> str:
return "list_conversations"
@property
def description(self) -> str:
return "List all active conversation sessions with their status and preview."
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {},
"required": [],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""List active conversations."""
try:
session_manager = get_session_manager()
sessions = session_manager.list_sessions()
stats = session_manager.get_stats()
if not sessions:
return ToolOutput(
success=True,
result=(
"๐ญ **No active conversations**\n\n"
"Use `start_conversation` to begin a new session."
),
)
result_lines = [
f"๐ **Active Conversations** ({stats['active_sessions']}/{stats['max_sessions']})",
"",
]
for s in sessions:
result_lines.extend(
[
f"### `{s['session_id']}`",
f"- **Model:** {s['model']}",
f"- **Turns:** {s['turns']}",
f"- **Last Activity:** {s['last_activity']}",
f"- **Preview:** _{s['preview']}_...",
"",
]
)
result_lines.append("๐ก Use `continue_conversation` with a session_id to resume.")
return ToolOutput(success=True, result="\n".join(result_lines))
except Exception as e:
logger.error(f"Error listing conversations: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
class EndConversationTool(MCPTool):
"""Tool to end a conversation session."""
@property
def name(self) -> str:
return "end_conversation"
@property
def description(self) -> str:
return "End a conversation session. Optionally get a summary of the conversation."
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "The session ID to end",
},
"summarize": {
"type": "boolean",
"description": "Whether to return a summary of the conversation",
"default": True,
},
},
"required": ["session_id"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""End a conversation session."""
try:
session_id = parameters.get("session_id")
summarize = parameters.get("summarize", True)
if not session_id:
return ToolOutput(success=False, error="session_id is required")
session_manager = get_session_manager()
summary = session_manager.end_session(session_id, summarize=summarize)
return ToolOutput(
success=True,
result=f"โ
**Conversation Ended**\n\n{summary}",
)
except ValueError as e:
return ToolOutput(success=False, error=str(e))
except Exception as e:
logger.error(f"Error ending conversation: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
class GetConversationHistoryTool(MCPTool):
"""Tool to get the history of a conversation."""
@property
def name(self) -> str:
return "get_conversation_history"
@property
def description(self) -> str:
return "Get the full message history of a conversation session."
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "The session ID",
},
"limit": {
"type": "integer",
"description": "Limit to last N turns (optional)",
},
},
"required": ["session_id"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Get conversation history."""
try:
session_id = parameters.get("session_id")
limit = parameters.get("limit")
if not session_id:
return ToolOutput(success=False, error="session_id is required")
session_manager = get_session_manager()
session = session_manager.get_session(session_id)
if not session:
return ToolOutput(success=False, error=f"Session {session_id} not found")
history = session_manager.get_history(session_id, limit=limit)
result_lines = [
f"๐ **Conversation History** (`{session_id}`)",
f"**Model:** {session.model}",
"",
]
if session.system_prompt:
result_lines.extend(
[
f"**System:** {session.system_prompt}",
"",
"---",
"",
]
)
for i, turn in enumerate(history):
role = "You" if turn["role"] == "user" else session.model
result_lines.append(f"**{role}:** {turn['content']}")
result_lines.append("")
return ToolOutput(success=True, result="\n".join(result_lines))
except ValueError as e:
return ToolOutput(success=False, error=str(e))
except Exception as e:
logger.error(f"Error getting history: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
# ========== Debug tool for structured debugging with hypothesis tracking. ==========
from typing import Any, Dict, List, Optional
class DebugTool(MCPTool):
"""Tool for structured debugging with hypothesis ranking and verification guidance."""
@property
def name(self) -> str:
return "debug"
@property
def description(self) -> str:
return (
"Analyze an error or bug with structured hypothesis generation. "
"Provides ranked probable causes, verification steps, and recommended next actions. "
"Tracks previous attempts to prevent circular debugging."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"error_message": {
"type": "string",
"description": "The error message or symptom description",
},
"code_context": {
"type": "string",
"description": "Relevant code where the error occurs",
},
"stack_trace": {
"type": "string",
"description": "Stack trace if available",
},
"previous_attempts": {
"type": "array",
"items": {"type": "string"},
"description": "List of fixes already tried that didn't work",
},
"environment": {
"type": "string",
"description": "Runtime environment (e.g., 'Python 3.13', 'Node 20')",
"default": "Python 3.x",
},
"session_id": {
"type": "string",
"description": (
"Optional session ID from start_conversation to track "
"debugging history across multiple debug calls"
),
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["error_message", "code_context"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute structured debugging analysis."""
try:
error_message = parameters.get("error_message")
code_context = parameters.get("code_context")
if not error_message:
return ToolOutput(success=False, error="error_message is required")
if not code_context:
return ToolOutput(success=False, error="code_context is required")
stack_trace = parameters.get("stack_trace", "")
previous_attempts = parameters.get("previous_attempts", [])
environment = parameters.get("environment", "Python 3.x")
session_id = parameters.get("session_id")
model_override = parameters.get("model")
# Get debugging history from session if available
session_context = ""
if session_id:
session_context = self._get_session_context(session_id)
# Build the prompt
prompt = self._build_prompt(
error_message=error_message,
code_context=code_context,
stack_trace=stack_trace,
previous_attempts=previous_attempts,
environment=environment,
session_context=session_context,
)
# Get model manager
try:
if _server_instance and _server_instance.model_manager:
model_manager = _server_instance.model_manager
else:
raise AttributeError("Server instance not available")
except (ImportError, NameError, AttributeError):
model_manager = globals().get("model_manager")
if not model_manager:
return ToolOutput(success=False, error="Model manager not available")
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
# Format response
formatted_response = self._format_response(
response_text,
model_used,
session_id,
len(previous_attempts),
)
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Debug tool error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _get_session_context(self, session_id: str) -> str:
"""Get previous debugging context from session if available."""
try:
session_manager = get_session_manager()
session = session_manager.get_session(session_id)
if session and session.turns:
# Extract recent debugging history
recent_turns = session.turns[-6:] # Last 3 exchanges
context_parts = ["## Previous Debugging Context"]
for turn in recent_turns:
role = "User" if turn.role == "user" else "Assistant"
# Truncate long content
content = (
turn.content[:500] + "..." if len(turn.content) > 500 else turn.content
)
context_parts.append(f"**{role}:** {content}")
return "\n\n".join(context_parts)
except Exception as e:
logger.debug(f"Could not get session context: {e}")
return ""
def _build_prompt(
self,
error_message: str,
code_context: str,
stack_trace: str,
previous_attempts: List[str],
environment: str,
session_context: str,
) -> str:
"""Build the structured debugging prompt."""
parts = [
"You are a senior debugging expert. Analyze the following error and provide "
"a structured debugging analysis with ranked hypotheses.",
"",
"## Environment",
environment,
"",
"## Error Message",
"```",
error_message,
"```",
"",
"## Code Context",
"```",
code_context,
"```",
]
if stack_trace:
parts.extend(
[
"",
"## Stack Trace",
"```",
f"{stack_trace}",
"```",
]
)
if previous_attempts:
parts.extend(
[
"",
"## Previous Attempts (Already Tried)",
"The following fixes have been attempted but DID NOT solve the issue:",
]
)
for i, attempt in enumerate(previous_attempts, 1):
parts.append(f"{i}. {attempt}")
parts.append("")
parts.append("**Important:** Do NOT suggest these approaches again.")
if session_context:
parts.extend(
[
"",
session_context,
]
)
parts.extend(
[
"",
"## Required Output Format",
"",
"Provide your analysis in this exact structure:",
"",
"### Root Cause Analysis",
"[Explain the most likely root cause based on the evidence. "
"Be specific about what's happening and why.]",
"",
"### Hypotheses (Ranked by Probability)",
"",
"**1. [Most Likely Cause]** (Confidence: High/Medium/Low)",
"- Evidence: [What points to this being the issue]",
"- Verification: [Concrete step to confirm this is the cause]",
"- Fix Strategy: [High-level approach, not full implementation yet]",
"",
"**2. [Second Most Likely]** (Confidence: High/Medium/Low)",
"[Same structure...]",
"",
"(Provide 2-4 hypotheses)",
"",
"### Recommended Next Step",
"[Single, concrete action to take RIGHT NOW to make progress. "
"Focus on verification before fix.]",
"",
"### What NOT to Try",
"[Anti-patterns or approaches that won't work for this specific issue, "
"especially based on previous attempts]",
]
)
return "\n".join(parts)
def _format_response(
self,
response_text: str,
model_used: str,
session_id: Optional[str],
attempt_count: int,
) -> str:
"""Format the debugging response."""
header_parts = ["# Debugging Analysis"]
if attempt_count > 0:
header_parts.append(f"*Attempt #{attempt_count + 1}*")
if session_id:
header_parts.append(f"*Session: `{session_id}`*")
header = " | ".join(header_parts)
return f"{header}\n\n{response_text}\n\n[Model: {model_used}]"
# ========== Explanation tool for understanding complex code or concepts. ==========
from typing import Any, Dict
class ExplainTool(MCPTool):
"""Tool for Explain."""
@property
def name(self) -> str:
return "explain"
@property
def description(self) -> str:
return "Explain complex code or concepts"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {"type": "string", "description": "Code or concept to explain"},
"level": {
"type": "string",
"description": "Explanation level (beginner, intermediate, expert)",
"default": "intermediate",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["topic"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for explanation")
level = parameters.get("level", "intermediate")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(topic, level)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐ Explanation:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, level: str) -> str:
"""Build the explanation prompt."""
level_instructions = {
"beginner": """Explain this as if to someone new to programming:
- Use simple language and avoid jargon
- Provide analogies to everyday concepts
- Break down complex ideas into simple steps
- Include examples that build understanding gradually""",
"intermediate": """Explain this to someone with programming experience:
- Assume familiarity with basic programming concepts
- Focus on the key insights and patterns
- Include practical examples and use cases
- Mention common pitfalls and best practices""",
"expert": """Provide an in-depth technical explanation:
- Include implementation details and edge cases
- Discuss performance implications and trade-offs
- Reference relevant algorithms, data structures, or design patterns
- Compare with alternative approaches""",
}
level_text = level_instructions.get(level, level_instructions["intermediate"])
return f"""Please explain the following:
{topic}
{level_text}
Structure your explanation with:
1. Overview/Summary
2. Detailed explanation
3. Examples (if applicable)
4. Key takeaways"""
# ========== Tool for listing available LLM models. ==========
from typing import Any, Dict
class ListModelsTool(MCPTool):
"""Tool for listing available models from OpenRouter."""
@property
def name(self) -> str:
return "list_models"
@property
def description(self) -> str:
return (
"List available LLM models with optional filtering by provider, "
"capability, or free tier. Returns model names, context lengths, and pricing."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"provider": {
"type": "string",
"description": (
"Filter by provider (e.g., 'google', 'anthropic', 'openai', "
"'meta', 'mistral')"
),
},
"capability": {
"type": "string",
"description": (
"Filter by capability (e.g., 'vision', 'code', 'function_calling')"
),
},
"free_only": {
"type": "boolean",
"description": "Only show free tier models",
"default": False,
},
"search": {
"type": "string",
"description": "Search models by name or ID",
},
"limit": {
"type": "integer",
"description": "Maximum number of models to return",
"default": 20,
},
},
"required": [],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
# Get parameters
provider = parameters.get("provider")
capability = parameters.get("capability")
free_only = parameters.get("free_only", False)
search = parameters.get("search")
limit = parameters.get("limit", 20)
# Get manager from server instance
try:
if _server_instance and hasattr(_server_instance, "council_manager"):
manager = _server_instance.council_manager
elif _server_instance and hasattr(_server_instance, "model_manager"):
# Fallback to old model_manager for compatibility
manager = _server_instance.model_manager
else:
raise AttributeError("Manager not available")
except (ImportError, NameError, AttributeError):
# Fallback for bundled mode
manager = globals().get("council_manager") or globals().get("model_manager")
if not manager:
return ToolOutput(success=False, error="Model manager not available")
# Get models from manager
if hasattr(manager, "list_models"):
models = manager.list_models()
else:
return ToolOutput(
success=False,
error="Manager does not support listing models",
)
# Apply filters
filtered = ModelFilter.apply_filters(
models,
provider=provider,
capability=capability,
free_only=free_only,
search=search,
limit=limit,
)
# Format output
if not filtered:
return ToolOutput(
success=True,
result="No models found matching the criteria.",
)
# The model registry helpers (get_model_metadata) are bundled into this file,
# so enhanced metadata is always available here
has_registry = True
result_lines = [f"๐ Found {len(filtered)} models:\n"]
for model in filtered:
# Format context length nicely
ctx = model.context_length
if ctx >= 1_000_000:
ctx_str = f"{ctx // 1_000_000}M"
elif ctx >= 1_000:
ctx_str = f"{ctx // 1_000}K"
else:
ctx_str = str(ctx)
# Build model line
line = f"โข {model.id}"
# Add model class from registry if available
if has_registry:
metadata = get_model_metadata(model.id)
if metadata:
line += f" [{metadata.model_class.value.upper()}]"
if model.is_free:
line += " [FREE]"
line += f" - {ctx_str} context"
if model.capabilities:
caps = ", ".join(model.capabilities)
line += f" ({caps})"
result_lines.append(line)
# Add legend
result_lines.extend(
[
"",
"Legend: [FLASH] Fast/cheap | [PRO] Balanced | [DEEP] Max quality",
"Use `recommend_model` for task-specific guidance.",
]
)
return ToolOutput(success=True, result="\n".join(result_lines))
except Exception as e:
logger.error(f"Error listing models: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
# ========== Tool for recommending models based on task type. ==========
from typing import Any, Dict
class RecommendModelTool(MCPTool):
"""Tool for recommending the best model for a specific task."""
@property
def name(self) -> str:
return "recommend_model"
@property
def description(self) -> str:
return (
"Recommend the best AI model for a specific task. "
"Provides curated recommendations based on benchmarks and usage data. "
"Task types: coding, code_review, reasoning, creative, vision, long_context, general."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": (
"Type of task: 'coding', 'code_review', 'reasoning', "
"'creative', 'vision', 'long_context', or 'general'"
),
"enum": [
"coding",
"code_review",
"reasoning",
"creative",
"vision",
"long_context",
"general",
],
},
"prefer_free": {
"type": "boolean",
"description": "Prefer free tier models if available",
"default": False,
},
"prefer_fast": {
"type": "boolean",
"description": "Prefer faster (flash-class) models over quality",
"default": False,
},
"min_context": {
"type": "integer",
"description": "Minimum context length needed (in tokens)",
},
},
"required": ["task"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
task_str = parameters.get("task", "general")
prefer_free = parameters.get("prefer_free", False)
# TODO: Implement prefer_fast and min_context filtering
_ = parameters.get("prefer_fast", False)
_ = parameters.get("min_context")
# Parse task type
try:
task = TaskType(task_str)
except ValueError:
task = TaskType.GENERAL
# Get recommendations
recommendations = get_recommendations_for_task(task, limit=5)
# Build response
result_lines = [
f"๐ฏ **Model Recommendations for {task.value.replace('_', ' ').title()}**",
"",
]
# If prefer_free, show free options first
if prefer_free:
result_lines.append("### Free Tier Options")
for model_id in FREE_TIER_MODELS:
result_lines.append(f"โข {model_id}")
result_lines.append("")
# Main recommendations
result_lines.append("### Top Recommendations")
for i, model_id in enumerate(recommendations, 1):
metadata = get_model_metadata(model_id)
if metadata:
# Get strength for this task
strength = metadata.strengths.get(task, "B")
# Build model line
model_name = model_id.split("/")[1]
class_badge = f"[{metadata.model_class.value.upper()}]"
line = f"{i}. **{model_name}** {class_badge} (Rating: {strength})"
if metadata.description:
line += f"\n _{metadata.description}_"
result_lines.append(line)
else:
# Fallback for models not in registry
result_lines.append(f"{i}. {model_id}")
# Add class guide
result_lines.extend(
[
"",
"### Model Classes",
f"โข **Flash**: {get_model_class_description(ModelClass.FLASH)}",
f"โข **Pro**: {get_model_class_description(ModelClass.PRO)}",
f"โข **Deep**: {get_model_class_description(ModelClass.DEEP)}",
"",
"### Rating Scale",
"S = Best in class | A = Excellent | B = Good | C = Adequate",
]
)
# Add notes for specific tasks
task_notes = {
TaskType.CODING: (
"\n๐ก **Tip**: Claude Sonnet 4 leads SWE-bench (77-82%). "
"For web dev, Gemini 2.5 Pro leads WebDev Arena."
),
TaskType.REASONING: (
"\n๐ก **Tip**: DeepSeek R1 uses reinforcement learning for "
"step-by-step reasoning. Gemini 3 Pro scores 86.4 on GPQA."
),
TaskType.VISION: (
"\n๐ก **Tip**: Gemini Flash handles 50%+ of vision workloads on OpenRouter. "
"Great balance of speed and quality for image tasks."
),
TaskType.LONG_CONTEXT: (
"\n๐ก **Tip**: Gemini models support up to 1M tokens. "
"Llama 4 Scout handles up to 10M tokens for extreme cases."
),
}
if task in task_notes:
result_lines.append(task_notes[task])
return ToolOutput(success=True, result="\n".join(result_lines))
except Exception as e:
logger.error(f"Error recommending model: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
# ========== Refactor tool for atomic refactoring plans with before/after examples. ==========
from typing import Any, Dict
class RefactorTool(MCPTool):
"""Tool for generating safe, atomic refactoring plans with before/after code examples."""
REFACTORING_GOALS = [
"extract_method",
"simplify_logic",
"improve_naming",
"reduce_complexity",
"modernize_syntax",
"remove_duplication",
"improve_error_handling",
]
@property
def name(self) -> str:
return "refactor"
@property
def description(self) -> str:
return (
"Generate a safe, step-by-step refactoring plan with before/after code examples. "
"Provides atomic refactoring steps that preserve behavior while improving "
"code quality. Supports: extract_method, simplify_logic, improve_naming, "
"reduce_complexity, modernize_syntax, remove_duplication, improve_error_handling."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The code to refactor",
},
"goal": {
"type": "string",
"enum": self.REFACTORING_GOALS,
"description": (
"The refactoring goal: extract_method, simplify_logic, "
"improve_naming, reduce_complexity, modernize_syntax, "
"remove_duplication, or improve_error_handling"
),
},
"language": {
"type": "string",
"description": "Programming language (e.g., python, javascript, typescript)",
"default": "python",
},
"context": {
"type": "string",
"description": (
"Optional context about how the code is used, "
"constraints, or additional requirements"
),
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["code", "goal"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the refactoring analysis."""
try:
code = parameters.get("code")
goal = parameters.get("goal")
if not code:
return ToolOutput(success=False, error="code is required")
if not goal:
return ToolOutput(success=False, error="goal is required")
if goal not in self.REFACTORING_GOALS:
return ToolOutput(
success=False,
error=f"Invalid goal. Must be one of: {', '.join(self.REFACTORING_GOALS)}",
)
language = parameters.get("language", "python")
context = parameters.get("context", "")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(code, goal, language, context)
# Get model manager
try:
if _server_instance and _server_instance.model_manager:
model_manager = _server_instance.model_manager
else:
raise AttributeError("Server instance not available")
except (ImportError, NameError, AttributeError):
model_manager = globals().get("model_manager")
if not model_manager:
return ToolOutput(success=False, error="Model manager not available")
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
# Format response
formatted_response = self._format_response(response_text, model_used, goal)
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Refactor tool error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(
self,
code: str,
goal: str,
language: str,
context: str,
) -> str:
"""Build the refactoring prompt."""
goal_descriptions = {
"extract_method": (
"Extract cohesive code blocks into well-named methods/functions. "
"Identify logical groupings, determine parameters and return values, "
"and create clear function signatures."
),
"simplify_logic": (
"Simplify complex conditional logic, reduce nesting, "
"apply early returns, and make the control flow clearer."
),
"improve_naming": (
"Improve variable, function, and class names to better express intent. "
"Apply consistent naming conventions appropriate for the language."
),
"reduce_complexity": (
"Reduce cyclomatic complexity by breaking down large functions, "
"simplifying conditions, and improving code structure."
),
"modernize_syntax": (
f"Update the code to use modern {language} syntax and idioms. "
"Apply current best practices and language features."
),
"remove_duplication": (
"Identify and remove duplicate code by extracting common patterns "
"into reusable functions, classes, or utilities."
),
"improve_error_handling": (
"Improve error handling with proper exception types, "
"meaningful error messages, and appropriate recovery strategies."
),
}
goal_description = goal_descriptions.get(goal, "Improve the code quality.")
parts = [
"You are an expert software engineer specializing in code refactoring.",
f"Your task is to provide a **{goal.replace('_', ' ')}** refactoring plan.",
"",
f"## Goal: {goal.replace('_', ' ').title()}",
goal_description,
"",
f"## Language: {language}",
"",
"## Code to Refactor",
f"```{language}",
code,
"```",
]
if context:
parts.extend(
[
"",
"## Additional Context",
context,
]
)
parts.extend(
[
"",
"## Required Output Format",
"",
"Provide your refactoring plan in this exact structure:",
"",
"### Analysis",
"[Explain what's problematic about the current code and why refactoring helps]",
"",
"### Refactoring Plan",
"",
"**Step 1: [Action Name]**",
"- What: [Specific change to make]",
"- Why: [Benefit of this change]",
"- Risk: Low/Medium/High",
"",
"[Add more steps as needed...]",
"",
"### Before",
f"```{language}",
"[Original code - copy the exact code provided]",
"```",
"",
"### After",
f"```{language}",
"[Fully refactored code - complete, runnable implementation]",
"```",
"",
"### Verification Steps",
"1. [How to verify the refactor didn't break functionality]",
"2. [Tests to run or behavior to check]",
"",
"### Notes",
"[Any caveats, edge cases to watch, or follow-up improvements to consider]",
"",
"**Important Guidelines:**",
"- The refactored code MUST be functionally equivalent to the original",
"- Each step should be atomic and independently verifiable",
"- Provide complete, copy-pasteable code in the After section",
"- Highlight any behavioral changes (even if improvements)",
]
)
return "\n".join(parts)
def _format_response(
self,
response_text: str,
model_used: str,
goal: str,
) -> str:
"""Format the refactoring response."""
goal_title = goal.replace("_", " ").title()
header = f"# Refactoring Plan: {goal_title}"
return f"{header}\n\n{response_text}\n\n[Model: {model_used}]"
# ========== Server information tool for checking status and configuration. ==========
from typing import Any, Dict
__version__ = "4.0.0"
class ServerInfoTool(MCPTool):
"""Tool for getting server information and status."""
@property
def name(self) -> str:
return "server_info"
@property
def description(self) -> str:
return "Get server version and status"
@property
def input_schema(self) -> Dict[str, Any]:
return {"type": "object", "properties": {}, "required": []}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
# Access the server components through global context
# In bundled mode, will be set as global _server_instance
server = None
# Try modular approach first
try:
server = getattr(council, "_server_instance", None)
except ImportError:
pass
# Fall back to global _server_instance (for bundled mode)
if not server:
server = globals().get("_server_instance", None)
# Declare info variable
info: Dict[str, Any]
if not server:
# Fallback to basic info if server instance not available
info = {
"version": __version__,
"architecture": "modular",
"backend": "OpenRouter",
"status": "running",
"note": "Full stats unavailable - server instance not accessible",
}
else:
# Get list of available tools from registry
registered_tools = server.tool_registry.list_tools()
info = {
"version": __version__,
"architecture": "modular",
"backend": "OpenRouter",
"available_tools": registered_tools,
"components": {
"tools_registered": len(registered_tools),
"cache_stats": server.cache.get_stats() if server.cache else None,
"memory_stats": server.memory.get_stats() if server.memory else None,
},
"models": self._get_model_info(server.model_manager),
}
if server.orchestrator:
info["execution_stats"] = server.orchestrator.get_execution_stats()
# Add quick reference guide
quick_guide = self._get_quick_guide()
json_info = json.dumps(info, indent=2)
result = f"Council MCP Server v{__version__}\n\n{json_info}\n\n{quick_guide}"
return ToolOutput(success=True, result=result)
except Exception as e:
return ToolOutput(success=False, error=f"Error getting server info: {str(e)}")
def _get_model_info(self, model_manager) -> Dict[str, Any]:
"""Get model manager information."""
if not model_manager:
return {"initialized": False}
info: Dict[str, Any] = {
"initialized": True,
"default_model": getattr(model_manager, "default_model", None),
"active_model": getattr(model_manager, "active_model", None),
}
# Get stats if available
try:
stats = model_manager.get_stats()
if stats:
info["stats"] = stats
except Exception:
pass
# Get model cache info if available
try:
cache = getattr(model_manager, "model_cache", None)
if cache:
info["model_cache"] = cache.get_stats()
except Exception:
pass
return info
def _get_quick_guide(self) -> str:
"""Generate a quick model selection guide."""
return """## Quick Model Selection Guide
**By Task Type:**
• Coding/Code Review → Claude Sonnet 4, Claude 3.5 Sonnet
• Reasoning/Math → DeepSeek R1, Gemini 3 Pro
• Vision/Images → Gemini 2.5 Flash, Gemini 2.5 Pro
• Web Development → Gemini 2.5 Pro (leads WebDev Arena)
• Long Documents → Gemini (1M tokens), Llama 4 Scout (10M)
• General/Creative → Claude 3.5 Sonnet, GPT-4o
**Model Classes:**
• FLASH: Fast & cheap (Haiku, GPT-4o-mini, Gemini Flash)
• PRO: Balanced quality/cost (Sonnet, GPT-4o, Gemini Pro)
• DEEP: Maximum quality (Opus, o1, DeepSeek R1)
**Free Tier Options:**
• meta-llama/llama-3.3-70b-instruct:free
• deepseek/deepseek-chat:free
• qwen/qwen-2.5-72b-instruct:free
💡 Use `recommend_model` tool for detailed task-specific recommendations."""
# ========== Tool for setting the active LLM model. ==========
from typing import Any, Dict
class SetModelTool(MCPTool):
"""Tool for changing the active model for subsequent requests."""
@property
def name(self) -> str:
return "set_model"
@property
def description(self) -> str:
return (
"Change the active LLM model for subsequent requests. "
"Use list_models to see available options."
)
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"model": {
"type": "string",
"description": (
"The model ID to use (e.g., 'google/gemini-3-pro-preview', "
"'anthropic/claude-3-opus')"
),
},
},
"required": ["model"],
}
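# Illustrative MCP invocation (assumed request shape, shown for reference only):
#   tools/call with {"name": "set_model",
#                    "arguments": {"model": "anthropic/claude-3-opus"}}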
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
model_id = parameters.get("model", "").strip()
if not model_id:
return ToolOutput(success=False, error="Model ID is required")
# Get manager from server instance
try:
if _server_instance and hasattr(_server_instance, "council_manager"):
manager = _server_instance.council_manager
elif _server_instance and hasattr(_server_instance, "model_manager"):
manager = _server_instance.model_manager
else:
raise AttributeError("Manager not available")
except (ImportError, AttributeError, NameError):
# NameError covers the case where _server_instance has not been defined as a global yet
manager = globals().get("council_manager") or globals().get("model_manager")
if not manager:
return ToolOutput(success=False, error="Model manager not available")
# Check if the model exists (optional validation)
if hasattr(manager, "get_model_info"):
model_info = manager.get_model_info(model_id)
if model_info:
logger.info(f"Setting model to {model_id} ({model_info.name})")
else:
logger.warning(f"Model {model_id} not found in cache, setting anyway")
# Set the model
if hasattr(manager, "set_model"):
success = manager.set_model(model_id)
if success:
# Get model info for response
active = getattr(manager, "active_model", model_id)
return ToolOutput(
success=True,
result=f"โ Active model changed to: {active}",
)
else:
return ToolOutput(
success=False,
error=f"Failed to set model to {model_id}",
)
else:
return ToolOutput(
success=False,
error="Manager does not support setting models",
)
except Exception as e:
logger.error(f"Error setting model: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
# ========== Synthesis tool for combining multiple perspectives into cohesive insights. ==========
from typing import Any, Dict, List
class SynthesizeTool(MCPTool):
"""Tool for Synthesize."""
@property
def name(self) -> str:
return "synthesize_perspectives"
@property
def description(self) -> str:
return "Synthesize multiple viewpoints or pieces of information into a coherent summary"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The topic or question being addressed"},
"perspectives": {
"type": "array",
"description": "List of different perspectives or pieces of information",
"items": {
"type": "object",
"properties": {
"source": {
"type": "string",
"description": "Source or viewpoint identifier",
},
"content": {
"type": "string",
"description": "The perspective or information",
},
},
"required": ["content"],
},
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["topic", "perspectives"],
}
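# Illustrative arguments payload (example values are hypothetical):
#   {"topic": "Should we adopt microservices?",
#    "perspectives": [
#        {"source": "Backend team", "content": "Favors gradual service extraction."},
#        {"content": "The monolith is simpler to operate at current scale."}]}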
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for synthesis")
perspectives = parameters.get("perspectives", [])
if not perspectives:
return ToolOutput(success=False, error="At least one perspective is required")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(topic, perspectives)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐ Synthesis:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, perspectives: List[Dict[str, str]]) -> str:
"""Build the synthesis prompt."""
perspectives_text = "\n\n".join(
[
f"**{p.get('source', f'Perspective {i+1}')}:**\n{p['content']}"
for i, p in enumerate(perspectives)
]
)
return f"""Please synthesize the following perspectives on: {topic}
{perspectives_text}
Provide a balanced synthesis that:
1. Identifies common themes and agreements
2. Highlights key differences and tensions
3. Evaluates the strengths and weaknesses of each perspective
4. Proposes a unified understanding or framework
5. Suggests actionable insights or next steps
Be objective and fair to all viewpoints while providing critical analysis."""
# ========== Test case generation tool for suggesting comprehensive test scenarios. ==========
from typing import Any, Dict
class TestCasesTool(MCPTool):
"""Tool for Test Cases."""
@property
def name(self) -> str:
return "test_cases"
@property
def description(self) -> str:
return "Suggest test cases for code or features"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"code_or_feature": {
"type": "string",
"description": "Code snippet or feature description",
},
"test_type": {
"type": "string",
"description": "Type of tests (unit, integration, edge cases)",
"default": "all",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g., 'anthropic/claude-3-opus'). "
"Use list_models to see available options."
),
},
},
"required": ["code_or_feature"],
}
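# Illustrative arguments payload (example values are hypothetical):
#   {"code_or_feature": "def add(a, b): return a + b", "test_type": "unit"}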
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
code_or_feature = parameters.get("code_or_feature")
if not code_or_feature:
return ToolOutput(success=False, error="Code or feature description is required")
test_type = parameters.get("test_type", "all")
model_override = parameters.get("model")
# Build the prompt
prompt = self._build_prompt(code_or_feature, test_type)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt, model=model_override)
formatted_response = f"๐งช Test Cases:\n\n{response_text}"
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, code_or_feature: str, test_type: str) -> str:
"""Build the test case generation prompt."""
test_type_instructions = {
"unit": "Focus on unit tests that test individual functions or methods in isolation.",
"integration": "Focus on integration tests that verify "
"components work together correctly.",
"edge": "Focus on edge cases, boundary conditions, and error scenarios.",
"performance": "Include performance and load testing scenarios.",
"all": "Provide comprehensive test cases covering all aspects.",
}
test_focus = test_type_instructions.get(test_type, test_type_instructions["all"])
# Detect if input is code or feature description
is_code = any(
indicator in code_or_feature
for indicator in ["def ", "function", "class", "{", "=>", "()"]
)
input_type = "code" if is_code else "feature"
return f"""Please suggest test cases for the following {input_type}:
{code_or_feature}
{test_focus}
For each test case, provide:
1. Test name/description
2. Input/setup required
3. Expected behavior/output
4. Why this test is important
Include both positive (happy path) and negative (error) test cases."""
# ========== Tool Registry Override for Bundled Operation ==========
# Store the bundled tool classes globally
BUNDLED_TOOL_CLASSES = [
AskTool,
BrainstormTool,
CodeReviewTool,
StartConversationTool,
ContinueConversationTool,
ListConversationsTool,
EndConversationTool,
GetConversationHistoryTool,
DebugTool,
ExplainTool,
ListModelsTool,
RecommendModelTool,
RefactorTool,
ServerInfoTool,
SetModelTool,
SynthesizeTool,
TestCasesTool,
]
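# Note: this list must mirror the tool classes defined earlier in this bundle. A class
# omitted here is never registered, because bundled mode bypasses filesystem discovery
# (see _bundled_discover_tools below).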
# Override the ToolRegistry's discover_tools method for bundled operation
def _bundled_discover_tools(self, tools_path: Optional[Path] = None) -> None:
"""Discover and register all tools - bundled version."""
# Clear any existing tools to ensure clean state
self._tools.clear()
self._tool_classes.clear()
logger.info("Registering bundled tools")
for tool_class in BUNDLED_TOOL_CLASSES:
try:
tool_instance = tool_class()
tool_name = tool_instance.name # Use property access
self._tools[tool_name] = tool_instance
self._tool_classes[tool_name] = tool_class
logger.info(f"Registered tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to register tool {tool_class.__name__}: {e}")
logger.info(f"Registered {len(self._tools)} tools in bundled mode")
# Function to apply the override - will be called from main()
def _apply_tool_registry_override():
"""Apply the bundled tool registry override."""
ToolRegistry.discover_tools = _bundled_discover_tools
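# Minimal usage sketch (illustrative; assumes ToolRegistry can be constructed with no
# required arguments, which may differ from the actual server wiring):
#   _apply_tool_registry_override()
#   registry = ToolRegistry()
#   registry.discover_tools()  # now registers the classes in BUNDLED_TOOL_CLASSES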
# ========== Main Execution ==========
if __name__ == "__main__":
# Apply the tool registry override before running
_apply_tool_registry_override()
# Call the main function from the bundled code
main()