Skip to main content
Glama

IBHack MCP Server

by vigenere92
server.py16.4 kB
#!/usr/bin/env python3
"""
Tool Discovery MCP Server using FastMCP
"""

import ast
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from dataclasses import dataclass

from fastmcp import FastMCP
from llm_service import LLMService
from composio import COMPOSIO_MODULE

# AST node types that define a callable. Tool classes may declare ``execute``
# (and helper functions) with ``async def``, so both kinds are recognised.
_FUNCTION_NODES = (ast.FunctionDef, ast.AsyncFunctionDef)


@dataclass
class ToolInfo:
    """Information about a discovered tool."""
    name: str
    description: str
    file_path: str
    class_name: str
    python_code: str  # Complete Python code for the tool
    input_schema: Optional[str] = None
    output_schema: Optional[str] = None


class ToolDiscovery:
    """Discovers tools by scanning Python files in a directory.

    Files are parsed with ``ast`` only; nothing in this class imports or
    executes the scanned code.
    """

    def __init__(self):
        # Maps tool name (the literal returned by ``get_name``) to its ToolInfo.
        self.tools: Dict[str, ToolInfo] = {}

    def scan_directory(self, directory_path: str) -> Dict[str, ToolInfo]:
        """
        Scan a directory for Python files containing tool classes.

        Any previously discovered tools are discarded before scanning.

        Args:
            directory_path: Path to the directory to scan

        Returns:
            Dictionary mapping tool names to ToolInfo objects

        Raises:
            FileNotFoundError: If ``directory_path`` does not exist.
            ValueError: If ``directory_path`` is not a directory.
        """
        self.tools = {}
        directory = Path(directory_path)

        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        if not directory.is_dir():
            raise ValueError(f"Path is not a directory: {directory_path}")

        # Find all Python files in the directory and subdirectories recursively
        for file_path in directory.glob("**/*.py"):
            try:
                self._scan_file(file_path)
            except Exception as e:
                # Best effort: one unreadable file must not abort the scan.
                print(f"Error scanning file {file_path}: {e}", file=sys.stderr)

        return self.tools

    def _scan_file(self, file_path: Path) -> None:
        """Scan a single Python file and register every tool class found.

        Read and parse errors are reported on stderr and otherwise ignored.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            tree = ast.parse(content, filename=str(file_path))

            # ast.walk also visits nested classes, so tools defined inside
            # other scopes are discovered too.
            for node in ast.walk(tree):
                if not isinstance(node, ast.ClassDef):
                    continue
                tool_info = self._extract_tool_info(node, file_path)
                if tool_info:
                    self.tools[tool_info.name] = tool_info
        except Exception as e:
            print(f"Error parsing file {file_path}: {e}", file=sys.stderr)

    def _extract_tool_info(self, class_node: ast.ClassDef, file_path: Path) -> Optional[ToolInfo]:
        """Extract tool information from a class definition.

        Returns:
            A ToolInfo, or None when the class lacks the required methods or
            its ``get_name`` does not return a non-empty string literal.
        """
        if not self._is_tool_class(class_node):
            return None

        # Metadata getters defined directly on the class; when a name is
        # defined more than once the last definition wins.
        methods = {
            item.name: item
            for item in class_node.body
            if isinstance(item, ast.FunctionDef)
        }

        name_method = methods.get("get_name")
        tool_name = self._extract_string_return(name_method) if name_method else None
        if not tool_name:
            return None

        description = ""
        if "get_description" in methods:
            description = self._extract_string_return(methods["get_description"]) or ""

        input_schema = None
        if "get_input_schema" in methods:
            input_schema = self._extract_class_reference(methods["get_input_schema"])

        output_schema = None
        if "get_output_schema" in methods:
            output_schema = self._extract_class_reference(methods["get_output_schema"])

        # Extract complete Python code for the tool
        python_code = self._extract_complete_code(file_path, class_node)

        return ToolInfo(
            name=tool_name,
            description=description,
            file_path=str(file_path),
            class_name=class_node.name,
            python_code=python_code,
            input_schema=input_schema,
            output_schema=output_schema,
        )

    def _is_tool_class(self, class_node: ast.ClassDef) -> bool:
        """Check if a class defines the methods required of a tool.

        A tool class must define ``get_name``, ``get_description`` and
        ``execute`` directly in its body; ``async def`` definitions count.
        """
        method_names = {
            item.name
            for item in class_node.body
            if isinstance(item, _FUNCTION_NODES)
        }
        return {"get_name", "get_description", "execute"} <= method_names

    def _extract_string_return(self, method: ast.FunctionDef) -> Optional[str]:
        """Return the first string literal returned anywhere in ``method``.

        Only ``ast.Constant`` is checked. The legacy ``ast.Str`` alias is
        deliberately not referenced: it has been deprecated since Python 3.8
        and merely accessing it fails on interpreters where it was removed.
        """
        for node in ast.walk(method):
            if not isinstance(node, ast.Return):
                continue
            value = node.value
            if isinstance(value, ast.Constant) and isinstance(value.value, str):
                return value.value
        return None

    def _extract_class_reference(self, method: ast.FunctionDef) -> Optional[str]:
        """Return the name of the class returned by ``method``.

        For ``return Foo`` this is ``"Foo"``; for ``return mod.Foo`` only the
        final attribute (``"Foo"``) is returned. Other return expressions are
        skipped; None is returned when no matching return statement exists.
        """
        for node in ast.walk(method):
            if not isinstance(node, ast.Return) or not node.value:
                continue
            if isinstance(node.value, ast.Name):
                return node.value.id
            if isinstance(node.value, ast.Attribute):
                return node.value.attr
        return None

    def _extract_complete_code(self, file_path: Path, tool_class: ast.ClassDef) -> str:
        """Extract the tool class plus the imports and definitions it references.

        The result contains, in source order and separated by blank lines:
        every import statement in the file, the tool class itself, and any
        class, function or plain assignment whose name is referenced by the
        tool class. Decorators are included, and a node located inside an
        already included node is not emitted a second time.

        Returns:
            The extracted source text, or "" if the file cannot be read or
            parsed (the error is reported on stderr).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            tree = ast.parse(content, filename=str(file_path))

            # Computed once; every relevance check below shares this set.
            referenced_names: Set[str] = set()
            self._find_referenced_names(tool_class, referenced_names)

            spans: List[Tuple[int, int]] = [
                self._node_line_span(node)
                for node in ast.walk(tree)
                if self._is_relevant_node(node, tool_class, referenced_names)
            ]

            # Source order; for equal starts the enclosing (longer) span first,
            # so nested spans can be recognised and skipped below.
            spans.sort(key=lambda span: (span[0], -span[1]))

            lines = content.split('\n')
            code_parts = []
            covered_until = 0  # last line (1-based) already emitted
            for start_line, end_line in spans:
                if end_line <= covered_until:
                    # Lies inside a span that was already emitted (e.g. a
                    # method or local assignment of an included class).
                    continue
                covered_until = end_line
                code_parts.append('\n'.join(lines[start_line - 1:end_line]))

            return '\n\n'.join(code_parts)

        except Exception as e:
            print(f"Error extracting code from {file_path}: {e}", file=sys.stderr)
            return ""

    def _is_relevant_node(self, node: ast.AST, tool_class: ast.ClassDef,
                          referenced_names: Set[str]) -> bool:
        """Decide whether ``node`` belongs in the extracted code of ``tool_class``."""
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            # Include all imports
            return True
        if isinstance(node, ast.ClassDef):
            # Include the tool class and any referenced classes
            return node.name == tool_class.name or node.name in referenced_names
        if isinstance(node, _FUNCTION_NODES):
            # Include functions that are referenced
            return node.name in referenced_names
        if isinstance(node, ast.Assign):
            # Include variable assignments that might be referenced
            return any(
                isinstance(target, ast.Name) and target.id in referenced_names
                for target in node.targets
            )
        return False

    @staticmethod
    def _node_line_span(node: ast.AST) -> Tuple[int, int]:
        """Return the 1-based inclusive (first, last) source lines of ``node``.

        ``lineno`` of a class or function points at the ``class``/``def``
        line, so decorator lines are folded in explicitly.
        """
        first = getattr(node, 'lineno', 1)
        for decorator in getattr(node, 'decorator_list', []):
            first = min(first, decorator.lineno)
        last = getattr(node, 'end_lineno', None) or getattr(node, 'lineno', 1)
        return first, last

    def _find_referenced_names(self, node: ast.AST, referenced_names: Set[str]) -> None:
        """Add every identifier and attribute name used under ``node`` to ``referenced_names``."""
        for child in ast.walk(node):
            if isinstance(child, ast.Name):
                referenced_names.add(child.id)
            elif isinstance(child, ast.Attribute):
                referenced_names.add(child.attr)

    def _is_referenced_class(self, class_node: ast.ClassDef, tool_class: ast.ClassDef) -> bool:
        """Check if a class is referenced by the tool class."""
        referenced_names: Set[str] = set()
        self._find_referenced_names(tool_class, referenced_names)
        return class_node.name in referenced_names

    def _is_referenced_function(self, func_node: ast.FunctionDef, tool_class: ast.ClassDef) -> bool:
        """Check if a function is referenced by the tool class."""
        referenced_names: Set[str] = set()
        self._find_referenced_names(tool_class, referenced_names)
        return func_node.name in referenced_names

    def _is_referenced_assignment(self, assign_node: ast.Assign, tool_class: ast.ClassDef) -> bool:
        """Check if any plain-name target of an assignment is referenced by the tool class."""
        referenced_names: Set[str] = set()
        self._find_referenced_names(tool_class, referenced_names)
        return any(
            isinstance(target, ast.Name) and target.id in referenced_names
            for target in assign_node.targets
        )


