Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
generator.py36.7 kB
import contextlib import json from pathlib import Path import secrets import string import time from typing import Any from jinja2 import Environment, FileSystemLoader class APIGenerationError(Exception): """Custom exception for API generation errors.""" pass TEMPLATE_DIR = Path(__file__).parent / "templates" if not TEMPLATE_DIR.is_dir(): TEMPLATE_DIR = Path("src/mockloop_mcp/templates") if not TEMPLATE_DIR.is_dir(): raise APIGenerationError("Template directory not found at expected locations.") # Note: autoescape=False is intentional here as we're generating Python code, not HTML # This is safe because we control all template inputs and don't render user-provided content jinja_env = Environment(loader=FileSystemLoader(TEMPLATE_DIR), autoescape=False) # noqa: S701 # nosec B701 # Add base64 encode filter for admin UI template import base64 def b64encode_filter(s): """Base64 encode filter for Jinja2 templates""" if isinstance(s, str): s = s.encode("utf-8") return base64.b64encode(s).decode("ascii") jinja_env.filters["b64encode"] = b64encode_filter # Add Python boolean conversion filter def python_bool_filter(value): """Convert JavaScript-style boolean values to Python boolean values""" if isinstance(value, str): js_to_python = {"true": True, "false": False, "null": None} return js_to_python.get(value, value) return value def convert_js_to_python(obj): """Recursively convert JavaScript-style boolean values to Python values""" if isinstance(obj, dict): return {k: convert_js_to_python(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_js_to_python(item) for item in obj] elif isinstance(obj, str): js_to_python = {"true": True, "false": False, "null": None} return js_to_python.get(obj, obj) return obj jinja_env.filters["python_bool"] = python_bool_filter jinja_env.filters["convert_js_to_python"] = convert_js_to_python def _to_bool(value: Any) -> bool: if isinstance(value, bool): return value if isinstance(value, str): return value.lower() in ("true", "yes", "1", "on") if isinstance(value, int): return value != 0 return bool(value) def _generate_mock_data_from_schema(schema: dict[str, Any]) -> Any: if not schema: return None schema_type = schema.get("type") if schema_type == "string": format_type = schema.get("format", "") if format_type == "date-time": return "2023-01-01T00:00:00Z" if format_type == "date": return "2023-01-01" if format_type == "email": return "user@example.com" if format_type == "uuid": return "00000000-0000-0000-0000-000000000000" length = schema.get("minLength", 5) if schema.get("maxLength") and schema.get("maxLength") < length: length = schema.get("maxLength") return "".join(secrets.choice(string.ascii_letters) for _ in range(length)) if schema_type in {"number", "integer"}: minimum = schema.get("minimum", 0) maximum = schema.get("maximum", 100) return ( secrets.randbelow(maximum - minimum + 1) + minimum if schema_type == "integer" else round( secrets.randbelow(int((maximum - minimum) * 100)) / 100 + minimum, 2 ) ) if schema_type == "boolean": return secrets.choice([True, False]) if schema_type == "array": items_schema = schema.get("items", {}) min_items = schema.get("minItems", 1) max_items = schema.get("maxItems", 3) num_items = secrets.randbelow(max_items - min_items + 1) + min_items return [_generate_mock_data_from_schema(items_schema) for _ in range(num_items)] if schema_type == "object": result = {} properties = schema.get("properties", {}) required = schema.get("required", []) for prop_name, prop_schema in properties.items(): if prop_name in required or secrets.randbelow(10) > 2: result[prop_name] = _generate_mock_data_from_schema(prop_schema) return result if "$ref" in schema: return {"$ref_placeholder": schema["$ref"]} for key in ["oneOf", "anyOf"]: if key in schema and isinstance(schema[key], list) and len(schema[key]) > 0: return _generate_mock_data_from_schema(secrets.choice(schema[key])) if ( "allOf" in schema and isinstance(schema["allOf"], list) and len(schema["allOf"]) > 0 ): merged_schema = {} for sub_schema in schema["allOf"]: if isinstance(sub_schema, dict): merged_schema.update(sub_schema) return _generate_mock_data_from_schema(merged_schema) return "mock_data" def generate_mock_api( spec_data: dict[str, Any], output_base_dir: str | Path | None = None, mock_server_name: str | None = None, auth_enabled: Any = True, webhooks_enabled: Any = True, admin_ui_enabled: Any = True, storage_enabled: Any = True, business_port: int = 8000, admin_port: int | None = None, ) -> Path: auth_enabled_bool = _to_bool(auth_enabled) webhooks_enabled_bool = _to_bool(webhooks_enabled) admin_ui_enabled_bool = _to_bool(admin_ui_enabled) storage_enabled_bool = _to_bool(storage_enabled) # Set admin port to business_port + 1 if not specified if admin_port is None: admin_port = business_port + 1 try: api_title = ( spec_data.get("info", {}) .get("title", "mock_api") .lower() .replace(" ", "_") .replace("-", "_") ) api_version = ( spec_data.get("info", {}).get("version", "v1").lower().replace(".", "_") ) _mock_server_name = mock_server_name if not _mock_server_name: _mock_server_name = f"{api_title}_{api_version}_{int(time.time())}" _mock_server_name = "".join( c if c.isalnum() or c in ["_", "-"] else "_" for c in _mock_server_name ) _output_base_dir = output_base_dir if _output_base_dir is None: project_root = Path(__file__).parent.parent.parent _output_base_dir = project_root / "generated_mocks" mock_server_dir = Path(_output_base_dir) / _mock_server_name mock_server_dir.mkdir(parents=True, exist_ok=True) requirements_content = "fastapi\nuvicorn[standard]\npsutil\n" with open( mock_server_dir / "requirements_mock.txt", "w", encoding="utf-8" ) as f: f.write(requirements_content) if auth_enabled_bool: auth_middleware_template = jinja_env.get_template( "auth_middleware_template.j2" ) random_suffix = "".join( secrets.choice(string.ascii_letters + string.digits) for _ in range(8) ) auth_middleware_code = auth_middleware_template.render( random_suffix=random_suffix ) with open( mock_server_dir / "auth_middleware.py", "w", encoding="utf-8" ) as f: f.write(auth_middleware_code) with open( mock_server_dir / "requirements_mock.txt", "a", encoding="utf-8" ) as f: f.write("pyjwt\n") f.write("python-multipart\n") # Add python-multipart here if webhooks_enabled_bool: webhook_template = jinja_env.get_template("webhook_template.j2") webhook_code = webhook_template.render() with open( mock_server_dir / "webhook_handler.py", "w", encoding="utf-8" ) as f: f.write(webhook_code) with open( mock_server_dir / "requirements_mock.txt", "a", encoding="utf-8" ) as f: f.write("httpx\n") if storage_enabled_bool: storage_template = jinja_env.get_template("storage_template.j2") storage_code = storage_template.render() with open(mock_server_dir / "storage.py", "w", encoding="utf-8") as f: f.write(storage_code) (mock_server_dir / "mock_data").mkdir(exist_ok=True) if admin_ui_enabled_bool: # Load analytics charts and functions templates analytics_charts_template = jinja_env.get_template( "analytics_charts_template.j2" ) analytics_charts_code = analytics_charts_template.render() analytics_functions_template = jinja_env.get_template( "analytics_functions_template.j2" ) analytics_functions_code = analytics_functions_template.render() admin_ui_template = jinja_env.get_template("admin_ui_template.j2") admin_ui_code = admin_ui_template.render( api_title=spec_data.get("info", {}).get("title", "Mock API"), api_version=spec_data.get("info", {}).get("version", "1.0.0"), auth_enabled=auth_enabled_bool, webhooks_enabled=webhooks_enabled_bool, storage_enabled=storage_enabled_bool, analytics_charts_js=analytics_charts_code, analytics_functions_js=analytics_functions_code, ) (mock_server_dir / "templates").mkdir(exist_ok=True) with open( mock_server_dir / "templates" / "admin.html", "w", encoding="utf-8" ) as f: f.write(admin_ui_code) with open( mock_server_dir / "requirements_mock.txt", "a", encoding="utf-8" ) as f: f.write("jinja2\n") # Generate log analyzer module for admin UI analytics log_analyzer_template = jinja_env.get_template("log_analyzer_template.j2") log_analyzer_code = log_analyzer_template.render() with open(mock_server_dir / "log_analyzer.py", "w", encoding="utf-8") as f: f.write(log_analyzer_code) # Copy favicon.ico to prevent 404s in admin UI import shutil favicon_source_paths = [ Path(__file__).parent.parent.parent / "favicon.ico", # Project root Path(__file__).parent / "favicon.ico", # Template directory Path("favicon.ico"), # Current directory ] for favicon_source in favicon_source_paths: if favicon_source.exists(): try: shutil.copy2(favicon_source, mock_server_dir / "favicon.ico") break except Exception as e: # Log the error and continue to next path if copy fails print(f"Failed to copy favicon from {favicon_source}: {e}") continue routes_code_parts: list[str] = [] paths = spec_data.get("paths", {}) for path_url, methods in paths.items(): for method, details in methods.items(): valid_methods = [ "get", "post", "put", "delete", "patch", "options", "head", "trace", ] if method.lower() not in valid_methods: continue path_params = "" parameters = details.get("parameters", []) path_param_list = [] for param in parameters: if param.get("in") == "path": param_name = param.get("name") param_type = param.get("schema", {}).get("type", "string") python_type = "str" if param_type == "integer": python_type = "int" elif param_type == "number": python_type = "float" elif param_type == "boolean": python_type = "bool" path_param_list.append(f"{param_name}: {python_type}") if path_param_list: path_params = ", ".join(path_param_list) example_response = None responses = details.get("responses", {}) for status_code, response_info in responses.items(): if status_code.startswith("2"): content = response_info.get("content", {}) for content_type, content_schema in content.items(): if "application/json" in content_type: if "example" in content_schema: converted_example = convert_js_to_python( content_schema["example"] ) example_response = repr(converted_example) break schema = content_schema.get("schema", {}) if "example" in schema: converted_example = convert_js_to_python( schema["example"] ) example_response = repr(converted_example) break examples = content_schema.get("examples", {}) if examples: first_example = next(iter(examples.values()), {}) if "value" in first_example: converted_example = convert_js_to_python( first_example["value"] ) example_response = repr(converted_example) break if example_response: break if not example_response: for status_code, response_info in responses.items(): if status_code.startswith("2"): content = response_info.get("content", {}) for content_type, content_schema in content.items(): if "application/json" in content_type: schema = content_schema.get("schema", {}) mock_data = _generate_mock_data_from_schema(schema) if mock_data: # Convert JavaScript-style values to Python values before repr() converted_data = convert_js_to_python(mock_data) # Use repr() to ensure Python boolean values are properly formatted example_response = repr(converted_data) break if example_response: break route_template = jinja_env.get_template("route_template.j2") route_code = route_template.render( method=method.lower(), path=path_url, summary=details.get("summary", f"{method.upper()} {path_url}"), path_params=path_params, example_response=example_response, webhooks_enabled=webhooks_enabled_bool, ) routes_code_parts.append(route_code) # Add favicon route when admin UI is enabled to prevent 404s if admin_ui_enabled_bool: favicon_route = '''@app.get("/favicon.ico", summary="Favicon", tags=["_system"]) async def favicon(): """Serve favicon to prevent 404 errors in admin UI""" from fastapi.responses import FileResponse import os # Try to find favicon.ico in common locations favicon_paths = [ "favicon.ico", "../favicon.ico", "../../favicon.ico", os.path.join(os.path.dirname(__file__), "favicon.ico"), os.path.join(os.path.dirname(__file__), "..", "favicon.ico"), os.path.join(os.path.dirname(__file__), "..", "..", "favicon.ico") ] for favicon_path in favicon_paths: if os.path.exists(favicon_path): return FileResponse(favicon_path, media_type="image/x-icon") # If no favicon found, return a simple 1x1 transparent PNG as fallback from fastapi.responses import Response # 1x1 transparent PNG in base64 transparent_png = b'\\x89PNG\\r\\n\\x1a\\n\\x00\\x00\\x00\\rIHDR\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x01\\x08\\x06\\x00\\x00\\x00\\x1f\\x15\\xc4\\x89\\x00\\x00\\x00\\rIDATx\\x9cc\\xf8\\x0f\\x00\\x00\\x01\\x00\\x01\\x00\\x18\\xdd\\x8d\\xb4\\x00\\x00\\x00\\x00IEND\\xaeB`\\x82' return Response(content=transparent_png, media_type="image/png")''' routes_code_parts.append(favicon_route) all_routes_code = "\n\n".join(routes_code_parts) middleware_template = jinja_env.get_template("middleware_log_template.j2") logging_middleware_code = middleware_template.render() with open( mock_server_dir / "logging_middleware.py", "w", encoding="utf-8" ) as f: f.write(logging_middleware_code) # Generate separate admin logging middleware if admin UI is enabled if admin_ui_enabled_bool: admin_middleware_template = jinja_env.get_template( "admin_middleware_log_template.j2" ) admin_logging_middleware_code = admin_middleware_template.render() with open( mock_server_dir / "admin_logging_middleware.py", "w", encoding="utf-8" ) as f: f.write(admin_logging_middleware_code) common_imports = "from fastapi import FastAPI, Request, Depends, HTTPException, status, Form, Body, Query, Path, BackgroundTasks\nfrom fastapi.responses import HTMLResponse, JSONResponse\nfrom fastapi.templating import Jinja2Templates\nfrom fastapi.staticfiles import StaticFiles\nfrom fastapi.middleware.cors import CORSMiddleware\nfrom typing import List, Dict, Any, Optional\nimport json\nimport os\nimport time\nimport sqlite3\nimport logging\nfrom datetime import datetime\nfrom pathlib import Path\nfrom logging_middleware import LoggingMiddleware\n" auth_imports = ( "from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer\nfrom auth_middleware import verify_api_key, verify_jwt_token, generate_token_response\n" if auth_enabled_bool else "" ) webhook_imports = ( 'from webhook_handler import register_webhook, get_webhooks, delete_webhook, get_webhook_history, trigger_webhooks, test_webhook\n\n# Configure logging for webhook functionality\nlogger = logging.getLogger("webhook_handler")\n' if webhooks_enabled_bool else "" ) storage_imports = ( "from storage import StorageManager, get_storage_stats, get_collections\n" if storage_enabled_bool else "" ) imports_section = ( common_imports + auth_imports + webhook_imports + storage_imports ) app_setup = 'app = FastAPI(title="{{ api_title }}", version="{{ api_version }}")\ntemplates = Jinja2Templates(directory="templates")\napp.add_middleware(LoggingMiddleware)\napp.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])\n\n# Setup database path for logs (same as in middleware)\ndb_dir = Path("db")\ndb_dir.mkdir(exist_ok=True)\nDB_PATH = db_dir / "request_logs.db"\n\n# Global variable for active scenario\nactive_scenario = None\n\n# Initialize active scenario from database on startup\ndef load_active_scenario():\n global active_scenario\n try:\n conn = sqlite3.connect(str(DB_PATH))\n conn.row_factory = sqlite3.Row\n cursor = conn.cursor()\n cursor.execute("SELECT id, name, config FROM mock_scenarios WHERE is_active = 1")\n row = cursor.fetchone()\n if row:\n active_scenario = {\n "id": row[0],\n "name": row[1],\n "config": json.loads(row[2]) if row[2] else {}\n }\n conn.close()\n except Exception as e:\n print(f"Error loading active scenario: {e}")\n active_scenario = None\n\n# Load active scenario on startup\nload_active_scenario()\n' auth_endpoints_str = ( '@app.post("/token", summary="Get access token", tags=["authentication"])\nasync def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):\n return generate_token_response(form_data.username, form_data.password)\n' if auth_enabled_bool else "" ) admin_api_endpoints_str = "" if admin_ui_enabled_bool: admin_api_endpoints_str = """ # --- Admin API Endpoints --- @admin_app.get("/api/export", tags=["_admin"]) async def export_data(): import io import zipfile from fastapi.responses import StreamingResponse try: # Create in-memory zip file zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: # Export request logs conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get all request logs cursor.execute(''' SELECT * FROM request_logs ORDER BY created_at DESC ''') logs = cursor.fetchall() # Convert to JSON logs_data = [] for row in logs: log_entry = dict(row) if log_entry.get('headers'): try: log_entry['headers'] = json.loads(log_entry['headers']) except: pass logs_data.append(log_entry) # Add logs to zip zip_file.writestr("request_logs.json", json.dumps(logs_data, indent=2)) # Export performance metrics if available try: cursor.execute('SELECT * FROM performance_metrics ORDER BY recorded_at DESC') metrics = [dict(row) for row in cursor.fetchall()] zip_file.writestr("performance_metrics.json", json.dumps(metrics, indent=2)) except: pass # Export test sessions if available try: cursor.execute('SELECT * FROM test_sessions ORDER BY created_at DESC') sessions = [dict(row) for row in cursor.fetchall()] zip_file.writestr("test_sessions.json", json.dumps(sessions, indent=2)) except: pass conn.close() # Add metadata metadata = { "export_timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime()), "total_logs": len(logs_data), "database_path": str(DB_PATH), "server_info": { "business_port": """ + str(business_port) + """, "admin_port": """ + str(admin_port) + """, "active_scenario": active_scenario } } zip_file.writestr("metadata.json", json.dumps(metadata, indent=2)) zip_buffer.seek(0) # Return as streaming response def iter_zip(): yield zip_buffer.getvalue() timestamp = time.strftime('%Y%m%d_%H%M%S', time.gmtime()) filename = f"mockloop_export_{timestamp}.zip" print(f"DEBUG ADMIN: Exported {len(logs_data)} logs to {filename}") return StreamingResponse( iter_zip(), media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={filename}"} ) except Exception as e: print(f"DEBUG ADMIN: Error exporting data: {e}") return {"error": str(e)} @admin_app.get("/api/requests", tags=["_admin"]) async def get_request_logs(limit: int = 100, offset: int = 0): try: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get total count cursor.execute("SELECT COUNT(*) FROM request_logs") total_count = cursor.fetchone()[0] # Get paginated logs with all available columns cursor.execute(''' SELECT id, timestamp, type, method, path, status_code, process_time_ms, client_host, client_port, headers, query_params, request_body, response_body, session_id, test_scenario, correlation_id, user_agent, response_size, is_admin, created_at FROM request_logs ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (limit, offset)) logs = [] for row in cursor.fetchall(): log_entry = { "id": row["id"], "timestamp": row["timestamp"], "type": row["type"], "method": row["method"], "path": row["path"], "status_code": row["status_code"], "process_time_ms": row["process_time_ms"], "client_host": row["client_host"], "client_port": row["client_port"], "headers": json.loads(row["headers"]) if row["headers"] else {}, "query_params": row["query_params"], "request_body": row["request_body"], "response_body": row["response_body"], "session_id": row["session_id"], "test_scenario": row["test_scenario"], "correlation_id": row["correlation_id"], "user_agent": row["user_agent"], "response_size": row["response_size"], "is_admin": bool(row["is_admin"]), "created_at": row["created_at"] } logs.append(log_entry) conn.close() print(f"DEBUG ADMIN: Retrieved {len(logs)} logs from database (total: {total_count})") return {"logs": logs, "count": total_count} except Exception as e: print(f"DEBUG ADMIN: Error querying database: {e}") return {"logs": [], "count": 0, "error": str(e)} @admin_app.get("/api/debug", tags=["_admin"]) async def get_debug_info(): try: # Get database info conn = sqlite3.connect(str(DB_PATH)) cursor = conn.cursor() # Check database tables and counts cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [row[0] for row in cursor.fetchall()] table_info = {} for table in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] table_info[table] = count # Get recent logs count cursor.execute("SELECT COUNT(*) FROM request_logs WHERE created_at > datetime('now', '-1 hour')") recent_logs = cursor.fetchone()[0] # Get schema version cursor.execute("SELECT MAX(version) FROM schema_version") schema_version = cursor.fetchone()[0] or 0 conn.close() debug_info = { "status": "ok", "database": { "path": str(DB_PATH), "tables": table_info, "schema_version": schema_version, "recent_logs_1h": recent_logs }, "server": { "business_port": """ + str(business_port) + """, "admin_port": """ + str(admin_port) + """, "active_scenario": active_scenario }, "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S%z', time.gmtime()) } print(f"DEBUG ADMIN: Debug info retrieved successfully") return debug_info except Exception as e: print(f"DEBUG ADMIN: Error getting debug info: {e}") return {"status": "error", "error": str(e)}""" # noqa: S608 webhook_api_endpoints_str = "" if webhooks_enabled_bool and admin_ui_enabled_bool: _webhook_api_endpoints_raw = """ @admin_app.get("/api/webhooks", tags=["_admin"]) async def admin_get_webhooks(): return get_webhooks() @admin_app.post("/api/webhooks", tags=["_admin"]) async def admin_register_webhook(webhook_data: dict = Body(...)): event_type = webhook_data.get("event_type") url = webhook_data.get("url") description = webhook_data.get("description") if not event_type or not url: raise HTTPException(status_code=400, detail="event_type and url are required") return register_webhook(event_type, url, description) @admin_app.delete("/api/webhooks/{webhook_id}", tags=["_admin"]) async def admin_delete_webhook(webhook_id: str): return delete_webhook(webhook_id) @admin_app.post("/api/webhooks/{webhook_id}/test", tags=["_admin"]) async def admin_test_webhook(webhook_id: str): return await test_webhook(webhook_id) @admin_app.get("/api/webhooks/history", tags=["_admin"]) async def admin_get_webhook_history(): return get_webhook_history()""" webhook_api_endpoints_str = _webhook_api_endpoints_raw.strip() storage_api_endpoints_str = "" if storage_enabled_bool and admin_ui_enabled_bool: _storage_api_endpoints_raw = """ @admin_app.get("/api/storage/stats", tags=["_admin"]) async def admin_get_storage_stats(): return get_storage_stats() @admin_app.get("/api/storage/collections", tags=["_admin"]) async def admin_get_collections(): return get_collections() """ storage_api_endpoints_str = _storage_api_endpoints_raw.strip() if admin_ui_enabled_bool: admin_ui_endpoint_str = f''' @admin_app.get("/", response_class=HTMLResponse, summary="Admin UI", tags=["_system"]) async def read_admin_ui(request: Request): return templates.TemplateResponse("admin.html", {{ "request": request, "api_title": "{api_title}", "api_version": "{api_version}", "auth_enabled": {auth_enabled_bool}, "webhooks_enabled": {webhooks_enabled_bool}, "storage_enabled": {storage_enabled_bool} }})''' else: admin_ui_endpoint_str = " @app.get(\"/\")\n async def no_admin(): return {'message': 'Admin UI not enabled'}" health_endpoint_str = '@app.get("/health", summary="Health check endpoint", tags=["_system"])\nasync def health_check(): return {"status": "healthy"}\n' # Create separate main sections for mocked API and admin servers business_main_section_str = f'''if __name__ == "__main__": import uvicorn import threading import time def run_business_server(): uvicorn.run(app, host="0.0.0.0", port={business_port}) # Create admin app at module level admin_app = FastAPI(title="{api_title} Admin", version="{api_version}") admin_app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) # Add admin-specific middleware for separate logging from admin_logging_middleware import AdminLoggingMiddleware admin_app.add_middleware(AdminLoggingMiddleware) def run_admin_server(): # Run the admin server uvicorn.run(admin_app, host="0.0.0.0", port={admin_port}) # Admin endpoints {admin_api_endpoints_str if admin_ui_enabled_bool else ""} {webhook_api_endpoints_str if webhooks_enabled_bool and admin_ui_enabled_bool else ""} {storage_api_endpoints_str if storage_enabled_bool and admin_ui_enabled_bool else ""} {admin_ui_endpoint_str if admin_ui_enabled_bool else ""} # Add health check for admin server @admin_app.get("/health", summary="Admin health check", tags=["_system"]) async def admin_health_check(): return {{"status": "healthy", "server": "admin"}} # Start both servers if {admin_ui_enabled_bool}: # Start admin server in separate thread admin_thread = threading.Thread(target=run_admin_server, daemon=True) admin_thread.start() time.sleep(1) # Give admin server time to start print(f"Mocked API server starting on port {business_port}") print(f"Admin UI server running on port {admin_port}") else: print(f"Mocked API server starting on port {business_port}") # Start mocked API server (main thread) run_business_server() ''' main_app_template_str = ( imports_section + app_setup + auth_endpoints_str + "\n# --- Generated Routes ---\n{{ routes_code }}\n# --- End Generated Routes ---\n" + health_endpoint_str + business_main_section_str ) main_app_jinja_template = jinja_env.from_string(main_app_template_str) main_py_content = main_app_jinja_template.render( api_title=api_title, api_version=api_version, routes_code=all_routes_code, default_port=business_port, ) with open(mock_server_dir / "main.py", "w", encoding="utf-8") as f: f.write(main_py_content) dockerfile_template = jinja_env.get_template("dockerfile_template.j2") dockerfile_content = dockerfile_template.render( python_version="3.9-slim", port=business_port, auth_enabled=auth_enabled_bool, webhooks_enabled=webhooks_enabled_bool, storage_enabled=storage_enabled_bool, admin_ui_enabled=admin_ui_enabled_bool, ) with open(mock_server_dir / "Dockerfile", "w", encoding="utf-8") as f: f.write(dockerfile_content) compose_template = jinja_env.get_template("docker_compose_template.j2") timestamp_for_id = str(int(time.time()))[-6:] raw_api_title = spec_data.get("info", {}).get("title", "mock_api") clean_service_name = "".join( c if c.isalnum() else "-" for c in raw_api_title.lower() ) while "--" in clean_service_name: clean_service_name = clean_service_name.replace("--", "-") clean_service_name = clean_service_name.strip("-") if not clean_service_name: clean_service_name = "mock-api" final_service_name = f"{clean_service_name}-mock" compose_content = compose_template.render( service_name=final_service_name, business_port=business_port, admin_port=admin_port, admin_ui_enabled=admin_ui_enabled_bool, timestamp_id=timestamp_for_id, ) with open(mock_server_dir / "docker-compose.yml", "w", encoding="utf-8") as f: f.write(compose_content) return mock_server_dir except Exception as e: raise APIGenerationError(f"Failed to generate mock API: {e}") from e if __name__ == "__main__": dummy_spec = { "openapi": "3.0.0", "info": {"title": "Test API", "version": "1.0.1"}, "paths": {"/items": {"get": {"summary": "Get all items"}}}, } with contextlib.suppress(APIGenerationError): generated_path = generate_mock_api( dummy_spec, mock_server_name="my_test_api_main" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server