import asyncio
import json
import logging
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
import uvicorn
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Resource, TextContent, Tool
# Configure root logging at INFO level and create this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("json-mapping-server")
# MCP server instance; the resource and tool handlers below register on it
# through its decorator methods.
mcp_server = Server("json-mapping-finder")
@dataclass
class PathInfo:
path: str
key: str
type: str
example: str
depth: int
class JSONMappingEngine:
"""Schema-aware explorer with heuristic field matching."""
def __init__(self):
self.samples: list[dict[str, Any] | list[Any]] = []
self.index: list[PathInfo] = []
def add_sample(self, sample: Any) -> None:
if not isinstance(sample, (dict, list)):
raise ValueError("Sample must be a JSON object or array")
self.samples.append(sample)
self.index = self._build_index()
def clear(self) -> None:
self.samples.clear()
self.index.clear()
def _build_index(self) -> list[PathInfo]:
index: list[PathInfo] = []
for sample in self.samples:
self._walk(sample, "$", 0, index)
return index
def _walk(self, node: Any, path: str, depth: int, index: list[PathInfo]) -> None:
if isinstance(node, dict):
for key, value in node.items():
next_path = f"{path}.{key}"
index.append(
PathInfo(
path=next_path,
key=str(key),
type=self._type_name(value),
example=self._example(value),
depth=depth + 1,
)
)
self._walk(value, next_path, depth + 1, index)
elif isinstance(node, list):
# Use wildcard to indicate array elements
next_path = f"{path}[*]"
index.append(
PathInfo(
path=next_path,
key="*",
type="array",
example=self._example(node[0]) if node else "[]",
depth=depth + 1,
)
)
for value in node:
self._walk(value, next_path, depth + 1, index)
def _type_name(self, value: Any) -> str:
if value is None:
return "null"
if isinstance(value, bool):
return "bool"
if isinstance(value, (int, float)):
return "number"
if isinstance(value, str):
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return type(value).__name__
def _example(self, value: Any) -> str:
if isinstance(value, (dict, list)):
try:
text = json.dumps(value, separators=(",", ":"))
return text[:120] + ("..." if len(text) > 120 else "")
except Exception:
return str(type(value))
text = str(value)
return text[:120] + ("..." if len(text) > 120 else "")
def list_schema(self, limit: int = 200) -> list[dict[str, Any]]:
return [
{
"path": p.path,
"key": p.key,
"type": p.type,
"example": p.example,
}
for p in self.index[:limit]
]
def search(self, query: str, top_k: int = 10) -> list[dict[str, Any]]:
if not self.index:
raise ValueError("No JSON samples loaded")
scored: list[tuple[float, PathInfo]] = []
norm_query = self._normalize(query)
for p in self.index:
score = self._score(norm_query, p)
if score > 0:
scored.append((score, p))
scored.sort(key=lambda x: x[0], reverse=True)
return [
{
"path": p.path,
"key": p.key,
"type": p.type,
"example": p.example,
"score": round(score, 3),
}
for score, p in scored[:top_k]
]
def map_targets(self, targets: list[str], top_k: int = 5) -> dict[str, list[dict[str, Any]]]:
return {t: self.search(t, top_k=top_k) for t in targets}
def _normalize(self, text: str) -> str:
return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()
def _token_set(self, text: str) -> set[str]:
return {t for t in self._normalize(text).split() if t}
def _score(self, norm_query: str, path_info: PathInfo) -> float:
norm_key = self._normalize(path_info.key)
norm_path = self._normalize(path_info.path)
tokens_query = self._token_set(norm_query)
tokens_key = self._token_set(norm_key)
tokens_path = self._token_set(norm_path)
# Exact or substring matches get a boost
exact = 1.0 if norm_query == norm_key else 0.0
substring = 0.8 if norm_query and norm_query in norm_path else 0.0
# Token overlap (Jaccard)
overlap = self._jaccard(tokens_query, tokens_key | tokens_path)
# Fuzzy similarity on key
fuzzy = SequenceMatcher(None, norm_query, norm_key).ratio()
# Depth penalty to prefer shallower fields slightly
depth_penalty = 1.0 / (1 + path_info.depth * 0.1)
score = (1.5 * exact) + substring + (1.2 * overlap) + (0.8 * fuzzy)
return score * depth_penalty
def _jaccard(self, a: set[str], b: set[str]) -> float:
if not a or not b:
return 0.0
return len(a & b) / len(a | b)
# Single engine instance used by the resource and tool handlers in this module.
engine = JSONMappingEngine()
@mcp_server.list_resources()
async def list_resources() -> list[Resource]:
    """Advertise the single resource this server exposes: the schema index."""
    schema_resource = Resource(
        uri="json-mapping://schema",
        name="Schema Index",
        description="Current flattened JSON schema paths",
        mimeType="application/json",
    )
    return [schema_resource]
@mcp_server.read_resource()
async def read_resource(uri: str) -> str:
    """Return the current schema index as pretty-printed JSON.

    Args:
        uri: Identifier of the requested resource.

    Raises:
        ValueError: If ``uri`` is not ``json-mapping://schema``.
    """
    # NOTE(review): the MCP SDK appears to hand this handler a URL object
    # rather than a plain ``str`` (confirm against the installed version).
    # A URL object never compares equal to a string literal, so compare the
    # textual form. ``str()`` is a no-op when a real string is passed.
    if str(uri) != "json-mapping://schema":
        raise ValueError(f"Unknown resource: {uri}")
    return json.dumps(engine.list_schema(), indent=2)