# Create the MCP server instance
mcp = FastMCP("IBHack MCP Server")

# Initialize tool discovery
tool_discovery = ToolDiscovery()

# Initialize LLM service (will be created when first needed)
llm_service = None


# Perform startup scanning if environment variable is set
def perform_startup_scan():
    """Scan the directory named by ``SCAN_DIRECTORY`` during server startup.

    Progress and failures are reported on stderr; a failed scan never raises.
    """
    scan_directory = os.getenv('SCAN_DIRECTORY')
    if not scan_directory:
        print("No SCAN_DIRECTORY environment variable set. Skipping startup scan.", file=sys.stderr)
        return

    print(f"Starting up: Scanning directory {scan_directory} for tools...", file=sys.stderr)
    try:
        tools = tool_discovery.scan_directory(scan_directory)
        print(f"Startup scan complete: Found {len(tools)} tools", file=sys.stderr)
        for tool_name, tool_info in tools.items():
            print(f" - {tool_name}: {tool_info.description}", file=sys.stderr)
    except Exception as e:
        print(f"Startup scan failed: {e}", file=sys.stderr)


# Run startup scan
perform_startup_scan()


def _failure_response(error: str) -> Dict[str, Any]:
    """Build the payload ``recommend_tools`` returns when it cannot answer."""
    return {
        "success": False,
        "error": error,
        "tool_from_code": {},
        "tool_create": False,
        "composio_tool": {},
    }


