#!/usr/bin/env python3
# flake8: noqa
"""
Gemini MCP Server - Single File Bundle
MCP server that enables Claude to collaborate with Gemini AI models.
This version combines all modular components into a single deployable file.
Generated by bundler.
"""
import asyncio
import collections
import hashlib
import importlib
import inspect
import json
import logging
import os
import sys
import time
from abc import ABC, abstractmethod
from collections import OrderedDict, deque
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
# Create logger without configuring (main() will configure)
logger = logging.getLogger("gemini-mcp")
__version__ = "3.0.0"
# Global model manager instance (will be set by server)
model_manager = None
# Create gemini_mcp namespace for bundled mode
class _GeminiMCP:
_server_instance = None
gemini_mcp = _GeminiMCP()
# ========== Standalone JSON-RPC 2.0 implementation for MCP servers, based on Gemini's recommendations. ==========
from typing import Any, Callable, Dict, Optional
# JSON-RPC 2.0 constants
JSONRPC_VERSION = "2.0"
ERROR_PARSE = -32700
ERROR_INVALID_REQUEST = -32600
ERROR_METHOD_NOT_FOUND = -32601
ERROR_INVALID_PARAMS = -32602
ERROR_INTERNAL = -32603
class JsonRpcRequest:
"""JSON-RPC 2.0 Request"""
def __init__(self, data: dict):
self.jsonrpc = data.get("jsonrpc", JSONRPC_VERSION)
self.method = data.get("method")
self.params = data.get("params", {})
self.id = data.get("id")
# Validate
if self.jsonrpc != JSONRPC_VERSION:
raise ValueError(f"Invalid JSON-RPC version: {self.jsonrpc}")
if not self.method:
raise ValueError("Missing method")
class JsonRpcResponse:
"""JSON-RPC 2.0 Response"""
def __init__(self, result: Any = None, error: Optional[Dict[str, Any]] = None, id: Any = None):
self.jsonrpc = JSONRPC_VERSION
self.id = id
if error is not None:
self.error = error
else:
self.result = result
def to_dict(self) -> dict:
d = {"jsonrpc": self.jsonrpc, "id": self.id}
if hasattr(self, "error"):
d["error"] = self.error
else:
d["result"] = self.result if hasattr(self, "result") else None
return d
class JsonRpcError:
"""JSON-RPC 2.0 Error"""
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
def to_dict(self) -> dict:
d = {"code": self.code, "message": self.message}
if self.data is not None:
d["data"] = self.data
return d
class JsonRpcServer:
"""
A synchronous JSON-RPC 2.0 server over stdio.
Compatible with MCP protocol expectations.
"""
def __init__(self, server_name: str):
self.server_name = server_name
self._handlers: Dict[str, Callable] = {}
self._running = False
def register_handler(self, method: str, handler: Callable):
"""Register a handler for a JSON-RPC method."""
logger.info(f"Registering handler for method: {method}")
self._handlers[method] = handler
def _read_message(self) -> Optional[str]:
"""Read a single line from stdin."""
try:
line = sys.stdin.readline()
if not line:
return None
return line.strip()
except Exception as e:
logger.error(f"Error reading from stdin: {e}")
return None
def _write_message(self, message: dict):
"""Write a JSON message to stdout."""
try:
print(json.dumps(message), flush=True)
except Exception as e:
logger.error(f"Error writing to stdout: {e}")
def _process_request(self, request_str: str) -> Optional[dict]:
"""Process a single JSON-RPC request."""
request_id = None
try:
# Parse JSON
try:
request_data = json.loads(request_str)
except json.JSONDecodeError as e:
return JsonRpcResponse(
error=JsonRpcError(ERROR_PARSE, f"Parse error: {e}").to_dict()
).to_dict()
# Parse request
try:
request = JsonRpcRequest(request_data)
request_id = request.id
except ValueError as e:
return JsonRpcResponse(
error=JsonRpcError(ERROR_INVALID_REQUEST, str(e)).to_dict(), id=request_id
).to_dict()
            # Find handler
            handler = self._handlers.get(request.method)
            if not handler:
                # Per JSON-RPC 2.0, a notification (request without an id) must
                # not receive a response, even when the method is unhandled.
                if request.id is None:
                    logger.debug(f"Ignoring notification for unhandled method: {request.method}")
                    return None
                return JsonRpcResponse(
                    error=JsonRpcError(
                        ERROR_METHOD_NOT_FOUND, f"Method not found: {request.method}"
                    ).to_dict(),
                    id=request_id,
                ).to_dict()
# Execute handler
try:
result = handler(request_id, request.params)
# Handler returns a complete response dict
return result
except Exception as e:
logger.error(f"Handler error for {request.method}: {e}", exc_info=True)
return JsonRpcResponse(
error=JsonRpcError(ERROR_INTERNAL, f"Internal error: {str(e)}").to_dict(),
id=request_id,
).to_dict()
except Exception as e:
logger.error(f"Unexpected error processing request: {e}", exc_info=True)
return JsonRpcResponse(
error=JsonRpcError(ERROR_INTERNAL, f"Internal error: {str(e)}").to_dict(),
id=request_id,
).to_dict()
def run(self):
"""Run the JSON-RPC server (synchronous)."""
logger.info(f"Starting JSON-RPC server '{self.server_name}'...")
self._running = True
while self._running:
try:
# Read a line
line = self._read_message()
if line is None:
logger.info("EOF reached, shutting down")
break
if not line:
continue
# Process the request
response = self._process_request(line)
# Send response if any
if response:
self._write_message(response)
except KeyboardInterrupt:
logger.info("Keyboard interrupt, shutting down")
break
except Exception as e:
logger.error(f"Server error: {e}", exc_info=True)
continue
logger.info("Server stopped")
def stop(self):
"""Stop the server."""
self._running = False
# MCP-compatible type definitions (simple dicts instead of Pydantic)
def create_text_content(text: str) -> dict:
"""Create a text content object."""
return {"type": "text", "text": text}
def create_tool(name: str, description: str, input_schema: dict) -> dict:
"""Create a tool definition."""
return {"name": name, "description": description, "inputSchema": input_schema}
def create_error_response(request_id: Any, code: int, message: str) -> dict:
"""Create an error response."""
return {
"jsonrpc": JSONRPC_VERSION,
"id": request_id,
"error": {"code": code, "message": message},
}
def create_result_response(request_id: Any, result: Any) -> dict:
"""Create a result response."""
return {"jsonrpc": JSONRPC_VERSION, "id": request_id, "result": result}
# ========== Base models for tools and responses. ==========
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
@dataclass
class ToolMetadata:
"""Metadata for a tool."""
name: str
description: str
version: str = "1.0.0"
author: str = "gemini-mcp"
tags: List[str] = field(default_factory=list)
@dataclass
class ToolInput:
"""Base class for tool inputs."""
tool_name: str
parameters: Dict[str, Any]
context: Optional[Dict[str, Any]] = None
request_id: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class ToolOutput:
"""Base class for tool outputs."""
tool_name: str
result: Any
success: bool = True
error: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
execution_time_ms: Optional[float] = None
model_used: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.now)
# ========== Model manager for handling dual-model configuration with fallback. ==========
import os
import traceback
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Tuple
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
class DualModelManager:
"""Manages primary and fallback Gemini models with automatic failover."""
def __init__(self, api_key: str):
"""Initialize the model manager with API key and model configuration."""
genai.configure(api_key=api_key)
# Get model names from environment or use defaults
self.primary_model_name = os.getenv("GEMINI_MODEL_PRIMARY", "gemini-2.5-pro-preview-06-05")
self.fallback_model_name = os.getenv("GEMINI_MODEL_FALLBACK", "gemini-1.5-pro")
        # Timeout configuration: GEMINI_MODEL_TIMEOUT is in milliseconds; stored here as seconds
self.timeout = float(os.getenv("GEMINI_MODEL_TIMEOUT", "10000")) / 1000
# Initialize models
self._primary_model = self._initialize_model(self.primary_model_name, "Primary")
self._fallback_model = self._initialize_model(self.fallback_model_name, "Fallback")
# Track model usage
self.primary_calls = 0
self.fallback_calls = 0
self.primary_failures = 0
def _initialize_model(self, model_name: str, model_type: str):
"""Initialize a single model with error handling."""
try:
model = genai.GenerativeModel(model_name)
logger.info(f"{model_type} model initialized: {model_name}")
return model
except Exception as e:
logger.error(f"Failed to initialize {model_type} model {model_name}: {e}")
return None
def _generate_with_timeout(self, model, model_name: str, prompt: str, timeout: float) -> str:
"""Execute model generation with timeout using ThreadPoolExecutor."""
from google.generativeai.types import RequestOptions
# Create request options with timeout
request_options = RequestOptions(timeout=timeout)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
model.generate_content, prompt, request_options=request_options
)
try:
response = future.result(timeout=timeout)
return response.text
            except FutureTimeoutError:
                logger.warning(f"{model_name} timed out after {timeout}s")
                # cancel() is a no-op once the call is already running; the
                # RequestOptions timeout above is what bounds the API call itself.
                future.cancel()
                raise TimeoutError(f"{model_name} generation timed out")
def generate_content(self, prompt: str) -> Tuple[str, str]:
"""
Generate content using primary model with automatic fallback.
Returns:
Tuple of (response_text, model_used)
"""
# Try primary model first
if self._primary_model:
try:
self.primary_calls += 1
response_text = self._generate_with_timeout(
self._primary_model, self.primary_model_name, prompt, self.timeout
)
logger.debug("Primary model responded successfully")
return response_text, self.primary_model_name
            except Exception as e:
                # Broad catch so any failure (GoogleAPICallError, InternalServerError,
                # ValueError, TimeoutError, ...) triggers the fallback model below.
self.primary_failures += 1
error_type = type(e).__name__
logger.warning(
f"Primary model {self.primary_model_name} failed "
f"(attempt {self.primary_failures}): {error_type}: {e}"
)
if hasattr(e, "code"):
logger.warning(f"Error code: {e.code}")
if hasattr(e, "details"):
logger.warning(f"Error details: {e.details}")
# Check for 500 errors specifically
if "500" in str(e) or "Internal" in str(e):
logger.warning(
"Detected 500/Internal error - typically a temporary Gemini API issue"
)
# Log full traceback for debugging
logger.debug(f"Full traceback:\n{traceback.format_exc()}")
# Fallback to secondary model
if self._fallback_model:
try:
self.fallback_calls += 1
response_text = self._generate_with_timeout(
self._fallback_model,
self.fallback_model_name,
prompt,
self.timeout * 1.5, # Give fallback more time
)
logger.info("Fallback model responded successfully")
return response_text, self.fallback_model_name
except Exception as e:
error_type = type(e).__name__
logger.error(
f"Fallback model {self.fallback_model_name} also failed: {error_type}: {e}"
)
if hasattr(e, "code"):
logger.error(f"Error code: {e.code}")
if hasattr(e, "details"):
logger.error(f"Error details: {e.details}")
# Log full traceback for debugging
logger.debug(f"Full traceback:\n{traceback.format_exc()}")
raise RuntimeError(f"Both models failed. Last error: {error_type}: {e}")
raise RuntimeError("No models available for content generation")
def get_stats(self) -> dict:
"""Get usage statistics for the model manager."""
total_calls = self.primary_calls + self.fallback_calls
primary_success_rate = (
(self.primary_calls - self.primary_failures) / self.primary_calls
if self.primary_calls > 0
else 0
)
return {
"primary_model": self.primary_model_name,
"fallback_model": self.fallback_model_name,
"total_calls": total_calls,
"primary_calls": self.primary_calls,
"fallback_calls": self.fallback_calls,
"primary_failures": self.primary_failures,
"primary_success_rate": primary_success_rate,
"timeout_seconds": self.timeout,
}
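# Illustrative sketch (defined but never invoked by the server): typical
# DualModelManager use. The environment variable lookup is a placeholder for
# wherever the key actually lives.
def _example_model_manager_usage() -> str:
    manager = DualModelManager(api_key=os.environ["GEMINI_API_KEY"])
    text, model_used = manager.generate_content("Summarize JSON-RPC 2.0 in one sentence.")
    if model_used != manager.primary_model_name:
        logger.info(f"Answer served by fallback model: {model_used}")
    return text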
# ========== Memory-related data models. ==========
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict
@dataclass
class ConversationTurn:
"""Represents a single turn in a conversation."""
role: str # "user", "assistant", "system"
content: str
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MemoryEntry:
"""Represents an entry in conversation memory."""
key: str
value: Any
category: str = "general"
timestamp: datetime = field(default_factory=datetime.now)
last_accessed: datetime = field(default_factory=datetime.now)
access_count: int = 0
# ========== Caching service for expensive operations. ==========
import hashlib
import time
from collections import OrderedDict
from typing import Any, Dict, Optional
class ResponseCache:
"""Simple LRU cache for tool responses."""
def __init__(self, max_size: int = 100, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
self.hits = 0
self.misses = 0
def create_key(self, tool_name: str, parameters: Dict[str, Any]) -> str:
"""Create a cache key from tool name and parameters."""
# Sort parameters for consistent hashing
params_str = json.dumps(parameters, sort_keys=True)
key_data = f"{tool_name}:{params_str}"
return hashlib.sha256(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""Get a value from cache if it exists and isn't expired."""
if key not in self.cache:
self.misses += 1
return None
entry = self.cache[key]
# Check if expired
if time.time() - entry["timestamp"] > self.ttl_seconds:
del self.cache[key]
self.misses += 1
return None
# Move to end (most recently used)
self.cache.move_to_end(key)
self.hits += 1
return entry["value"]
def set(self, key: str, value: Any) -> None:
"""Set a value in cache."""
# Remove oldest if at capacity
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = {"value": value, "timestamp": time.time()}
def clear(self) -> None:
"""Clear all cache entries."""
self.cache.clear()
self.hits = 0
self.misses = 0
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
total_requests = self.hits + self.misses
return {
"size": len(self.cache),
"max_size": self.max_size,
"hits": self.hits,
"misses": self.misses,
"hit_rate": self.hits / total_requests if total_requests > 0 else 0,
"ttl_seconds": self.ttl_seconds,
}
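# Illustrative sketch (defined but never invoked by the server): cache keys are
# derived from the tool name plus its sorted parameters, so identical calls
# share one entry until TTL expiry or LRU eviction.
def _example_response_cache_usage() -> None:
    cache = ResponseCache(max_size=10, ttl_seconds=60)
    key = cache.create_key("ask_gemini", {"question": "What is MCP?"})
    assert cache.get(key) is None  # first lookup misses
    cache.set(key, "cached response text")
    assert cache.get(key) == "cached response text"  # second lookup hits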
# ========== Memory service for conversation context. ==========
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional
class ConversationMemory:
"""Enhanced conversation memory with TTL and structured storage."""
def __init__(self, max_turns: int = 50, max_entries: int = 100):
self.max_turns = max_turns
self.max_entries = max_entries
self.turns: deque[ConversationTurn] = deque(maxlen=max_turns)
self.entries: Dict[str, MemoryEntry] = {}
self.created_at = datetime.now()
self.access_count = 0
def add_turn(self, role: str, content: str, metadata: Optional[Dict] = None) -> None:
"""Add a conversation turn."""
turn = ConversationTurn(role=role, content=content, metadata=metadata or {})
self.turns.append(turn)
self.access_count += 1
def set(self, key: str, value: Any, category: str = "general") -> None:
"""Store a value with a key."""
# Remove oldest entries if at capacity
if len(self.entries) >= self.max_entries:
oldest_key = min(self.entries.keys(), key=lambda k: self.entries[k].timestamp)
del self.entries[oldest_key]
self.entries[key] = MemoryEntry(key=key, value=value, category=category, access_count=0)
self.access_count += 1
def get(self, key: str, default: Any = None) -> Any:
"""Retrieve a value by key."""
if key in self.entries:
entry = self.entries[key]
entry.access_count += 1
entry.last_accessed = datetime.now()
self.access_count += 1
return entry.value
return default
def get_turns(self, limit: Optional[int] = None) -> List[ConversationTurn]:
"""Get recent conversation turns."""
if limit:
return list(self.turns)[-limit:]
return list(self.turns)
def search_entries(self, category: Optional[str] = None) -> List[MemoryEntry]:
"""Search entries by category."""
if category:
return [e for e in self.entries.values() if e.category == category]
return list(self.entries.values())
def clear(self) -> None:
"""Clear all memory."""
self.turns.clear()
self.entries.clear()
self.access_count = 0
def get_stats(self) -> Dict[str, Any]:
"""Get memory usage statistics."""
return {
"turns_count": len(self.turns),
"entries_count": len(self.entries),
"max_turns": self.max_turns,
"max_entries": self.max_entries,
"total_accesses": self.access_count,
"created_at": self.created_at.isoformat(),
"categories": list(set(e.category for e in self.entries.values())),
}
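# Illustrative sketch (defined but never invoked by the server): turns hold a
# bounded dialogue history while entries hold keyed facts by category.
def _example_conversation_memory_usage() -> None:
    memory = ConversationMemory(max_turns=5, max_entries=10)
    memory.add_turn("user", "Review this function for me.")
    memory.add_turn("assistant", "Looks correct; consider adding tests.")
    memory.set("preferred_language", "python", category="preferences")
    assert memory.get("preferred_language") == "python"
    assert len(memory.get_turns(limit=1)) == 1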
# ========== Base class for all tools. ==========
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
# Simplified ToolOutput for bundled tools (intentionally shadows the dataclass above)
class ToolOutput:
"""Standard output format for tool execution."""
def __init__(self, success: bool, result: Optional[str] = None, error: Optional[str] = None):
self.success = success
self.result = result
self.error = error
self.metadata: Dict[str, Any] = {}
# Add missing attributes for compatibility with orchestrator
self.tool_name: str = ""
self.execution_time_ms: Optional[float] = None
self.model_used: Optional[str] = None
self.timestamp = None
class MCPTool(ABC):
"""Abstract base class for all tools using simplified property-based approach."""
@property
@abstractmethod
def name(self) -> str:
"""Return the tool name."""
pass
@property
@abstractmethod
def description(self) -> str:
"""Return the tool description."""
pass
@property
@abstractmethod
def input_schema(self) -> Dict[str, Any]:
"""Return the JSON schema for tool inputs."""
pass
@abstractmethod
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
pass
def get_mcp_definition(self) -> Dict[str, Any]:
"""Get the MCP tool definition."""
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
# Keep the original BaseTool for backwards compatibility during migration
class BaseTool(MCPTool):
"""Legacy base class that wraps MCPTool for backwards compatibility."""
def __init__(self):
# No-op for legacy compatibility
pass
@property
def name(self) -> str:
"""Default to empty string for legacy tools."""
return ""
@property
def description(self) -> str:
"""Default to empty string for legacy tools."""
return ""
@property
def input_schema(self) -> Dict[str, Any]:
"""Default to empty schema for legacy tools."""
return {"type": "object", "properties": {}, "required": []}
# ========== Tool registry for dynamic tool discovery and management. ==========
import importlib
import inspect
from pathlib import Path
from typing import Dict, List, Optional, Type
class ToolRegistry:
"""Registry for discovering and managing tools."""
def __init__(self):
self._tools: Dict[str, BaseTool] = {}
self._tool_classes: Dict[str, Type[BaseTool]] = {}
def discover_tools(self, tools_path: Optional[Path] = None) -> None:
"""Discover and register all tools in the tools directory."""
if tools_path is None:
# Default to the tools package
tools_path = Path(__file__).parent.parent / "tools"
logger.info(f"Discovering tools in {tools_path}")
# Get all Python files in the tools directory
tool_files = list(tools_path.glob("*.py"))
logger.debug(f"Found {len(tool_files)} Python files in {tools_path}")
for tool_file in tool_files:
if tool_file.name.startswith("_") or tool_file.name == "base.py":
continue
# Try both import paths
module_names = [
f"gemini_mcp.tools.{tool_file.stem}",
f"src.gemini_mcp.tools.{tool_file.stem}",
]
            module = None
            last_error: Optional[ImportError] = None
            for module_name in module_names:
                try:
                    logger.debug(f"Attempting to import {module_name}")
                    module = importlib.import_module(module_name)
                    break
                except ImportError as e:
                    last_error = e
                    continue
            if module is None:
                logger.error(f"Failed to import tool from {tool_file}: {last_error}")
                continue
try:
# Find all classes that inherit from BaseTool
for name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, BaseTool) and obj != BaseTool:
logger.debug(f"Found tool class: {name}")
self._register_tool_class(obj)
except Exception as e:
logger.error(f"Failed to import tool from {tool_file}: {e}")
def _register_tool_class(self, tool_class: Type[BaseTool]) -> None:
"""Register a tool class."""
try:
# Instantiate the tool to get its metadata
tool_instance = tool_class()
# Use property-based access for compatibility
tool_name = tool_instance.name
if tool_name in self._tools:
logger.warning(f"Tool {tool_name} already registered, skipping")
return
self._tools[tool_name] = tool_instance
self._tool_classes[tool_name] = tool_class
logger.info(f"Registered tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to register tool {tool_class.__name__}: {e}")
def get_tool(self, name: str) -> Optional[BaseTool]:
"""Get a tool instance by name."""
return self._tools.get(name)
def get_tool_class(self, name: str) -> Optional[Type[BaseTool]]:
"""Get a tool class by name."""
return self._tool_classes.get(name)
def list_tools(self) -> List[str]:
"""List all registered tool names."""
return list(self._tools.keys())
def get_all_tools(self) -> Dict[str, BaseTool]:
"""Get all registered tools."""
return self._tools.copy()
def get_mcp_tool_definitions(self) -> List[Dict]:
"""Get MCP tool definitions for all registered tools."""
definitions = []
for tool in self._tools.values():
try:
definitions.append(tool.get_mcp_definition())
except Exception as e:
# Use property-based access for error logging
tool_name = getattr(tool, "name", tool.__class__.__name__)
logger.error(f"Failed to get MCP definition for {tool_name}: {e}")
return definitions
# ========== Orchestrator for managing tool execution and conversation flow. ==========
from typing import Any, Dict, List, Optional
class ConversationOrchestrator:
"""Orchestrates tool execution and manages conversation flow."""
def __init__(
self,
tool_registry: ToolRegistry,
model_manager: Any, # DualModelManager
memory: Optional[ConversationMemory] = None,
cache: Optional[ResponseCache] = None,
):
self.tool_registry = tool_registry
self.model_manager = model_manager
self.memory = memory or ConversationMemory()
self.cache = cache or ResponseCache()
self.execution_history: List[ToolOutput] = []
async def execute_tool(
self, tool_name: str, parameters: Dict[str, Any], request_id: Optional[str] = None
) -> ToolOutput:
"""Execute a single tool with proper context injection."""
# Check cache first
cache_key = self.cache.create_key(tool_name, parameters)
cached_result = self.cache.get(cache_key)
if cached_result:
logger.info(f"Cache hit for {tool_name}")
return cached_result
# Get the tool
tool = self.tool_registry.get_tool(tool_name)
if not tool:
return ToolOutput(success=False, error=f"Unknown tool: {tool_name}")
# For bundled operation, set global model manager
global model_manager
model_manager = self.model_manager
# Execute the tool
try:
output = await tool.execute(parameters)
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
output = ToolOutput(success=False, error=str(e))
        # Cache successful results
        if output.success:
            self.cache.set(cache_key, output)
        # Store in execution history
        self.execution_history.append(output)
        # Let tools that expose an update_memory hook persist conversation context
        if output.success and hasattr(tool, "update_memory"):
            tool.update_memory(self.memory, output)
        return output
async def execute_protocol(
self, protocol_name: str, initial_input: Dict[str, Any]
) -> List[ToolOutput]:
"""Execute a multi-step protocol (e.g., debate, synthesis)."""
logger.info(f"Executing protocol: {protocol_name}")
# Example: Simple sequential execution
if protocol_name == "simple":
tool_name = initial_input.get("tool_name", "")
parameters = initial_input.get("parameters", {})
if tool_name:
return [await self.execute_tool(tool_name, parameters)]
return []
# Debate protocol
elif protocol_name == "debate":
topic = initial_input.get("topic", "")
positions = initial_input.get("positions", [])
if not topic or not positions:
output = ToolOutput(
success=False,
error="Debate protocol requires 'topic' and 'positions' parameters",
)
output.tool_name = "debate_protocol"
return [output]
debate = DebateProtocol(self, topic, positions)
try:
result = await debate.run()
output = ToolOutput(
success=True,
result=str(result), # Convert to string
)
output.tool_name = "debate_protocol"
output.metadata = {"protocol": "debate", "rounds": len(result.get("rounds", []))}
return [output]
except Exception as e:
logger.error(f"Debate protocol error: {e}")
output = ToolOutput(success=False, error=str(e))
output.tool_name = "debate_protocol"
return [output]
# Synthesis protocol (simple wrapper around synthesize tool)
elif protocol_name == "synthesis":
return [
await self.execute_tool(
"synthesize_perspectives", initial_input.get("parameters", {})
)
]
# Protocol not implemented
raise NotImplementedError(f"Protocol {protocol_name} not implemented")
def get_execution_stats(self) -> Dict[str, Any]:
"""Get statistics about tool executions."""
total = len(self.execution_history)
successful = sum(1 for output in self.execution_history if output.success)
failed = total - successful
avg_time: float = 0
if total > 0:
times = [o.execution_time_ms for o in self.execution_history if o.execution_time_ms]
avg_time = sum(times) / len(times) if times else 0
return {
"total_executions": total,
"successful": successful,
"failed": failed,
"success_rate": successful / total if total > 0 else 0,
"average_execution_time_ms": avg_time,
"cache_stats": self.cache.get_stats(),
}
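# Illustrative sketch (defined but never invoked by the server): driving the
# orchestrator directly. In bundled mode the discover_tools override near the
# end of this file registers the in-file tools; asyncio.run() bridges a sync
# caller to the async tool API.
def _example_orchestrator_usage(api_key: str) -> str:
    registry = ToolRegistry()
    registry.discover_tools()
    orchestrator = ConversationOrchestrator(registry, DualModelManager(api_key))
    output = asyncio.run(
        orchestrator.execute_tool("ask_gemini", {"question": "What is MCP?"})
    )
    return output.result if output.success else f"Error: {output.error}"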
# ========== Debate protocol for structured multi-agent discussions. ==========
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class DebatePosition:
"""Represents a position in a debate."""
agent_name: str
stance: str
arguments: List[str] = field(default_factory=list)
rebuttals: Dict[str, str] = field(default_factory=dict)
confidence: float = 0.5
@dataclass
class DebateRound:
"""Represents a round of debate."""
round_number: int
positions: List[DebatePosition]
synthesis: Optional[str] = None
class DebateProtocol:
"""Orchestrates structured debates between multiple agents."""
def __init__(self, orchestrator, topic: str, positions: List[str]):
self.orchestrator = orchestrator
self.topic = topic
self.positions = positions
self.rounds: List[DebateRound] = []
self.max_rounds = 3
async def run(self) -> Dict[str, Any]:
"""Run the debate protocol."""
logger.info(f"Starting debate on topic: {self.topic}")
# Round 1: Opening statements
round1 = await self._opening_statements()
self.rounds.append(round1)
# Round 2: Rebuttals
round2 = await self._rebuttal_round()
self.rounds.append(round2)
# Round 3: Final synthesis
synthesis = await self._synthesis_round()
return {
"topic": self.topic,
"rounds": self.rounds,
"final_synthesis": synthesis,
"positions_explored": len(self.positions),
}
async def _opening_statements(self) -> DebateRound:
"""Generate opening statements for each position."""
logger.info("Debate Round 1: Opening statements")
debate_positions = []
for i, position in enumerate(self.positions):
# Create a persona for this position
prompt = f"""You are participating in a structured debate on the topic: {self.topic}
Your assigned position is: {position}
Please provide:
1. Your main argument (2-3 sentences)
2. Three supporting points
3. Your confidence level (0.0-1.0) in this position
4. Any caveats or limitations you acknowledge
Be concise but persuasive."""
# Execute via orchestrator
result = await self.orchestrator.execute_tool(
"ask_gemini", {"question": prompt, "context": f"Debate agent {i+1}"}
)
if result.success:
# Parse the response (in a real implementation, we'd use structured output)
debate_position = DebatePosition(
agent_name=f"Agent_{i+1}",
stance=position,
arguments=[result.result], # Simplified for now
confidence=0.7, # Would be parsed from response
)
debate_positions.append(debate_position)
return DebateRound(round_number=1, positions=debate_positions)
async def _rebuttal_round(self) -> DebateRound:
"""Generate rebuttals for each position."""
logger.info("Debate Round 2: Rebuttals")
if not self.rounds:
raise ValueError("No opening statements to rebut")
previous_positions = self.rounds[0].positions
updated_positions = []
for i, position in enumerate(previous_positions):
rebuttals = {}
# Generate rebuttals against other positions
for j, other_position in enumerate(previous_positions):
if i == j:
continue
prompt = f"""You previously argued for: {position.stance}
The opposing view argues: {other_position.arguments[0]}
Please provide:
1. A concise rebuttal to their argument
2. Why your position is stronger
3. Any points of agreement or common ground
Keep your response under 100 words."""
result = await self.orchestrator.execute_tool(
"ask_gemini",
{"question": prompt, "context": f"Rebuttal from {position.agent_name}"},
)
if result.success:
rebuttals[other_position.agent_name] = result.result
# Update position with rebuttals
position.rebuttals = rebuttals
updated_positions.append(position)
return DebateRound(round_number=2, positions=updated_positions)
async def _synthesis_round(self) -> str:
"""Synthesize all positions into a final analysis."""
logger.info("Debate Round 3: Synthesis")
# Prepare perspectives for synthesis tool
perspectives = []
        for debate_round in self.rounds:
            for position in debate_round.positions:
                perspectives.append(
                    {
                        "source": f"{position.agent_name} ({position.stance})",
                        "content": " ".join(position.arguments),
                    }
                )
# Use the synthesize_perspectives tool
result = await self.orchestrator.execute_tool(
"synthesize_perspectives", {"topic": self.topic, "perspectives": perspectives}
)
return result.result if result.success else "Failed to synthesize debate"
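# Illustrative sketch (defined but never invoked by the server): running a
# debate through the orchestrator's protocol entry point. The topic and
# positions are placeholders.
def _example_debate_protocol(orchestrator: ConversationOrchestrator) -> Optional[str]:
    outputs = asyncio.run(
        orchestrator.execute_protocol(
            "debate",
            {
                "topic": "Monolith vs microservices",
                "positions": ["Favor a monolith", "Favor microservices"],
            },
        )
    )
    return outputs[0].result if outputs and outputs[0].success else None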
# ========== Main MCP server implementation that orchestrates all modular components. ==========
import os
from logging.handlers import RotatingFileHandler
from os import PathLike
from typing import IO, Any, Dict, Optional, Union
# Try to import dotenv if available
try:
from dotenv import load_dotenv
HAS_DOTENV = True
except ImportError:
HAS_DOTENV = False
def load_dotenv(
dotenv_path: Optional[Union[str, PathLike[str]]] = None,
stream: Optional[IO[str]] = None,
verbose: bool = False,
override: bool = False,
interpolate: bool = True,
encoding: Optional[str] = None,
) -> bool:
"""Dummy function when dotenv is not available."""
return False
__version__ = "3.0.0"
class GeminiMCPServer:
"""Main MCP Server that integrates all modular components."""
def __init__(self):
"""Initialize the server with modular components."""
# Load environment variables at startup
self._load_env_file()
self.model_manager: Optional[DualModelManager] = None
self.tool_registry = ToolRegistry()
self.cache = ResponseCache(max_size=100, ttl_seconds=3600)
self.memory = ConversationMemory(max_turns=50, max_entries=100)
self.orchestrator: Optional[ConversationOrchestrator] = None
# Create JSON-RPC server
self.server = JsonRpcServer("gemini-mcp-server")
self._setup_handlers()
# Make server instance available globally for tools
setattr(gemini_mcp, "_server_instance", self)
# Also set as global for bundled mode
globals()["_server_instance"] = self
def _load_env_file(self) -> None:
"""Load .env file from multiple possible locations."""
# Try multiple locations for .env file
# 1. Directory of the main entry point (works with launcher.py)
main_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
# 2. Parent directory of main (in case we're in a subdirectory)
parent_dir = os.path.dirname(main_dir)
# 3. Current working directory
cwd = os.getcwd()
# 4. Script directory (where this file is)
script_dir = os.path.dirname(os.path.abspath(__file__))
env_locations = [
os.path.join(main_dir, ".env"),
os.path.join(parent_dir, ".env"),
os.path.join(cwd, ".env"),
os.path.join(script_dir, ".env"),
]
# If python-dotenv is available, try to use it first
if HAS_DOTENV:
env_loaded = False
for env_path in env_locations:
if os.path.exists(env_path):
logger.info(f"Loading .env from {env_path}")
load_dotenv(env_path)
env_loaded = True
break
if not env_loaded:
# Try current directory as last fallback
logger.info("No .env file found in expected locations, trying current directory")
load_dotenv()
else:
# Manual .env loading if python-dotenv is not available
for env_path in env_locations:
if os.path.exists(env_path):
logger.info(f"Loading .env from {env_path} (manual mode)")
try:
with open(env_path, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
# Strip whitespace from key and value
key = key.strip()
value = value.strip()
# Remove quotes if present
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
os.environ[key] = value
if key == "GEMINI_API_KEY":
logger.info(
f"Set GEMINI_API_KEY from .env file (length: {len(value)})"
)
break
except Exception as e:
logger.error(f"Failed to load .env file: {e}")
def _initialize_model_manager(self) -> bool:
"""Initialize the model manager with API key."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logger.error("No GEMINI_API_KEY found in environment. Please check your .env file.")
# Log all env vars starting with GEMINI for debugging
gemini_vars = {k: v for k, v in os.environ.items() if k.startswith("GEMINI")}
if gemini_vars:
logger.info(f"Found GEMINI env vars: {list(gemini_vars.keys())}")
else:
logger.warning("No GEMINI environment variables found at all")
return False
try:
logger.info(f"Initializing DualModelManager with API key (length: {len(api_key)})")
self.model_manager = DualModelManager(api_key)
# Create orchestrator with all components
logger.info("Creating conversation orchestrator...")
self.orchestrator = ConversationOrchestrator(
tool_registry=self.tool_registry,
model_manager=self.model_manager,
memory=self.memory,
cache=self.cache,
)
logger.info("Model manager initialization complete")
return True
except Exception as e:
logger.error(f"Failed to initialize model manager: {e}", exc_info=True)
return False
def _setup_handlers(self):
"""Set up JSON-RPC handlers."""
# Register handlers
self.server.register_handler("initialize", self.handle_initialize)
self.server.register_handler("tools/list", self.handle_tools_list)
self.server.register_handler("tools/call", self.handle_tool_call)
def handle_initialize(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialization request."""
# Reload environment variables in case they changed
self._load_env_file()
# Log the API key status
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
logger.info(f"GEMINI_API_KEY found (length: {len(api_key)})")
else:
logger.warning("GEMINI_API_KEY not found in environment")
# Discover and register all tools FIRST
self.tool_registry.discover_tools()
logger.info(f"Registered {len(self.tool_registry.list_tools())} tools")
# Initialize model manager AFTER tools are registered
model_initialized = self._initialize_model_manager()
return create_result_response(
request_id,
{
"protocolVersion": "2024-11-05",
"serverInfo": {
"name": "gemini-mcp-server",
"version": __version__,
"modelsAvailable": model_initialized,
},
"capabilities": {"tools": {}},
},
)
def handle_tools_list(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool list request."""
# Get tool definitions from registry
tool_defs = self.tool_registry.get_mcp_tool_definitions()
# Convert to tool list format
tools = []
for tool_def in tool_defs:
tools.append(
{
"name": tool_def["name"],
"description": tool_def["description"],
"inputSchema": tool_def["inputSchema"],
}
)
return create_result_response(request_id, {"tools": tools})
def handle_tool_call(self, request_id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tool execution request."""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
logger.info(f"Executing tool: {tool_name}")
# Validate tool name
if not tool_name:
return create_result_response(
request_id,
{"content": [{"type": "text", "text": "❌ Error: Tool name is required"}]},
)
# Check if models are initialized
if not self.orchestrator:
result = (
"❌ Gemini models not initialized. "
"Please set GEMINI_API_KEY environment variable."
)
return create_result_response(
request_id, {"content": [{"type": "text", "text": result}]}
)
# Execute tool through orchestrator
        try:
            # Use orchestrator to execute the async tool from this sync handler
            created_loop = False
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                created_loop = True
            try:
                output = loop.run_until_complete(
                    self.orchestrator.execute_tool(
                        tool_name=tool_name, parameters=arguments, request_id=request_id
                    )
                )
                if output.success:
                    result = output.result or ""
                else:
                    result = f"❌ Error: {output.error or 'Unknown error'}"
            finally:
                # Only close the loop if this handler created it
                if created_loop:
                    loop.close()
except Exception as e:
logger.error(f"Error executing tool {tool_name}: {e}")
result = f"❌ Error executing tool: {str(e)}"
return create_result_response(request_id, {"content": [{"type": "text", "text": result}]})
def run(self):
"""Run the MCP server."""
logger.info(f"Starting Gemini MCP Server v{__version__} (Modular)")
        # Reopen stdio line-buffered so each JSON-RPC message is flushed promptly
sys.stdout = os.fdopen(sys.stdout.fileno(), "w", 1)
sys.stderr = os.fdopen(sys.stderr.fileno(), "w", 1)
# Run the JSON-RPC server
self.server.run()
def main():
"""Main entry point."""
# Create logs directory if it doesn't exist
log_dir = os.path.expanduser("~/.claude-mcp-servers/gemini-collab/logs")
os.makedirs(log_dir, exist_ok=True)
# Configure logging with both stderr and file output
log_file = os.path.join(log_dir, "gemini-mcp-server.log")
# Create handlers
handlers = [
logging.StreamHandler(sys.stderr),
RotatingFileHandler(
log_file,
mode="a",
encoding="utf-8",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5, # Keep 5 backup files
),
]
# Configure logging
# Use DEBUG level if GEMINI_DEBUG env var is set, otherwise INFO
log_level = logging.DEBUG if os.getenv("GEMINI_DEBUG") else logging.INFO
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=handlers,
force=True, # Ensure logging is configured even if already configured elsewhere
)
logger.info(f"Logging to file: {log_file}")
try:
server = GeminiMCPServer()
server.run()
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error(f"Server error: {e}")
sys.exit(1)
# ========== Tool for asking Gemini general questions. ==========
from typing import Any, Dict
class AskGeminiTool(MCPTool):
"""Tool for Ask Gemini."""
@property
def name(self) -> str:
return "ask_gemini"
@property
def description(self) -> str:
return "Ask Gemini a general question or for help with a problem"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question or problem to ask Gemini",
},
"context": {
"type": "string",
"description": "Optional context to help Gemini understand better",
"default": "",
},
},
"required": ["question"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
# Get parameters
question = parameters.get("question", "")
context = parameters.get("context", "")
if not question:
return ToolOutput(success=False, error="Question is required")
# Build prompt
prompt = f"Context: {context}\n\n" if context else ""
prompt += f"Question: {question}"
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"🤖 Gemini's Response:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
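# Illustrative MCP "tools/call" request for the tool above (wire format only;
# the id and argument values are placeholders):
#   {"jsonrpc": "2.0", "id": 7, "method": "tools/call",
#    "params": {"name": "ask_gemini",
#               "arguments": {"question": "Explain LRU eviction."}}}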
# ========== Brainstorming tool for generating ideas and solutions. ==========
from typing import Any, Dict
class BrainstormTool(MCPTool):
"""Tool for Brainstorm."""
@property
def name(self) -> str:
return "gemini_brainstorm"
@property
def description(self) -> str:
return "Brainstorm ideas or solutions with Gemini"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "The topic or problem to brainstorm about",
},
"constraints": {
"type": "string",
"description": "Any constraints or requirements to consider",
"default": "",
},
},
"required": ["topic"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for brainstorming")
constraints = parameters.get("constraints", "")
# Build the prompt
prompt = self._build_prompt(topic, constraints)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"💡 Brainstorming Results:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, constraints: str) -> str:
"""Build the brainstorming prompt."""
constraints_text = f"\nConstraints to consider:\n{constraints}" if constraints else ""
return f"""Let's brainstorm ideas about: {topic}{constraints_text}
Please provide:
1. Creative and innovative ideas
2. Different perspectives and approaches
3. Potential challenges and solutions
4. Actionable next steps
Be creative but practical. Think outside the box while considering feasibility."""
# ========== Code review tool for analyzing code quality and suggesting improvements. ==========
from typing import Any, Dict
class CodeReviewTool(MCPTool):
"""Tool for Code Review."""
@property
def name(self) -> str:
return "gemini_code_review"
@property
def description(self) -> str:
return "Ask Gemini to review code for issues, improvements, or best practices"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"code": {"type": "string", "description": "The code to review"},
"language": {
"type": "string",
"description": "Programming language (e.g., python, javascript)",
"default": "javascript",
},
"focus": {
"type": "string",
"description": "Specific aspect to focus on "
"(e.g., security, performance, readability)",
"default": "general",
},
},
"required": ["code"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
code = parameters.get("code")
if not code:
return ToolOutput(success=False, error="Code is required for review")
language = parameters.get("language", "javascript")
focus = parameters.get("focus", "general")
# Build the prompt
prompt = self._build_prompt(code, language, focus)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"🔍 Code Review:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, code: str, language: str, focus: str) -> str:
"""Build the code review prompt."""
focus_instructions = {
"security": "Pay special attention to security vulnerabilities, "
"input validation, and potential exploits.",
"performance": "Focus on performance optimizations, "
"algorithmic complexity, and resource usage.",
"readability": "Emphasize code clarity, naming conventions, and maintainability.",
"best_practices": f"Review against {language} best practices and idiomatic patterns.",
"general": "Provide a comprehensive review covering all aspects.",
}
focus_text = focus_instructions.get(focus, focus_instructions["general"])
return f"""Please review the following {language} code:
```{language}
{code}
```
{focus_text}
Provide:
1. Overall assessment
2. Specific issues found (if any)
3. Suggestions for improvement
4. Examples of better implementations where applicable
Be constructive and specific in your feedback."""
# ========== Explanation tool for understanding complex code or concepts. ==========
from typing import Any, Dict
class ExplainTool(MCPTool):
"""Tool for Explain."""
@property
def name(self) -> str:
return "gemini_explain"
@property
def description(self) -> str:
return "Ask Gemini to explain complex code or concepts"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {"type": "string", "description": "Code or concept to explain"},
"level": {
"type": "string",
"description": "Explanation level (beginner, intermediate, expert)",
"default": "intermediate",
},
},
"required": ["topic"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for explanation")
level = parameters.get("level", "intermediate")
# Build the prompt
prompt = self._build_prompt(topic, level)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"📚 Explanation:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, level: str) -> str:
"""Build the explanation prompt."""
level_instructions = {
"beginner": """Explain this as if to someone new to programming:
- Use simple language and avoid jargon
- Provide analogies to everyday concepts
- Break down complex ideas into simple steps
- Include examples that build understanding gradually""",
"intermediate": """Explain this to someone with programming experience:
- Assume familiarity with basic programming concepts
- Focus on the key insights and patterns
- Include practical examples and use cases
- Mention common pitfalls and best practices""",
"expert": """Provide an in-depth technical explanation:
- Include implementation details and edge cases
- Discuss performance implications and trade-offs
- Reference relevant algorithms, data structures, or design patterns
- Compare with alternative approaches""",
}
level_text = level_instructions.get(level, level_instructions["intermediate"])
return f"""Please explain the following:
{topic}
{level_text}
Structure your explanation with:
1. Overview/Summary
2. Detailed explanation
3. Examples (if applicable)
4. Key takeaways"""
# ========== Server information tool for checking status and configuration. ==========
from typing import Any, Dict
__version__ = "3.0.0"
class ServerInfoTool(MCPTool):
"""Tool for getting server information and status."""
@property
def name(self) -> str:
return "server_info"
@property
def description(self) -> str:
return "Get server version and status"
@property
def input_schema(self) -> Dict[str, Any]:
return {"type": "object", "properties": {}, "required": []}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
# Access the server components through global context
# In bundled mode, will be set as global _server_instance
server = None
# Try modular approach first
try:
server = getattr(gemini_mcp, "_server_instance", None)
except ImportError:
pass
# Fall back to global _server_instance (for bundled mode)
if not server:
server = globals().get("_server_instance", None)
# Declare info variable
info: Dict[str, Any]
if not server:
# Fallback to basic info if server instance not available
info = {
"version": __version__,
"architecture": "modular",
"status": "running",
"note": "Full stats unavailable - server instance not accessible",
}
else:
# Get list of available tools from registry
registered_tools = server.tool_registry.list_tools()
all_tools = registered_tools # server_info is now just another tool
info = {
"version": __version__,
"architecture": "modular",
"available_tools": all_tools,
"components": {
"tools_registered": len(registered_tools),
"total_tools_available": len(all_tools),
"cache_stats": server.cache.get_stats() if server.cache else None,
"memory_stats": server.memory.get_stats() if server.memory else None,
},
"models": {
"initialized": server.model_manager is not None,
"primary": getattr(server.model_manager, "primary_model_name", None),
"fallback": getattr(server.model_manager, "fallback_model_name", None),
"stats": self._get_model_stats(server.model_manager),
},
}
if server.orchestrator:
info["execution_stats"] = server.orchestrator.get_execution_stats()
result = f"🤖 Gemini MCP Server v{__version__}\n\n{json.dumps(info, indent=2)}"
return ToolOutput(success=True, result=result)
except Exception as e:
return ToolOutput(success=False, error=f"Error getting server info: {str(e)}")
def _get_model_stats(self, model_manager):
"""Safely get model stats, returning None if there's an error."""
if not model_manager:
return None
try:
return model_manager.get_stats()
except Exception:
# If get_stats() fails, return None instead of crashing
return None
# ========== Synthesis tool for combining multiple perspectives into cohesive insights. ==========
from typing import Any, Dict, List
class SynthesizeTool(MCPTool):
"""Tool for Synthesize."""
@property
def name(self) -> str:
return "synthesize_perspectives"
@property
def description(self) -> str:
return "Synthesize multiple viewpoints or pieces of information into a coherent summary"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The topic or question being addressed"},
"perspectives": {
"type": "array",
"description": "List of different perspectives or pieces of information",
"items": {
"type": "object",
"properties": {
"source": {
"type": "string",
"description": "Source or viewpoint identifier",
},
"content": {
"type": "string",
"description": "The perspective or information",
},
},
"required": ["content"],
},
},
},
"required": ["topic", "perspectives"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
topic = parameters.get("topic")
if not topic:
return ToolOutput(success=False, error="Topic is required for synthesis")
perspectives = parameters.get("perspectives", [])
if not perspectives:
return ToolOutput(success=False, error="At least one perspective is required")
# Build the prompt
prompt = self._build_prompt(topic, perspectives)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"🔄 Synthesis:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, topic: str, perspectives: List[Dict[str, str]]) -> str:
"""Build the synthesis prompt."""
perspectives_text = "\n\n".join(
[
f"**{p.get('source', f'Perspective {i+1}')}:**\n{p['content']}"
for i, p in enumerate(perspectives)
]
)
return f"""Please synthesize the following perspectives on: {topic}
{perspectives_text}
Provide a balanced synthesis that:
1. Identifies common themes and agreements
2. Highlights key differences and tensions
3. Evaluates the strengths and weaknesses of each perspective
4. Proposes a unified understanding or framework
5. Suggests actionable insights or next steps
Be objective and fair to all viewpoints while providing critical analysis."""
# ========== Test case generation tool for suggesting comprehensive test scenarios. ==========
from typing import Any, Dict
class TestCasesTool(MCPTool):
"""Tool for Test Cases."""
@property
def name(self) -> str:
return "gemini_test_cases"
@property
def description(self) -> str:
return "Ask Gemini to suggest test cases for code or features"
@property
def input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"code_or_feature": {
"type": "string",
"description": "Code snippet or feature description",
},
"test_type": {
"type": "string",
"description": "Type of tests (unit, integration, edge cases)",
"default": "all",
},
},
"required": ["code_or_feature"],
}
async def execute(self, parameters: Dict[str, Any]) -> ToolOutput:
"""Execute the tool."""
try:
code_or_feature = parameters.get("code_or_feature")
if not code_or_feature:
return ToolOutput(success=False, error="Code or feature description is required")
test_type = parameters.get("test_type", "all")
# Build the prompt
prompt = self._build_prompt(code_or_feature, test_type)
# Access global model manager in bundled version
global model_manager
response_text, model_used = model_manager.generate_content(prompt)
formatted_response = f"🧪 Test Cases:\n\n{response_text}"
if model_used != model_manager.primary_model_name:
formatted_response += f"\n\n[Model: {model_used}]"
return ToolOutput(success=True, result=formatted_response)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return ToolOutput(success=False, error=f"Error: {str(e)}")
def _build_prompt(self, code_or_feature: str, test_type: str) -> str:
"""Build the test case generation prompt."""
test_type_instructions = {
"unit": "Focus on unit tests that test individual functions or methods in isolation.",
"integration": "Focus on integration tests that verify "
"components work together correctly.",
"edge": "Focus on edge cases, boundary conditions, and error scenarios.",
"performance": "Include performance and load testing scenarios.",
"all": "Provide comprehensive test cases covering all aspects.",
}
test_focus = test_type_instructions.get(test_type, test_type_instructions["all"])
# Detect if input is code or feature description
is_code = any(
indicator in code_or_feature
for indicator in ["def ", "function", "class", "{", "=>", "()"]
)
input_type = "code" if is_code else "feature"
return f"""Please suggest test cases for the following {input_type}:
{code_or_feature}
{test_focus}
For each test case, provide:
1. Test name/description
2. Input/setup required
3. Expected behavior/output
4. Why this test is important
Include both positive (happy path) and negative (error) test cases."""
# ========== Tool Registry Override for Bundled Operation ==========
# Store the bundled tool classes globally
BUNDLED_TOOL_CLASSES = [
AskGeminiTool,
BrainstormTool,
CodeReviewTool,
ExplainTool,
ServerInfoTool,
SynthesizeTool,
TestCasesTool,
]
# Override the ToolRegistry's discover_tools method for bundled operation
def _bundled_discover_tools(self, tools_path: Optional[Path] = None) -> None:
"""Discover and register all tools - bundled version."""
# Clear any existing tools to ensure clean state
self._tools.clear()
self._tool_classes.clear()
logger.info("Registering bundled tools")
for tool_class in BUNDLED_TOOL_CLASSES:
try:
tool_instance = tool_class()
tool_name = tool_instance.name # Use property access
self._tools[tool_name] = tool_instance
self._tool_classes[tool_name] = tool_class
logger.info(f"Registered tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to register tool {tool_class.__name__}: {e}")
logger.info(f"Registered {len(self._tools)} tools in bundled mode")
# Function to apply the override - will be called from main()
def _apply_tool_registry_override():
"""Apply the bundled tool registry override."""
ToolRegistry.discover_tools = _bundled_discover_tools
# ========== Main Execution ==========
if __name__ == "__main__":
# Apply the tool registry override before running
_apply_tool_registry_override()
# Call the main function from the bundled code
main()