@mcp_server.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the tools this server offers, with their JSON input schemas."""

    def object_schema(properties: dict[str, Any], required: list[str] | None = None) -> dict[str, Any]:
        # Build an object schema; "required" is only present when given.
        schema: dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return schema

    upload = Tool(
        name="upload_json_sample",
        description="Upload a JSON sample to build the schema index.",
        inputSchema=object_schema({"json_data": {"type": "string"}}, ["json_data"]),
    )
    clear = Tool(
        name="clear_samples",
        description="Clear all loaded samples and reset the index.",
        inputSchema=object_schema({}),
    )
    schema_listing = Tool(
        name="list_schema",
        description="List flattened JSON paths with types and examples.",
        inputSchema=object_schema({"limit": {"type": "number"}}),
    )
    field_search = Tool(
        name="search_fields",
        description="Search for likely field paths by query using heuristics.",
        inputSchema=object_schema(
            {
                "query": {"type": "string"},
                "top_k": {"type": "number"},
            },
            ["query"],
        ),
    )
    target_mapping = Tool(
        name="map_targets",
        description="Suggest mappings for a list of target field names.",
        inputSchema=object_schema(
            {
                "targets": {"type": "array", "items": {"type": "string"}},
                "top_k": {"type": "number"},
            },
            ["targets"],
        ),
    )
    return [upload, clear, schema_listing, field_search, target_mapping]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> list[TextContent]:
try:
if name == "upload_json_sample":
raw = arguments.get("json_data")
sample = json.loads(raw)
engine.add_sample(sample)
msg = f"Loaded sample. Indexed {len(engine.index)} paths from {len(engine.samples)} sample(s)."
return [TextContent(type="text", text=msg)]
if name == "clear_samples":
engine.clear()
return [TextContent(type="text", text="Cleared all samples and index.")]
if name == "list_schema":
limit = int(arguments.get("limit") or 200)
data = engine.list_schema(limit=limit)
return [TextContent(type="text", text=json.dumps(data, indent=2))]
if name == "search_fields":
query = arguments.get("query")
top_k = int(arguments.get("top_k") or 10)
results = engine.search(query, top_k=top_k)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
if name == "map_targets":
targets = arguments.get("targets") or []
top_k = int(arguments.get("top_k") or 5)
results = engine.map_targets(targets, top_k=top_k)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
except Exception as exc:
logger.exception("Tool error")
return [TextContent(type="text", text=f"Error: {exc}")]
raise ValueError(f"Unknown tool: {name}")
# Session manager wrapping the MCP server; the ASGI `app` below hands HTTP
# requests to it and runs it for the duration of the lifespan scope.
# NOTE(review): json_response=True presumably selects plain JSON replies over
# a streamed response — confirm against the SDK documentation.
session_manager = StreamableHTTPSessionManager(mcp_server, json_response=True)
async def _lifespan(scope, receive, send):
    """Handle the ASGI lifespan scope around the session manager.

    Waits for ``lifespan.startup``, keeps ``session_manager.run()`` open
    until the next lifespan message arrives, and acknowledges a shutdown
    request only after that context has exited.

    Args:
        scope: ASGI connection scope (unused here).
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    message = await receive()
    if message["type"] != "lifespan.startup":  # pragma: no cover
        return
    async with session_manager.run():
        await send({"type": "lifespan.startup.complete"})
        message = await receive()
    # Report completion only after the context manager has exited, so the
    # server does not treat shutdown as finished while cleanup is still
    # running.
    if message["type"] == "lifespan.shutdown":
        await send({"type": "lifespan.shutdown.complete"})
async def app(scope, receive, send):
    """ASGI entry point: route each scope type to its handler.

    ``lifespan`` scopes go to ``_lifespan`` and ``http`` scopes to the
    session manager. Any other scope type is answered with a 500
    "Unsupported scope" response.
    """
    scope_type = scope["type"]
    if scope_type == "lifespan":
        await _lifespan(scope, receive, send)
    elif scope_type == "http":
        await session_manager.handle_request(scope, receive, send)
    else:
        # Imported lazily: only needed on this fallback path.
        from starlette.responses import Response

        unsupported = Response("Unsupported scope", status_code=500)
        await unsupported(scope, receive, send)
def main(port: int = 3004, host: str = "0.0.0.0"):
    """Start the server with uvicorn.

    Args:
        port: TCP port to listen on. Defaults to 3004.
        host: Interface to bind. The default ``0.0.0.0`` listens on every
            interface.
    """
    logger.info("JSON Mapping Finder running at http://localhost:%s", port)
    logger.info("Try with: npx -y @modelcontextprotocol/inspector http://localhost:%s", port)
    # NOTE(review): binding to 0.0.0.0 exposes the server on every network
    # interface; pass host="127.0.0.1" for local-only use.
    uvicorn.run(app, host=host, port=port, log_level="info")


if __name__ == "__main__":
    main()