def _select_code_tool(service: Any, query_description: str, top_k: int) -> Tuple[Dict[str, Any], bool]:
    """Pick the best indexed tool for the query.

    Returns:
        ``(tool_from_code, tool_create)``. ``tool_from_code`` is empty when
        the service's first recommendation is not an indexed tool.
    """
    print(f"Checking query: {query_description} against indexed code.", file=sys.stderr)
    recommended_tool_names = service.find_relevant_tools(
        query_description,
        tool_discovery.tools,
        top_k,
    )

    if not recommended_tool_names or recommended_tool_names[0] not in tool_discovery.tools:
        return {}, False

    first_tool_name = recommended_tool_names[0]
    tool_info = tool_discovery.tools[first_tool_name]
    tool_from_code = {
        "tool_name": first_tool_name,
        "description": tool_info.description,
        "file_path": tool_info.file_path,
        "class_name": tool_info.class_name,
        "python_code": tool_info.python_code,
    }

    # Check if the existing tool can be updated or if a new tool should be created
    try:
        print("Checking if existing tool should be updated.", file=sys.stderr)
        update_analysis = service.check_tool_update_vs_new(
            query_description,
            tool_info.python_code,
            first_tool_name,
        )
        tool_create = not update_analysis.get('can_update', False)
    except Exception as e:
        print(f"Error checking tool update vs new: {e}", file=sys.stderr)
        tool_create = True  # Default to creating new tool on error

    return tool_from_code, tool_create


