Documentation Generator MCP Server

by srwlli
api_generator.py
"""API inventory generator for discovering REST/GraphQL endpoints across multiple frameworks.""" import json import ast import re from pathlib import Path from typing import List, Optional, Dict, Any, Set from datetime import datetime import jsonschema import sys # Add parent directory to path for constants import sys.path.insert(0, str(Path(__file__).parent.parent)) from constants import Paths from logger_config import logger, log_error, log_security_event class ApiGenerator: """Helper class for generating comprehensive API endpoint inventories.""" def __init__(self, project_path: Path): """ Initialize API inventory generator. Args: project_path: Path to project directory to analyze """ self.project_path = project_path self.inventory_dir = project_path / Paths.INVENTORY_DIR self.schema_path = self.inventory_dir / "api-schema.json" self.schema = self._load_schema() logger.info(f"Initialized ApiGenerator for {project_path}") def _load_schema(self) -> Optional[Dict[str, Any]]: """ Load JSON schema for API manifest validation (SEC-002). Returns: Schema dictionary or None if schema file doesn't exist Raises: json.JSONDecodeError: If schema JSON is malformed """ if not self.schema_path.exists(): logger.warning(f"No schema found at {self.schema_path}") return None try: with open(self.schema_path, 'r', encoding='utf-8') as f: schema = json.load(f) logger.debug(f"Loaded schema from {self.schema_path}") return schema except json.JSONDecodeError as e: log_error('schema_load_error', f"Malformed schema file: {self.schema_path}", error=str(e)) raise json.JSONDecodeError( f"Malformed schema file: {self.schema_path}", e.doc, e.pos ) def validate_manifest(self, data: Dict[str, Any]) -> None: """ Validate manifest data against JSON schema (SEC-002). Args: data: Manifest dictionary to validate Raises: jsonschema.ValidationError: If data doesn't match schema jsonschema.SchemaError: If schema itself is invalid """ if self.schema is None: logger.warning("No schema available, skipping validation") return try: jsonschema.validate(data, self.schema) logger.debug("API manifest validation passed") except jsonschema.ValidationError as e: log_error('manifest_validation_error', f"API manifest validation failed: {str(e)}", error=str(e)) raise def detect_frameworks(self, frameworks: Optional[List[str]] = None) -> List[str]: """ Detect API frameworks used in the project. Supports: FastAPI, Flask, Express Args: frameworks: Optional list of frameworks to check. If ['all'], check all supported frameworks. 
Returns: List of detected framework names Raises: PermissionError: If project directory cannot be accessed """ if frameworks is None or frameworks == ['all']: # Check all supported frameworks frameworks_to_check = ['fastapi', 'flask', 'express'] else: frameworks_to_check = [fw.lower() for fw in frameworks] detected = set() logger.info(f"Detecting API frameworks in {self.project_path}") try: # Scan Python files for FastAPI/Flask if 'fastapi' in frameworks_to_check or 'flask' in frameworks_to_check: for py_file in self.project_path.rglob('*.py'): # Skip common exclude directories if any(exclude in py_file.parts for exclude in ['.git', 'node_modules', '__pycache__', 'venv', '.venv']): continue try: with open(py_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Check for FastAPI if 'fastapi' in frameworks_to_check: if re.search(r'from\s+fastapi\s+import|import\s+fastapi', content, re.IGNORECASE): detected.add('fastapi') logger.debug(f"FastAPI detected in {py_file}") # Check for Flask if 'flask' in frameworks_to_check: if re.search(r'from\s+flask\s+import|import\s+flask', content, re.IGNORECASE): detected.add('flask') logger.debug(f"Flask detected in {py_file}") except Exception as e: logger.debug(f"Error reading {py_file}: {str(e)}") continue # Scan JavaScript/TypeScript files for Express if 'express' in frameworks_to_check: for js_file in list(self.project_path.rglob('*.js')) + list(self.project_path.rglob('*.ts')): # Skip common exclude directories if any(exclude in js_file.parts for exclude in ['.git', 'node_modules', 'dist', 'build']): continue try: with open(js_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Check for Express if re.search(r'require\([\'"]express[\'"]\)|import\s+.*\s+from\s+[\'"]express[\'"]', content): detected.add('express') logger.debug(f"Express detected in {js_file}") except Exception as e: logger.debug(f"Error reading {js_file}: {str(e)}") continue logger.info(f"Detected frameworks: {list(detected)}") return sorted(list(detected)) except PermissionError as e: log_security_event('permission_denied', f"Cannot access project directory: {self.project_path}", path=str(self.project_path)) raise PermissionError(f"Cannot access project directory: {self.project_path}") def extract_endpoints( self, frameworks: List[str], include_graphql: bool = False ) -> List[Dict[str, Any]]: """ Extract API endpoints from source code. Args: frameworks: List of detected frameworks include_graphql: Whether to parse GraphQL schemas Returns: List of endpoint dictionaries """ endpoints = [] logger.info(f"Extracting endpoints for frameworks: {frameworks}") # Extract from each detected framework if 'fastapi' in frameworks: endpoints.extend(self._extract_fastapi_endpoints()) if 'flask' in frameworks: endpoints.extend(self._extract_flask_endpoints()) if 'express' in frameworks: endpoints.extend(self._extract_express_endpoints()) # Extract GraphQL endpoints if requested if include_graphql: endpoints.extend(self._extract_graphql_endpoints()) logger.info(f"Extracted {len(endpoints)} total endpoints") return endpoints def _extract_fastapi_endpoints(self) -> List[Dict[str, Any]]: """ Extract endpoints from FastAPI applications using AST parsing. 
Returns: List of endpoint dictionaries """ endpoints = [] logger.info("Extracting FastAPI endpoints") for py_file in self.project_path.rglob('*.py'): # Skip common exclude directories if any(exclude in py_file.parts for exclude in ['.git', 'node_modules', '__pycache__', 'venv', '.venv']): continue try: with open(py_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Parse Python AST tree = ast.parse(content, filename=str(py_file)) # Find FastAPI route decorators for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): for decorator in node.decorator_list: endpoint = self._parse_fastapi_decorator(decorator, node, py_file) if endpoint: endpoints.append(endpoint) except SyntaxError: logger.debug(f"Syntax error parsing {py_file}, skipping") continue except Exception as e: logger.debug(f"Error extracting FastAPI endpoints from {py_file}: {str(e)}") continue logger.info(f"Found {len(endpoints)} FastAPI endpoints") return endpoints def _parse_fastapi_decorator( self, decorator: ast.expr, func_node: ast.FunctionDef, file_path: Path ) -> Optional[Dict[str, Any]]: """ Parse a FastAPI decorator to extract endpoint information. Args: decorator: AST decorator node func_node: Function definition node file_path: Path to source file Returns: Endpoint dictionary or None if not a route decorator """ # Check if decorator is a route method (app.get, app.post, etc.) if isinstance(decorator, ast.Call): if isinstance(decorator.func, ast.Attribute): method = decorator.func.attr if method in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: # Extract path from first argument if decorator.args and isinstance(decorator.args[0], ast.Constant): path = decorator.args[0].value # Get function parameters params = [arg.arg for arg in func_node.args.args if arg.arg != 'self'] # Check if function has docstring docstring = ast.get_docstring(func_node) relative_path = file_path.relative_to(self.project_path) return { "path": path, "method": method.upper(), "framework": "fastapi", "file": str(relative_path).replace('\\', '/'), "line": func_node.lineno, "function": func_node.name, "parameters": params, "documented": bool(docstring), "doc_coverage": 100 if docstring else 0 } return None def _extract_flask_endpoints(self) -> List[Dict[str, Any]]: """ Extract endpoints from Flask applications using AST parsing. Returns: List of endpoint dictionaries """ endpoints = [] logger.info("Extracting Flask endpoints") for py_file in self.project_path.rglob('*.py'): # Skip common exclude directories if any(exclude in py_file.parts for exclude in ['.git', 'node_modules', '__pycache__', 'venv', '.venv']): continue try: with open(py_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() # Parse Python AST tree = ast.parse(content, filename=str(py_file)) # Find Flask route decorators for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): for decorator in node.decorator_list: endpoint = self._parse_flask_decorator(decorator, node, py_file) if endpoint: endpoints.append(endpoint) except SyntaxError: logger.debug(f"Syntax error parsing {py_file}, skipping") continue except Exception as e: logger.debug(f"Error extracting Flask endpoints from {py_file}: {str(e)}") continue logger.info(f"Found {len(endpoints)} Flask endpoints") return endpoints def _parse_flask_decorator( self, decorator: ast.expr, func_node: ast.FunctionDef, file_path: Path ) -> Optional[Dict[str, Any]]: """ Parse a Flask decorator to extract endpoint information. 
Args: decorator: AST decorator node func_node: Function definition node file_path: Path to source file Returns: Endpoint dictionary or None if not a route decorator """ # Check if decorator is @app.route or @blueprint.route if isinstance(decorator, ast.Call): if isinstance(decorator.func, ast.Attribute) and decorator.func.attr == 'route': # Extract path from first argument if decorator.args and isinstance(decorator.args[0], ast.Constant): path = decorator.args[0].value # Extract methods from keyword arguments methods = ['GET'] # Flask default for keyword in decorator.keywords: if keyword.arg == 'methods': if isinstance(keyword.value, ast.List): methods = [ elt.value for elt in keyword.value.elts if isinstance(elt, ast.Constant) ] # Get function parameters params = [arg.arg for arg in func_node.args.args if arg.arg != 'self'] # Check if function has docstring docstring = ast.get_docstring(func_node) relative_path = file_path.relative_to(self.project_path) # Create endpoint entry for each HTTP method results = [] for method in methods: results.append({ "path": path, "method": method.upper(), "framework": "flask", "file": str(relative_path).replace('\\', '/'), "line": func_node.lineno, "function": func_node.name, "parameters": params, "documented": bool(docstring), "doc_coverage": 100 if docstring else 0 }) # Return first endpoint (will be called multiple times if multiple methods) return results[0] if results else None return None def _extract_express_endpoints(self) -> List[Dict[str, Any]]: """ Extract endpoints from Express applications using regex patterns. Note: JavaScript AST parsing deferred to Phase 3.5. Using regex for now. Returns: List of endpoint dictionaries """ endpoints = [] logger.info("Extracting Express endpoints") for js_file in list(self.project_path.rglob('*.js')) + list(self.project_path.rglob('*.ts')): # Skip common exclude directories if any(exclude in js_file.parts for exclude in ['.git', 'node_modules', 'dist', 'build']): continue try: with open(js_file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() lines = content.split('\n') # Regex pattern for Express routes: app.get('/path', ... pattern = r'(app|router)\.(get|post|put|delete|patch)\s*\(\s*[\'"]([^\'"]+)[\'"]' for i, line in enumerate(lines, 1): for match in re.finditer(pattern, line): method = match.group(2).upper() path = match.group(3) relative_path = js_file.relative_to(self.project_path) endpoints.append({ "path": path, "method": method, "framework": "express", "file": str(relative_path).replace('\\', '/'), "line": i, "function": "unknown", # Hard to extract from regex "parameters": [], "documented": False, # Would need deeper analysis "doc_coverage": 0 }) except Exception as e: logger.debug(f"Error extracting Express endpoints from {js_file}: {str(e)}") continue logger.info(f"Found {len(endpoints)} Express endpoints") return endpoints def _extract_graphql_endpoints(self) -> List[Dict[str, Any]]: """ Extract GraphQL schemas (optional feature). 
Returns: List of GraphQL endpoint dictionaries """ endpoints = [] logger.info("Extracting GraphQL endpoints") for graphql_file in self.project_path.rglob('*.graphql'): try: with open(graphql_file, 'r', encoding='utf-8') as f: content = f.read() # Simple regex to find Query and Mutation types query_pattern = r'type\s+Query\s*\{([^}]+)\}' mutation_pattern = r'type\s+Mutation\s*\{([^}]+)\}' relative_path = graphql_file.relative_to(self.project_path) # Extract queries for match in re.finditer(query_pattern, content, re.DOTALL): fields = match.group(1) for field_match in re.finditer(r'(\w+)\s*\([^)]*\)\s*:\s*(\w+)', fields): endpoints.append({ "path": f"/graphql (query: {field_match.group(1)})", "method": "POST", "framework": "graphql", "file": str(relative_path).replace('\\', '/'), "line": 0, "function": field_match.group(1), "parameters": [], "documented": False, "doc_coverage": 0 }) # Extract mutations for match in re.finditer(mutation_pattern, content, re.DOTALL): fields = match.group(1) for field_match in re.finditer(r'(\w+)\s*\([^)]*\)\s*:\s*(\w+)', fields): endpoints.append({ "path": f"/graphql (mutation: {field_match.group(1)})", "method": "POST", "framework": "graphql", "file": str(relative_path).replace('\\', '/'), "line": 0, "function": field_match.group(1), "parameters": [], "documented": False, "doc_coverage": 0 }) except Exception as e: logger.debug(f"Error extracting GraphQL from {graphql_file}: {str(e)}") continue logger.info(f"Found {len(endpoints)} GraphQL endpoints") return endpoints def parse_openapi_docs(self) -> Dict[str, Dict[str, Any]]: """ Parse OpenAPI/Swagger documentation files using pyyaml. Returns: Dictionary mapping endpoint paths to documentation metadata """ docs = {} logger.info("Parsing OpenAPI/Swagger documentation") # Search for OpenAPI/Swagger files in project root and docs directories search_patterns = ['openapi.yaml', 'openapi.yml', 'openapi.json', 'swagger.yaml', 'swagger.yml', 'swagger.json'] openapi_files = [] for pattern in search_patterns: openapi_files.extend(self.project_path.rglob(pattern)) if not openapi_files: logger.info("No OpenAPI/Swagger files found") return docs # Try to import pyyaml try: import yaml except ImportError: logger.warning("pyyaml not installed, skipping OpenAPI parsing. 
Install with: pip install pyyaml>=6.0") return docs for doc_file in openapi_files: try: logger.info(f"Parsing OpenAPI/Swagger file: {doc_file}") # Read and parse the file with open(doc_file, 'r', encoding='utf-8') as f: if doc_file.suffix == '.json': spec = json.load(f) else: spec = yaml.safe_load(f) # Extract paths from OpenAPI spec paths = spec.get('paths', {}) for path, path_item in paths.items(): # Iterate through HTTP methods for method in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: if method in path_item: operation = path_item[method] # Build endpoint key (path + method) endpoint_key = f"{method.upper()}:{path}" # Extract documentation metadata docs[endpoint_key] = { "summary": operation.get('summary', ''), "description": operation.get('description', ''), "tags": operation.get('tags', []), "deprecated": operation.get('deprecated', False), "parameters": [ p.get('name', '') for p in operation.get('parameters', []) ], "operationId": operation.get('operationId', ''), "source_file": str(doc_file.relative_to(self.project_path)) } logger.info(f"Extracted {len(docs)} documented endpoints from {doc_file.name}") except json.JSONDecodeError as e: logger.warning(f"Invalid JSON in {doc_file}: {str(e)}") continue except yaml.YAMLError as e: logger.warning(f"Invalid YAML in {doc_file}: {str(e)}") continue except Exception as e: logger.debug(f"Error parsing OpenAPI file {doc_file}: {str(e)}") continue logger.info(f"Total OpenAPI/Swagger documented endpoints: {len(docs)}") return docs def calculate_documentation_coverage( self, endpoints: List[Dict[str, Any]], openapi_docs: Dict[str, Dict[str, Any]] ) -> int: """ Calculate percentage of endpoints with documentation. Enriches endpoints with OpenAPI documentation where available. Args: endpoints: List of discovered endpoints openapi_docs: Dictionary of OpenAPI documentation Returns: Documentation coverage percentage (0-100) """ if not endpoints: return 0 documented_count = 0 # Match OpenAPI docs with source endpoints for endpoint in endpoints: # Build endpoint key (method:path) endpoint_key = f"{endpoint.get('method', 'GET')}:{endpoint.get('path', '')}" # Check if endpoint has OpenAPI documentation if endpoint_key in openapi_docs: doc = openapi_docs[endpoint_key] # Enrich endpoint with OpenAPI data endpoint['summary'] = doc.get('summary', '') endpoint['description'] = doc.get('description', '') endpoint['tags'] = doc.get('tags', []) endpoint['deprecated'] = doc.get('deprecated', False) # Mark as documented if doc.get('summary') or doc.get('description'): endpoint['documented'] = True endpoint['doc_coverage'] = 100 documented_count += 1 elif endpoint.get('documented', False): # Already marked as documented (has docstring) documented_count += 1 coverage = int((documented_count / len(endpoints)) * 100) logger.info(f"Documentation coverage: {coverage}% ({documented_count}/{len(endpoints)} endpoints)") return coverage def generate_manifest( self, frameworks: Optional[List[str]] = None, include_graphql: bool = False, scan_documentation: bool = True ) -> Dict[str, Any]: """ Generate comprehensive API inventory manifest. 
Args: frameworks: List of frameworks to check (None = all) include_graphql: Whether to include GraphQL endpoints scan_documentation: Whether to scan for OpenAPI/Swagger docs Returns: Complete API manifest dictionary Raises: IOError: If manifest cannot be generated """ logger.info(f"Generating API inventory manifest") try: # Detect frameworks detected_frameworks = self.detect_frameworks(frameworks) # Extract endpoints endpoints = self.extract_endpoints(detected_frameworks, include_graphql) # Parse documentation if requested openapi_docs = {} if scan_documentation: openapi_docs = self.parse_openapi_docs() # Calculate metrics doc_coverage = self.calculate_documentation_coverage(endpoints, openapi_docs) # Count endpoints by framework and method framework_counts = {} method_counts = {} for endpoint in endpoints: fw = endpoint.get('framework', 'unknown') framework_counts[fw] = framework_counts.get(fw, 0) + 1 method = endpoint.get('method', 'UNKNOWN') method_counts[method] = method_counts.get(method, 0) + 1 # Build manifest structure manifest = { "project_name": self.project_path.name, "project_path": str(self.project_path), "generated_at": datetime.now().isoformat(), "frameworks": detected_frameworks, "endpoints": endpoints, "metrics": { "total_endpoints": len(endpoints), "documented_endpoints": sum(1 for ep in endpoints if ep.get('documented', False)), "documentation_coverage": doc_coverage, "frameworks_detected": detected_frameworks, "framework_breakdown": framework_counts, "method_breakdown": method_counts, "rest_endpoints": len([ep for ep in endpoints if ep.get('framework') != 'graphql']), "graphql_endpoints": len([ep for ep in endpoints if ep.get('framework') == 'graphql']) } } # Validate manifest self.validate_manifest(manifest) logger.info(f"API manifest generation complete: {len(endpoints)} endpoints") return manifest except Exception as e: log_error('manifest_generation_error', f"Failed to generate API manifest: {str(e)}", error=str(e)) raise IOError(f"Failed to generate API inventory manifest: {str(e)}") def save(self, manifest: Dict[str, Any], output_file: Optional[Path] = None) -> Path: """ Save API manifest to JSON file. Args: manifest: Manifest dictionary to save output_file: Optional custom output file path (defaults to coderef/inventory/api.json) Returns: Path to saved manifest file Raises: IOError: If file cannot be written """ if output_file is None: self.inventory_dir.mkdir(parents=True, exist_ok=True) output_file = self.inventory_dir / "api.json" try: # Validate before saving self.validate_manifest(manifest) with open(output_file, 'w', encoding='utf-8') as f: json.dump(manifest, f, indent=2, ensure_ascii=False) f.write('\n') # Add trailing newline logger.info(f"API manifest saved to {output_file}") return output_file except Exception as e: log_error('manifest_save_error', f"Failed to save API manifest: {str(e)}", path=str(output_file)) raise IOError(f"Failed to save API manifest to {output_file}: {str(e)}")
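generate_manifest() is the main entry point: it chains framework detection, endpoint extraction, optional OpenAPI enrichment, and schema validation. The following is a minimal usage sketch; the project path is a placeholder, the import name api_generator is assumed from the filename, and the sibling constants and logger_config modules the file imports must be available on sys.path.

from pathlib import Path

from api_generator import ApiGenerator

# Hypothetical project location; replace with a real path.
generator = ApiGenerator(Path("/path/to/your/project"))

# None means "check all supported frameworks" (FastAPI, Flask, Express).
manifest = generator.generate_manifest(
    frameworks=None,
    include_graphql=True,      # also scan *.graphql schema files
    scan_documentation=True,   # enrich endpoints with OpenAPI/Swagger metadata
)

metrics = manifest["metrics"]
print(f"{metrics['total_endpoints']} endpoints, "
      f"{metrics['documentation_coverage']}% documented")

# Validates against coderef/inventory/api-schema.json (if present) and, per
# the save() docstring, writes coderef/inventory/api.json by default.
output_path = generator.save(manifest)
print(f"Manifest written to {output_path}")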

MCP directory API

We provide all the information about MCP servers via our MCP directory API. For example, to fetch this server's entry:

curl -X GET 'https://glama.ai/api/mcp/v1/servers/srwlli/docs-mcp'
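
The same endpoint can be consumed programmatically. A minimal Python sketch, assuming the endpoint returns a JSON body (its exact shape is not documented here):

import json
from urllib.request import urlopen

# Fetch this server's directory entry from the MCP directory API.
URL = "https://glama.ai/api/mcp/v1/servers/srwlli/docs-mcp"
with urlopen(URL) as response:
    server_info = json.load(response)

print(json.dumps(server_info, indent=2))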

If you have feedback or need assistance with the MCP directory API, please join our Discord server.