Skip to main content
Glama
python.ts10.6 kB
/** * Python Language Parser * Extracts MCP schemas from Python source files (FastMCP decorators) * * NOTE: This is a basic implementation that uses regex parsing. * For production use, consider using tree-sitter-python or calling * Python's AST module via subprocess. */ import { readFileSync, readdirSync, statSync } from 'fs'; import { join, relative } from 'path'; import type { LanguageParser, ExtractOptions, TraceOptions } from './base.js'; import type { ProducerSchema, ConsumerSchema, JSONSchema, SourceLocation } from '../types.js'; export class PythonParser implements LanguageParser { readonly name = 'python'; readonly filePatterns = ['**/*.py']; // ========================================================================== // Producer Schema Extraction // ========================================================================== async extractSchemas(options: ExtractOptions): Promise<ProducerSchema[]> { console.log(`[Python] Scanning: ${options.rootDir}`); const patterns = options.include || this.filePatterns; const excludePatterns = options.exclude || ['**/node_modules/**', '**/dist/**', '**/__pycache__/**', '**/venv/**', '**/.venv/**']; // Find all Python files const files = this.findFiles(options.rootDir, patterns, excludePatterns); const schemas: ProducerSchema[] = []; for (const filePath of files) { const fileSchemas = await this.extractFromFile(filePath); schemas.push(...fileSchemas); } console.log(`[Python] Found ${schemas.length} tool definitions`); return schemas; } /** * Extract schemas from a single Python file */ private async extractFromFile(filePath: string): Promise<ProducerSchema[]> { const content = readFileSync(filePath, 'utf-8'); const lines = content.split('\n'); const schemas: ProducerSchema[] = []; // Look for @mcp.tool() decorator pattern for (let i = 0; i < lines.length; i++) { const line = lines[i].trim(); // Match: @mcp.tool() or @server.tool() or @app.tool() if (line.match(/@\w+\.tool\(\)/)) { // Next non-empty line should be the function definition let funcLine = i + 1; while (funcLine < lines.length && !lines[funcLine].trim()) { funcLine++; } if (funcLine < lines.length) { const schema = this.parsePythonFunction(lines, funcLine, filePath); if (schema) { schemas.push(schema); } } } } return schemas; } /** * Parse a Python function definition * Pattern: def function_name(arg1: type1, arg2: type2 = default) -> ReturnType: */ private parsePythonFunction(lines: string[], lineIndex: number, filePath: string): ProducerSchema | null { const funcLine = lines[lineIndex].trim(); // Extract function signature const funcMatch = funcLine.match(/def\s+(\w+)\s*\((.*?)\)(?:\s*->\s*(.+?))?\s*:/); if (!funcMatch) { return null; } const [, functionName, argsStr, returnType] = funcMatch; // Extract docstring (if present) let description: string | undefined; if (lineIndex + 1 < lines.length) { const nextLine = lines[lineIndex + 1].trim(); if (nextLine.startsWith('"""') || nextLine.startsWith("'''")) { const quote = nextLine.startsWith('"""') ? '"""' : "'''"; let docstring = nextLine.replace(quote, ''); // Multi-line docstring if (!docstring.endsWith(quote)) { let docLine = lineIndex + 2; while (docLine < lines.length && !lines[docLine].includes(quote)) { docstring += ' ' + lines[docLine].trim(); docLine++; } if (docLine < lines.length) { docstring += ' ' + lines[docLine].trim().replace(quote, ''); } } else { docstring = docstring.replace(quote, ''); } description = docstring.trim(); } } // Parse function arguments const inputSchema = this.parseArguments(argsStr); // Parse return type const outputSchema = this.parseReturnType(returnType); return { toolName: functionName, description, inputSchema, outputSchema, location: { file: filePath, line: lineIndex + 1, }, }; } /** * Parse Python function arguments into JSON Schema * Pattern: arg1: str, arg2: int = 10, arg3: Optional[bool] = None */ private parseArguments(argsStr: string): JSONSchema { const schema: JSONSchema = { type: 'object', properties: {}, required: [], }; // Split by comma, but be careful with nested types like Dict[str, int] const args = this.splitArguments(argsStr); for (const arg of args) { const trimmed = arg.trim(); if (!trimmed || trimmed === 'self') continue; // Match: name: type or name: type = default const argMatch = trimmed.match(/^(\w+)\s*:\s*([^=]+)(?:\s*=\s*(.+))?$/); if (argMatch) { const [, name, typeStr, defaultValue] = argMatch; const type = typeStr.trim(); const hasDefault = defaultValue !== undefined; // Convert Python type to JSON Schema type schema.properties![name] = this.pythonTypeToJsonSchema(type); // If no default value, it's required if (!hasDefault && !type.includes('Optional')) { schema.required!.push(name); } } } return schema; } /** * Split arguments by comma, respecting brackets */ private splitArguments(argsStr: string): string[] { const args: string[] = []; let current = ''; let depth = 0; for (const char of argsStr) { if (char === '[' || char === '(') { depth++; current += char; } else if (char === ']' || char === ')') { depth--; current += char; } else if (char === ',' && depth === 0) { args.push(current); current = ''; } else { current += char; } } if (current.trim()) { args.push(current); } return args; } /** * Convert Python type annotation to JSON Schema type */ private pythonTypeToJsonSchema(pythonType: string): JSONSchema { const type = pythonType.trim(); // Basic types if (type === 'str') return { type: 'string' }; if (type === 'int') return { type: 'integer' }; if (type === 'float') return { type: 'number' }; if (type === 'bool') return { type: 'boolean' }; if (type === 'dict' || type.startsWith('Dict[')) return { type: 'object' }; if (type === 'list' || type.startsWith('List[')) return { type: 'array' }; // Optional types if (type.startsWith('Optional[')) { const innerType = type.slice(9, -1); return this.pythonTypeToJsonSchema(innerType); } // Union types (simplified) if (type.startsWith('Union[')) { return { type: 'unknown' }; } // Default return { type: 'unknown' }; } /** * Parse Python return type annotation */ private parseReturnType(returnType?: string): JSONSchema { if (!returnType) { return { type: 'unknown' }; } return this.pythonTypeToJsonSchema(returnType); } // ========================================================================== // Consumer Usage Tracing // ========================================================================== async traceUsage(options: TraceOptions): Promise<ConsumerSchema[]> { console.log(`[Python] Tracing: ${options.rootDir}`); const patterns = options.include || this.filePatterns; const excludePatterns = options.exclude || ['**/node_modules/**', '**/dist/**', '**/__pycache__/**', '**/venv/**', '**/.venv/**']; const files = this.findFiles(options.rootDir, patterns, excludePatterns); const schemas: ConsumerSchema[] = []; for (const filePath of files) { const fileSchemas = await this.traceFile(filePath); schemas.push(...fileSchemas); } console.log(`[Python] Found ${schemas.length} tool calls`); return schemas; } /** * Trace a single Python file for MCP client calls */ private async traceFile(filePath: string): Promise<ConsumerSchema[]> { const content = readFileSync(filePath, 'utf-8'); const lines = content.split('\n'); const schemas: ConsumerSchema[] = []; // Look for patterns like: client.call_tool("tool_name", {...}) for (let i = 0; i < lines.length; i++) { const line = lines[i]; // Match: .call_tool("name", or .call_tool('name', const callMatch = line.match(/\.call_tool\s*\(\s*["'](\w+)["']/); if (callMatch) { const toolName = callMatch[1]; schemas.push({ toolName, callSite: { file: filePath, line: i + 1, }, argumentsProvided: {}, // TODO: Parse arguments expectedProperties: [], // TODO: Trace property access }); } } return schemas; } // ========================================================================== // File System Utilities // ========================================================================== /** * Find files matching patterns */ private findFiles(rootDir: string, include: string[], exclude: string[]): string[] { const results: string[] = []; const walk = (dir: string) => { try { const entries = readdirSync(dir); for (const entry of entries) { const fullPath = join(dir, entry); const relativePath = relative(rootDir, fullPath); // Check exclusions if (exclude.some(pattern => this.matchPattern(relativePath, pattern))) { continue; } const stat = statSync(fullPath); if (stat.isDirectory()) { walk(fullPath); } else if (stat.isFile()) { // Check inclusions if (include.some(pattern => this.matchPattern(relativePath, pattern))) { results.push(fullPath); } } } } catch (err) { // Ignore permission errors } }; walk(rootDir); return results; } /** * Simple glob pattern matching */ private matchPattern(path: string, pattern: string): boolean { // Convert glob pattern to regex const regexPattern = pattern .replace(/\\/g, '/') .replace(/\*\*/g, '.*') .replace(/\*/g, '[^/]*') .replace(/\?/g, '.'); const regex = new RegExp(`^${regexPattern}$`); return regex.test(path.replace(/\\/g, '/')); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Mnehmos/mnehmos.trace.mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server