def _select_composio_tool(service: Any, query_description: str) -> Dict[str, Any]:
    """Pick the Composio registry entry most relevant to the query.

    Returns an empty dict when the registry is empty or the service's answer
    is not a known registry key. Exceptions propagate to the caller.
    """
    composio_tools = COMPOSIO_MODULE.tools
    if not composio_tools:
        return {}

    print("Checking for more context in available registry.", file=sys.stderr)
    selected_name = service.find_relevant_composio_tool(query_description, composio_tools)
    if not selected_name or selected_name not in composio_tools:
        return {}

    composio_info = composio_tools[selected_name]
    toolkit_slug = composio_info.get('toolkit', '')
    toolkit_info = COMPOSIO_MODULE.toolkits.get(toolkit_slug, {})
    return {
        "tool_name": selected_name,
        "description": composio_info.get('description', ''),
        "toolkit_name": toolkit_info.get('name', ''),
        "auth_schemes": toolkit_info.get('auth_schemes', []),
        "input_parameters": composio_info.get('input_parameters', {}),
        "output_parameters": composio_info.get('output_parameters', {}),
    }


# NOTE(review): the docstring below is presumably surfaced to MCP clients as
# the tool description - keep its wording stable; confirm against FastMCP.
@mcp.tool()
def recommend_tools(query_description: str, top_k: int = 1) -> Dict[str, Any]:
    """
    Find the most relevant tools for a given description using Gemini AI.

    Args:
        query_description: Description of what the user wants to do
        top_k: Number of top tools to return (default: 1)

    Returns:
        Dictionary containing:
        - tool_from_code: The first recommended tool with complete code
        - tool_create: Boolean indicating if a new tool should be created (true) or existing tool can be updated (false)
        - composio_tool: Relevant Composio tool data
    """
    global llm_service

    try:
        # Initialize LLM service if not already done
        if llm_service is None:
            try:
                llm_service = LLMService()
            except ValueError as e:
                return _failure_response(f"LLM service initialization failed: {str(e)}")

        # Get available tools
        if not tool_discovery.tools:
            # NOTE(review): no `scan_tools_directory` tool is defined in this
            # file; tools are only indexed from SCAN_DIRECTORY at startup.
            # Confirm whether this message should be reworded.
            return _failure_response(
                "No tools available. Please scan a directory first using scan_tools_directory."
            )

        tool_from_code, tool_create = _select_code_tool(llm_service, query_description, top_k)

        # Check for relevant Composio tools; a failure here must not discard
        # the recommendation already obtained from the indexed code.
        try:
            composio_tool = _select_composio_tool(llm_service, query_description)
        except Exception as e:
            print(f"Error checking Composio tools: {e}", file=sys.stderr)
            composio_tool = {}

        return {
            "success": True,
            "query": query_description,
            "total_available_tools": len(tool_discovery.tools),
            "recommendations_requested": top_k,
            "tool_from_code": tool_from_code,
            "tool_create": tool_create,
            "composio_tool": composio_tool,
        }

    except Exception as e:
        return _failure_response(f"Unexpected error: {str(e)}")


if __name__ == "__main__":
    # Run the server
    mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vigenere92/ibhack-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.