MCP Server: Scalable OpenAPI Endpoint Discovery and API Request Tool
by baryhuang
- mcp-server-any-openapi/src/mcp_server_any_openapi/
import json
import logging
import os
from typing import Any, Dict, List
from urllib.parse import urlparse

import faiss
import numpy as np
import requests
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, create_model
from sentence_transformers import SentenceTransformer

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_any_openapi")

def extract_base_url(url: str) -> str:
    """Extract base URL from a full URL."""
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}"

class EndpointSearcher:
    def __init__(self):
        # Try to use the local model if it exists, otherwise download from HuggingFace
        local_model_path = '/app/models/all-MiniLM-L6-v2'
        if os.path.exists(local_model_path):
            logger.info(f"Using local model from {local_model_path}")
            self.model = SentenceTransformer(local_model_path)
        else:
            logger.info("Local model not found, downloading from HuggingFace")
            self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.index = None
        self.endpoints = []
        self._initialize_endpoints()

    def _parse_schema_ref(self, schema_ref: str, components: Dict) -> Dict:
        """Parse a schema reference and return the actual schema"""
        if not schema_ref.startswith('#/components/schemas/'):
            return {}
        schema_name = schema_ref.split('/')[-1]
        return components['schemas'].get(schema_name, {})
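
    # Example (illustrative): given components = {"schemas": {"Pet": {...}}},
    # _parse_schema_ref("#/components/schemas/Pet", components) returns the "Pet"
    # schema dict; any reference not under "#/components/schemas/" yields {}.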

    def _format_schema(self, schema: Dict, components: Dict, indent: int = 0) -> str:
        """Format schema into readable text"""
        if not schema:
            return ""
        lines = []
        # Handle reference
        if '$ref' in schema:
            schema = self._parse_schema_ref(schema['$ref'], components)
        # Add type
        if 'type' in schema:
            lines.append(f"{' ' * indent}Type: {schema['type']}")
        # Add description
        if 'description' in schema:
            lines.append(f"{' ' * indent}Description: {schema['description']}")
        # Add properties
        if 'properties' in schema:
            lines.append(f"{' ' * indent}Properties:")
            for prop_name, prop_schema in schema['properties'].items():
                lines.append(f"{' ' * (indent + 1)}{prop_name}:")
                lines.append(self._format_schema(prop_schema, components, indent + 2))
        # Add required fields
        if 'required' in schema:
            lines.append(f"{' ' * indent}Required: {', '.join(schema['required'])}")
        return '\n'.join(lines)
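
    # Example (illustrative): for {"type": "object", "properties": {"id": {"type": "integer"}}}
    # this method produces roughly:
    #   Type: object
    #   Properties:
    #    id:
    #     Type: integer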

    def _create_endpoint_document(self, path: str, method: str, operation: Dict, components: Dict) -> str:
        """Create a complete document for an endpoint including all its details"""
        doc_parts = []

        # Basic info
        doc_parts.append(f"Path: {path}")
        doc_parts.append(f"Method: {method.upper()}")

        # Summary and description
        if 'summary' in operation:
            doc_parts.append(f"Summary: {operation['summary']}")
        if 'description' in operation:
            doc_parts.append(f"Description: {operation['description']}")

        # Tags
        if 'tags' in operation:
            doc_parts.append(f"Tags: {', '.join(operation['tags'])}")

        # Parameters
        if 'parameters' in operation:
            doc_parts.append("Parameters:")
            for param in operation['parameters']:
                param_doc = [f"  - {param['name']} ({param['in']})"]
                if 'description' in param:
                    param_doc.append(f"    Description: {param['description']}")
                if 'required' in param:
                    param_doc.append(f"    Required: {param['required']}")
                if 'schema' in param:
                    param_doc.append(f"    Schema:\n{self._format_schema(param['schema'], components, 3)}")
                doc_parts.append('\n'.join(param_doc))

        # Request Body
        if 'requestBody' in operation:
            doc_parts.append("Request Body:")
            body = operation['requestBody']
            if 'description' in body:
                doc_parts.append(f"  Description: {body['description']}")
            if 'content' in body:
                for content_type, content in body['content'].items():
                    doc_parts.append(f"  Content Type: {content_type}")
                    if 'schema' in content:
                        doc_parts.append(f"  Schema:\n{self._format_schema(content['schema'], components, 2)}")

        # Responses
        if 'responses' in operation:
            doc_parts.append("Responses:")
            for status, response in operation['responses'].items():
                resp_doc = [f"  {status}:"]
                if 'description' in response:
                    resp_doc.append(f"    Description: {response['description']}")
                if 'content' in response:
                    for content_type, content in response['content'].items():
                        resp_doc.append(f"    Content Type: {content_type}")
                        if 'schema' in content:
                            resp_doc.append(f"    Schema:\n{self._format_schema(content['schema'], components, 3)}")
                doc_parts.append('\n'.join(resp_doc))

        return '\n'.join(doc_parts)
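
    # The flattened text built here is what gets embedded and indexed, e.g.
    # (illustrative, for a petstore-style operation):
    #   Path: /pets/{petId}
    #   Method: GET
    #   Summary: Info for a specific pet
    #   Parameters:
    #     - petId (path)
    #   ...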

    def _initialize_endpoints(self):
        """Initialize endpoints from OpenAPI spec"""
        # Get OpenAPI spec URL from environment variable
        api_url = os.getenv('OPENAPI_JSON_DOCS_URL', 'https://raw.githubusercontent.com/seriousme/fastify-openapi-glue/refs/heads/master/examples/petstore/petstore-openapi.v3.json')

        # Extract base URL from OpenAPI spec URL and use it as default for API_REQUEST_BASE_URL
        default_base_url = extract_base_url(api_url)
        self.base_url = os.getenv('API_REQUEST_BASE_URL', default_base_url)
        logger.info(f"Using API base URL: {self.base_url}")

        try:
            response = requests.get(api_url)
            response.raise_for_status()
            openapi_spec = response.json()

            # Extract endpoints with their full documentation
            for path, path_item in openapi_spec['paths'].items():
                for method, operation in path_item.items():
                    if method.lower() not in ['get', 'post', 'put', 'delete', 'patch']:
                        continue
                    # Create full document for the endpoint
                    doc = self._create_endpoint_document(path, method, operation, openapi_spec.get('components', {}))
                    self.endpoints.append({
                        'path': path,
                        'method': method.upper(),
                        'document': doc,
                        'operation': operation
                    })

            # Create FAISS index
            if self.endpoints:
                # Create embeddings for full endpoint documents
                documents = [ep['document'] for ep in self.endpoints]
                embeddings = self.model.encode(documents)
                dimension = embeddings.shape[1]
                self.index = faiss.IndexFlatL2(dimension)
                self.index.add(np.array(embeddings).astype('float32'))
                logger.info(f"Successfully initialized FAISS index with {len(self.endpoints)} endpoints")
            else:
                logger.warning("No endpoints found in OpenAPI spec")
        except Exception as e:
            logger.error(f"Failed to initialize endpoints: {str(e)}")
            raise

    def search(self, query: str, k: int = 3) -> List[Dict]:
        """Search for endpoints matching the query"""
        if not self.index:
            return []

        # Embed the query and retrieve the k nearest endpoint documents
        query_embedding = self.model.encode([query])
        distances, indices = self.index.search(np.array(query_embedding).astype('float32'), k)

        results = []
        for idx in indices[0]:
            # FAISS pads with -1 when k exceeds the number of indexed endpoints
            if 0 <= idx < len(self.endpoints):
                endpoint = self.endpoints[idx]
                results.append({
                    'path': endpoint['path'],
                    'method': endpoint['method'],
                    'operation': endpoint['operation'],
                    'document': endpoint['document']
                })
        return results
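
# Usage sketch (illustrative; not executed anywhere in this module):
#   searcher = EndpointSearcher()
#   for hit in searcher.search("find pets by status", k=3):
#       print(hit['method'], hit['path'])
# The query is embedded with the same SentenceTransformer model and matched against
# the indexed endpoint documents by L2 distance.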

async def main():
    """Run the Any OpenAPI Server"""
    logger.info("Any OpenAPI Server starting")

    # Get API prefix from environment variable with default value
    api_prefix = os.getenv('MCP_API_PREFIX', 'any_openapi')

    server = Server(api_prefix)
    endpoint_searcher = EndpointSearcher()

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name=f"{api_prefix}_api_request_schema",
                description="Get API endpoint schemas that match your intent. Returns endpoint details including path, method, parameters, and response formats.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Describe what you want to do with the API (e.g., 'Get user profile information', 'Create a new job posting')"
                        }
                    },
                    "required": ["query"]
                },
            ),
            types.Tool(
                name=f"{api_prefix}_make_request",
                description="Make an actual REST API request with full control over method, headers, body, and parameters.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "method": {
                            "type": "string",
                            "description": "HTTP method (GET, POST, PUT, DELETE, PATCH)",
                            "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"]
                        },
                        "url": {
                            "type": "string",
                            "description": "Fully qualified API URL (e.g., https://api.example.com/users/123)"
                        },
                        "headers": {
                            "type": "object",
                            "description": "Request headers",
                            "additionalProperties": {
                                "type": "string"
                            }
                        },
                        "query_params": {
                            "type": "object",
                            "description": "Query parameters",
                            "additionalProperties": {
                                "type": "string"
                            }
                        },
                        "body": {
                            "type": "object",
                            "description": "Request body (for POST, PUT, PATCH)",
                            "additionalProperties": True
                        }
                    },
                    "required": ["method", "url"]
                },
            )
        ]
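
    # Example tool calls (illustrative): with the default MCP_API_PREFIX the tools are
    # named "any_openapi_api_request_schema" and "any_openapi_make_request", e.g.
    #   any_openapi_api_request_schema -> {"query": "list all pets"}
    #   any_openapi_make_request       -> {"method": "GET",
    #                                      "url": "https://petstore.example.com/pets",
    #                                      "query_params": {"limit": "10"}}
    # The host above is a placeholder; real calls should target the api_base_url value
    # returned by the schema tool.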

    @server.call_tool()
    async def handle_invoke_tool(name: str, inputs: Dict[str, Any]) -> list[types.TextContent]:
        """Handle tool invocations"""
        try:
            schema_tool_name = f"{api_prefix}_api_request_schema"
            request_tool_name = f"{api_prefix}_make_request"

            if name == schema_tool_name:
                query = inputs["query"]
                results = endpoint_searcher.search(query)
                return [types.TextContent(type="text", text=json.dumps({
                    "api_base_url": endpoint_searcher.base_url,
                    "matching_endpoints": results
                }, indent=2))]
            elif name == request_tool_name:
                # Prepare request parameters
                method = inputs["method"]
                url = inputs["url"]  # Use the full URL directly

                # Optional parameters with defaults
                headers = inputs.get("headers", {})
                query_params = inputs.get("query_params", {})
                body = inputs.get("body", None)

                try:
                    response = requests.request(
                        method=method,
                        url=url,
                        headers=headers,
                        params=query_params,
                        json=body if body else None
                    )
                    # Try to parse the response as JSON, falling back to raw text
                    try:
                        response_data = response.json()
                    except ValueError:
                        response_data = response.text

                    return [types.TextContent(type="text", text=json.dumps({
                        "status_code": response.status_code,
                        "headers": dict(response.headers),
                        "body": response_data
                    }, indent=2))]
                except requests.exceptions.RequestException as e:
                    return [types.TextContent(type="text", text=json.dumps({
                        "error": str(e)
                    }, indent=2))]
            else:
                raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            logger.error(f"Error invoking tool {name}: {str(e)}")
            return [types.TextContent(type="text", text=json.dumps({
                "error": str(e)
            }, indent=2))]
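
    # The make_request tool returns one text content item whose body is JSON of the
    # form (illustrative): {"status_code": 200, "headers": {...}, "body": {...}},
    # or {"error": "..."} when the HTTP request itself fails.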

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="any_openapi",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
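
# Running the server (illustrative; the module entry point is an assumption, but the
# environment variables are the ones read above):
#   OPENAPI_JSON_DOCS_URL=https://api.example.com/openapi.json \
#   API_REQUEST_BASE_URL=https://api.example.com \
#   MCP_API_PREFIX=example_api \
#   python -m mcp_server_any_openapi
# An MCP client connecting over stdio then sees the "example_api_api_request_schema"
# and "example_api_make_request" tools.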