# Blueprint for Building the CodeFlowGraph MCP Server

This blueprint provides a detailed, linear roadmap for implementing the MCP server, focusing on stdio transport for local testing. It leverages the existing codebase (`core/ast_extractor.py`, `core/call_graph_builder.py`, `core/vector_store.py`) without modifications, exposing analysis tools via FastMCP over JSON-RPC 2.0. The core feature set includes handshake, resource discovery, tool invocation, basic error handling, simple state exchange, logging, and shutdown. Development occurs in the `mcp-server-v1` branch, emphasizing quick iteration to a locally testable server.

### Overall Guidelines

- **Environment Setup**: Python 3.11+; virtualenv; `pip install -e .`. Assume existing repo.
- **Transport Focus**: Stdio only (local, trusted; no auth/HTTP). Run via `python -m code_flow_graph.mcp_server` for pipe-based testing.
- **Testing Strategy**: Pytest for units (mock core modules); integration with FastMCP stdio client mocks. Run `pytest` after each step. Smoke tests: pipe input/output via subprocess or a simple client script.
- **Integration Rule**: Each step wires code (e.g., to `server.py` or `__main__.py`) with no orphans. End with pytest + manual stdio test.
- **Best Practices**: Async for tools; Pydantic schemas; explicit logging; linear handlers with early returns. Reuse core for indexing/tools.
- **Tools/Deps**: FastMCP (stdio), pyyaml (config), watchdog (incremental), pytest.
- **Validation**: After each phase, test the full server with a stdio client script that sends JSON-RPC via stdin and reads stdout (a minimal sketch appears after the phase lists below).
- **Rollback Safety**: Commit per chunk.

### Phase 1: Basic Stdio Server with Core Protocol and Indexing

1. **Repo Setup and Dependencies**: Branch, dirs, pyproject.toml (fastmcp, pyyaml, watchdog, pytest, pydantic). Config template. Test: Install, parse CLI.
2. **Basic Stdio Server Initialization**: `server.py` with MCPServer (version="2025.6"); stdio run in `__main__.py`. Test: Starts, logs.
3. **Handshake Handler**: `@on_handshake` for version check. Test: Mock success/reject.
4. **Resource Discovery**: `@list_resources` returning empty list. Test: JSON structure.
5. **Logging Basics**: Console logging with trace IDs. Test: Logs on init.
6. **Shutdown Handler**: `@on_shutdown` for cleanup. Test: Graceful exit.
7. **Analyzer and Initial Indexing**: `analyzer.py` with MCPAnalyzer (config, core reuse, analyze()). Wire to `@on_startup`. Test: Indexes sample dir.
8. **Stub Tool (Ping) with Schemas and Errors**: Pydantic models; `@register_tool("ping")`. Update discovery. Test: Invoke via mock.

### Phase 2: Core Tools and Incremental Features

1. **Semantic Search Tool**: Pydantic; handler queries store. Wire/register. Test: Results JSON.
2. **Get Call Graph Tool**: Export graph/Mermaid. Test: Output formats.
3. **Get Function Metadata Tool**: Fetch node data. Test: JSON metadata.
4. **Query Entry Points Tool**: List entry points. Test: List JSON.
5. **Generate Mermaid Graph Tool**: Mermaid export. Test: Graph string.
6. **Simple Context Exchange**: `@update_context`, `@get_context` for session state. Test: Persist/retrieve.
7. **Incremental Indexing with Watcher**: Watchdog in startup; queue updates. Test: File change reflects.
8. **Dynamic Resource List**: Include all tools in discovery. Test: Lists 5 tools.

### Phase 3: Basic Testing and Polish

1. **Error Handling Refinements**: Typed errors with hints. Test: All cases.
2. **Unit and Integration Tests**: Cover protocol/tools. Test: >80% coverage.
3. **Basic Optimizations**: Limit outputs for efficiency. Test: Token mock.
4. **Documentation and Release Prep**: README, schemas. Test: Examples run.
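The Validation guideline above refers to a stdio client script. A minimal sketch of one, assuming the `mcp.handshake` method name used in the Chunk 3 prompt later in this document:

```python
import json
import subprocess

# Hypothetical smoke-test client: pipes a JSON-RPC 2.0 handshake into the
# server's stdin and reads the response from stdout. Method and param names
# follow the conventions used in the chunk prompts below.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "mcp.handshake",
    "params": {"version": "2025.6"},
}

proc = subprocess.Popen(
    ["python", "-m", "code_flow_graph.mcp_server"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)
# communicate() closes stdin after writing, so a stdio server should exit on EOF.
stdout, _ = proc.communicate(input=json.dumps(request) + "\n", timeout=30)
print(stdout)  # expect a JSON-RPC result echoing the negotiated version
```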
## Iterative Chunk Breakdown

Broke the blueprint into 15 focused chunks for a stdio/minimal viable server. Each: 1 file or small update, quick to test locally.

- **Chunk 1: Repo and Deps**
- **Chunk 2: Basic Stdio Server**
- **Chunk 3: Handshake**
- **Chunk 4: Resource Discovery**
- **Chunk 5: Logging**
- **Chunk 6: Shutdown**
- **Chunk 7: Analyzer and Indexing**
- **Chunk 8: Ping Tool**
- **Chunk 9: Semantic Search Tool**
- **Chunk 10: Get Call Graph Tool**
- **Chunk 11: Get Function Metadata Tool**
- **Chunk 12: Query Entry Points Tool**
- **Chunk 13: Generate Mermaid Tool**
- **Chunk 14: Context Exchange and Incremental Watcher**
- **Chunk 15: Full Testing, Polish, Docs**

## Review and Iteration for Right-Sizing

The initial breakdown had 15 chunks from the phases. Iteration 1: Merged logging/shutdown (small) into fewer, but split the analyzer (core reuse is complex). Iteration 2: Kept 15—each 50-200 LOC, 2-4 tests, 30-60 min to implement/test. Small: TDD per chunk (failing tests first). Big: Delivers testable progress (e.g., Chunk 8: first tool over stdio). Safety: Local stdio tests are immediate; no HTTP/auth distractions. Progression: Protocol → Indexing → Tools (one-by-one) → Features. No jumps: Tools use the existing analyzer from Chunk 7.

# Series of Prompts for Code-Generation LLM

## Prompt for Chunk 1: Repo Setup and Dependencies

```
You are a senior Python developer building the CodeFlowGraph MCP Server in a new Git branch 'mcp-server-v1' from the existing codebase (core/ast_extractor.py, core/call_graph_builder.py, core/vector_store.py, cli/code_flow_graph.py). Focus on stdio transport for local testing; no auth or HTTP. Follow TDD: explicit, linear code.

Step 1: Write failing test in tests/mcp_server/test_setup.py: Mock pyproject.toml, assert deps include fastmcp, pyyaml, watchdog, pytest, pydantic.

Step 2: Implement:
- Dirs: code_flow_graph/mcp_server/__init__.py (empty), mcp_server/__main__.py (if __name__ == "__main__": import argparse; parser = argparse.ArgumentParser(); parser.add_argument("--config", default="mcp_server/config/default.yaml"); args = parser.parse_args(); print(f"Config: {args.config}"); exit(0)).
- pyproject.toml update: [build-system]..., [project.dependencies] add "fastmcp", "pyyaml", "watchdog>=2.0", "pytest", "pydantic". [project.scripts] add "code_flow_graph.mcp_server = 'code_flow_graph.mcp_server.__main__:main'".
- mcp_server/config/default.yaml: watch_directories: ["."], ignored_patterns: [], chromadb_path: "./code_vectors_chroma", max_graph_depth: 3.

Step 3: Refactor: Use yaml.safe_load in __main__.py stub (import yaml; with open(args.config) as f: config = yaml.safe_load(f); print(config)).

Step 4: Integrate: Ensure pip install -e . works.

Step 5: Tests: pytest tests/mcp_server/test_setup.py (mock yaml load, assert config dict). Smoke test: git checkout -b mcp-server-v1; pip install -e .; python -m code_flow_graph.mcp_server (prints config); touch test_file.py with def hello(): pass for later.

Output: New files (__init__.py, __main__.py, default.yaml), pyproject.toml snippet, test_setup.py. No changes to core.
```
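For reference, the `default.yaml` described in Step 2 would look like this as a file (keys and defaults exactly as specified above):

```yaml
# mcp_server/config/default.yaml — defaults from Step 2 of the Chunk 1 prompt
watch_directories:
  - "."
ignored_patterns: []
chromadb_path: "./code_vectors_chroma"
max_graph_depth: 3
```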
## Prompt for Chunk 2: Basic Stdio Server

```
Building on Chunk 1 (__main__.py parses --config, loads yaml to config dict). Existing core untouched.

TDD: In tests/mcp_server/test_server.py: Mock FastMCP; assert MCPServer(name="CodeFlowGraphMCP", version="2025.6") inits; assert run_stdio() called.

Implement in mcp_server/server.py:
- from fastmcp import MCPServer; server = MCPServer(name="CodeFlowGraphMCP", version="2025.6").
- In __main__.py: from mcp_server.server import server; import logging; logging.basicConfig(level=logging.INFO); server.run_stdio(); (handle config later).

Refactor: Add try-except around run for clean exit.
Integrate: Import in __main__.py; run on start.
Tests: pytest (init, stdio run mock—no actual pipe).
Smoke test: python -m code_flow_graph.mcp_server (starts, logs "Server running on stdio"; Ctrl+C exits).
Output: server.py, updated __main__.py, test_server.py additions. Wire fully; stdio focus.
```

## Prompt for Chunk 3: Handshake Handler

```
Building on Chunk 2 (server.py with MCPServer; __main__.py runs stdio). Config dict available.

TDD: In test_server.py: Mock params={"version": "2025.6"}; assert on_handshake returns {"version": "2025.6", "capabilities": ["resourceDiscovery"]}; incompatible "2024.1" raises ProtocolError.

Implement in server.py:
- @server.on_handshake async def on_handshake(params: dict) -> dict:
- if params.get("version") != "2025.6": raise server.ProtocolError("Incompatible version").
- import uuid; trace_id = str(uuid.uuid4()); logging.info(f"Handshake {trace_id}").
- return {"version": "2025.6", "capabilities": ["resourceDiscovery", "state"]}. # Core only.

Refactor: Early return on version mismatch.
Integrate: Decorator auto-registers.
Tests: pytest (success/reject; mock async, log capture).
Smoke test: Create client.py: import sys, json; json.dump({"jsonrpc": "2.0", "method": "mcp.handshake", "params": {"version": "2025.6"}}, sys.stdout). Run subprocess: p = subprocess.Popen(["python", "-m", "code_flow_graph.mcp_server"], stdin=PIPE, stdout=PIPE); p.communicate(input=handshake_bytes); assert response in stdout.
Output: Updated server.py, test additions, client.py for smoke. Builds on prior.
```

## Prompt for Chunk 4: Resource Discovery

```
Building on Chunk 3 (on_handshake in server.py). Stdio via __main__.py.

TDD: test_list_resources: Mock call; assert returns [] (list of dicts: name, type="tool", description, permissions=["read"], schema={"input": {}, "output": {}}).

Implement in server.py:
- @server.list_resources async def list_resources() -> list[dict]: return [] # Empty for now; log trace_id from handshake if available.

Refactor: Prepare stub resources = [] for later.
Integrate: Auto via decorator.
Tests: pytest (empty return; async mock).
Smoke test: Update client.py to send {"method": "listResources", "params": {}} after handshake; assert [] in response via subprocess.
Output: Updated server.py/tests, client.py update. Linear, explicit.
```

## Prompt for Chunk 5: Logging

```
Building on Chunk 4 (list_resources). Focus console logs for stdio.

TDD: test_logging: Assert log messages on init/handshake with trace_id format.

Implement in server.py:
- In MCPServer init: Add handler logging.getLogger("mcp").addHandler(logging.StreamHandler()).
- In handlers: Use logging.info(f"[{trace_id}] Event") where trace_id from context or uuid.

Refactor: Global logger in server.py.
Integrate: Logs in all prior handlers (e.g., handshake).
Tests: pytest (caplog fixture; assert "Handshake [uuid]" in logs).
Smoke test: Run client.py in full (handshake + list); assert logs appear on stderr during subprocess.
Output: Updated server.py, test additions. No new deps.
```
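A consolidated sketch of the `server.py` that Chunks 2–5 build up, using the hypothetical MCPServer/decorator API assumed throughout these prompts (the real FastMCP surface may differ):

```python
# server.py — sketch of the handlers from Chunks 2-5, assuming the
# hypothetical MCPServer API used in these prompts (not verified FastMCP).
import logging
import uuid

from fastmcp import MCPServer  # hypothetical import per the prompts

# StreamHandler defaults to stderr, keeping stdout clean for JSON-RPC traffic.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp")

server = MCPServer(name="CodeFlowGraphMCP", version="2025.6")


@server.on_handshake
async def on_handshake(params: dict) -> dict:
    # Early return on version mismatch (Chunk 3's refactor step).
    if params.get("version") != "2025.6":
        raise server.ProtocolError("Incompatible version")
    trace_id = str(uuid.uuid4())
    logger.info(f"[{trace_id}] Handshake")
    return {"version": "2025.6", "capabilities": ["resourceDiscovery", "state"]}


@server.list_resources
async def list_resources() -> list[dict]:
    return []  # empty until tools are registered in Chunk 8
```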
## Prompt for Chunk 6: Shutdown Handler

```
Building on Chunk 5 (logging in handlers). Stdio graceful exit.

TDD: test_shutdown: Mock on_shutdown; assert cleanup called (e.g., log "Shutting down").

Implement in server.py:
- @server.on_shutdown async def on_shutdown(): logging.info("Shutting down"); # Stub cleanup.

Refactor: Await any pending tasks if added later.
Integrate: Auto; test via signal.
Tests: pytest (mock signal; assert log).
Smoke test: python -m ...; send SIGTERM (or Ctrl+C); assert "Shutting down" log.
Output: Updated server.py/tests. Simple, wired.
```

## Prompt for Chunk 7: Analyzer and Initial Indexing

```
Building on Chunk 6 (shutdown). Reuse core: from core.ast_extractor import PythonASTExtractor, etc. Config dict in __main__.py.

TDD: New tests/mcp_server/test_analyzer.py: Test init (config dict → extractor/builder/store); analyze() (mock extract_from_directory returns [FunctionElement], build adds 1 node, populate no error).

Implement in mcp_server/analyzer.py:
- from pathlib import Path; from core... import ...; class MCPAnalyzer:
- def __init__(self, config: dict): self.config = config; root = Path(config['watch_directories'][0]); self.extractor = PythonASTExtractor(); self.extractor.project_root = root; self.builder = CallGraphBuilder(); self.builder.project_root = root; self.vector_store = CodeVectorStore(persist_directory=config['chromadb_path']) if Path(config['chromadb_path']).exists() else None.
- async def analyze(self): import asyncio; root = Path(self.config['watch_directories'][0]); elements = await asyncio.to_thread(self.extractor.extract_from_directory, root); self.builder.build_from_elements(elements); if self.vector_store: await asyncio.to_thread(self._populate_vector_store) # Adapt from cli: self._populate... using builder.functions/edges.
- def _populate_vector_store(self): for node in self.builder.functions.values(): with open(node.file_path) as f: source = f.read(); self.vector_store.add_function_node(node, source); for edge in self.builder.edges: self.vector_store.add_edge(edge).
- In server.py: @server.on_startup async def startup(): self.analyzer = MCPAnalyzer(config); await self.analyzer.analyze(); logging.info(f"Indexed {len(self.analyzer.builder.functions)} functions").

Refactor: Handle store None (log warning, skip populate).
Integrate: Import in server.py; pass config from __main__.py (load once, pass to startup).
Tests: pytest test_analyzer (mocks for core extract/build/add; assert len(functions)==1); test_server startup (mock analyze, assert log).
Smoke test: Place test_file.py (def hello(): pass) in cwd; run server; assert "Indexed 1" log; client.py irrelevant here.
Output: analyzer.py, updated server.py/__main__.py, test_analyzer.py. Core reuse explicit.
```
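A hedged sketch of what `tests/mcp_server/test_analyzer.py` could look like, assuming the module paths and method names in this prompt and that pytest-asyncio is added for async tests:

```python
# tests/mcp_server/test_analyzer.py — sketch assuming the names in the Chunk 7
# prompt; MCPAnalyzer's core collaborators are swapped for mocks after init.
from unittest.mock import MagicMock

import pytest

from code_flow_graph.mcp_server.analyzer import MCPAnalyzer

CONFIG = {
    "watch_directories": ["."],
    "ignored_patterns": [],
    "chromadb_path": "./nonexistent",  # missing path -> vector_store is None
    "max_graph_depth": 3,
}


@pytest.mark.asyncio  # requires pytest-asyncio
async def test_analyze_builds_graph_from_extracted_elements():
    analyzer = MCPAnalyzer(CONFIG)
    fake_element = MagicMock(name="FunctionElement")
    analyzer.extractor = MagicMock()
    analyzer.extractor.extract_from_directory.return_value = [fake_element]
    analyzer.builder = MagicMock()
    analyzer.vector_store = None  # store missing -> populate step is skipped

    await analyzer.analyze()

    analyzer.builder.build_from_elements.assert_called_once_with([fake_element])
```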
## Prompt for Chunk 8: Ping Tool

```
Building on Chunk 7 (analyzer in startup; config passed). Stdio testing.

TDD: In test_server.py: Test register_tool("ping"); input PingRequest(message="hi") → PingResponse(status="ok", echoed="hi"); missing message → InvalidParamsError(code=4001, data={"hint": "Message required"}).

Implement in mcp_server/tools.py:
- from pydantic import BaseModel, validator; class PingRequest(BaseModel): message: str; @validator('message') def check_message(cls, v): if not v: raise ValueError("Message required"); return v.
- class PingResponse(BaseModel): status: str; echoed: str.
- from fastmcp import register_tool; @register_tool("ping", input_model=PingRequest, output_model=PingResponse) async def ping_tool(req: PingRequest) -> PingResponse: return PingResponse(status="ok", echoed=req.message).
- In server.py: from mcp_server.tools import *; # Auto-register on import.
- In list_resources: return [{"name": "ping", "type": "tool", "description": "Echo message", "permissions": ["read"], "schema": {"input": PingRequest.model_json_schema(), "output": PingResponse.model_json_schema()}}].

Refactor: Pass analyzer if needed later (stub).
Integrate: Import tools in server.py after startup.
Tests: pytest (tool handler mock; validate schemas; call via mock server.call_tool).
Smoke test: Update client.py: After handshake/list, send {"method": "ping", "params": {"message": "test"}}; assert response {"result": {"status": "ok", "echoed": "test"}} via subprocess stdout.
Output: tools.py, updated server.py/tests/client.py. First testable tool over stdio.
```

## Prompt for Chunk 9: Semantic Search Tool

```
Building on Chunk 8 (tools.py with ping; server.py imports tools; analyzer available as server.analyzer). Use vector_store.query_functions.

TDD: In test_server.py: Mock analyzer.vector_store.query_functions("test", 1) → [{"metadata": {...}}]; assert SearchResponse(results=[dict]) returned.

Implement in tools.py:
- class SemanticSearchRequest(BaseModel): query: str; n_results: int = 5; filters: dict = {}.
- class SearchResponse(BaseModel): results: list[dict].
- @register_tool("semantic_search", input_model=SemanticSearchRequest, output_model=SearchResponse) async def semantic_search(req: SemanticSearchRequest, analyzer: MCPAnalyzer) -> SearchResponse: # Pass via closure or server state.
- if not analyzer.vector_store: raise server.ServerError("Vector store unavailable").
- results = analyzer.vector_store.query_functions(req.query, req.n_results, req.filters); return SearchResponse(results=results).
- Update server.py list_resources: Add "semantic_search" entry with schemas (similar to ping).

Refactor: Handle None store early.
Integrate: In server.py, after startup: tools.analyzer = self.analyzer (or pass in handlers).
Tests: pytest (handler with mock analyzer/store; assert results len; error if no store).
Smoke test: Client.py call "semantic_search" {"params": {"query": "test"}}; assert JSON results (use sample indexed).
Output: Updated tools.py/server.py/tests/client.py. Builds on ping; test with indexed data.
```

## Prompt for Chunk 10: Get Call Graph Tool

```
Building on Chunk 9 (semantic_search in tools.py; analyzer passed). Use builder.export_graph.

TDD: Mock builder.export_graph() → {"functions": {...}}; assert GraphResponse(graph=dict or str).

Implement in tools.py:
- class CallGraphRequest(BaseModel): fqns: list[str] = []; depth: int = 1; format: str = "json" # "json" or "mermaid".
- class GraphResponse(BaseModel): graph: dict | str.
- @register_tool("get_call_graph", input_model=CallGraphRequest, output_model=GraphResponse) async def get_call_graph(req: CallGraphRequest, analyzer: MCPAnalyzer) -> GraphResponse:
- graph = analyzer.builder.export_graph(format=req.format if req.format == "mermaid" else "json"); return GraphResponse(graph=graph).
- server.py list_resources: Add "get_call_graph" with schema.

Integrate: Same as prior (analyzer passed).
Tests: pytest (mock export; json/mermaid formats; empty fqns full graph).
Smoke test: Client.py call "get_call_graph" {}; assert graph in response (dict for json).
Output: Updated tools.py/server.py/tests/client.py. Incremental tool addition.
```

(Continuing the pattern for the remaining chunks: each adds one tool or feature, TDD first, stdio smoke test via client.py subprocess, building on the analyzer/tools from prior chunks. Chunk 14 combines context + watcher simply; Chunk 15 wraps up with tests/docs, no deployment.)
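A runnable sketch of the Chunk 8 ping tool, using Pydantic v2's `field_validator` (the prompt's `@validator` is the v1 spelling) and the hypothetical `register_tool` decorator assumed throughout these prompts:

```python
# tools.py — sketch of the ping tool from Chunk 8. register_tool is the
# hypothetical FastMCP decorator used in these prompts, not a verified API.
from fastmcp import register_tool  # hypothetical import per the prompts
from pydantic import BaseModel, field_validator


class PingRequest(BaseModel):
    message: str

    @field_validator("message")
    @classmethod
    def check_message(cls, v: str) -> str:
        if not v:
            raise ValueError("Message required")
        return v


class PingResponse(BaseModel):
    status: str
    echoed: str


@register_tool("ping", input_model=PingRequest, output_model=PingResponse)
async def ping_tool(req: PingRequest) -> PingResponse:
    # Trivial echo handler: validation already happened in PingRequest.
    return PingResponse(status="ok", echoed=req.message)
```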
## Prompt for Chunk 11: Get Function Metadata Tool

```
Building on Chunk 10 (get_call_graph). Use builder.functions[fqn].

TDD: Mock functions[fqn] → FunctionNode; assert MetadataResponse with keys (name, complexity, etc.).

Implement in tools.py:
- class MetadataRequest(BaseModel): fqn: str.
- class MetadataResponse(BaseModel): **FunctionNode fields (e.g., name: str, complexity: Optional[int], ...).
- @register_tool("get_function_metadata", input_model=MetadataRequest, output_model=MetadataResponse) async def get_function_metadata(req: MetadataRequest, analyzer: MCPAnalyzer) -> MetadataResponse:
- node = analyzer.builder.functions.get(req.fqn); if not node: raise server.InvalidParamsError("FQN not found"); return MetadataResponse(**{k: v for k, v in vars(node).items() if not k.startswith('_')}).
- server.py: Add to list_resources.

Integrate: Analyzer passed.
Tests: pytest (valid fqn → full metadata; invalid → error).
Smoke test: Client.py call with known fqn from index; assert response fields.
Output: Updated tools.py/server.py/tests/client.py.
```

## Prompt for Chunk 12: Query Entry Points Tool

```
Building on Chunk 11 (metadata tool). Use builder.get_entry_points().

TDD: Mock get_entry_points() → [FunctionNode]; assert EntryPointsResponse(entry_points=list[dict]).

Implement in tools.py:
- class EntryPointsResponse(BaseModel): entry_points: list[dict].
- @register_tool("query_entry_points", output_model=EntryPointsResponse) async def query_entry_points(analyzer: MCPAnalyzer) -> EntryPointsResponse: # No input model.
- eps = analyzer.builder.get_entry_points(); return EntryPointsResponse(entry_points=[vars(ep) for ep in eps]).
- server.py: Add to list_resources (no input schema).

Integrate: Same.
Tests: pytest (mock eps; assert list len).
Smoke test: Client.py call "query_entry_points" {}; assert entry_points in response.
Output: Updated tools.py/server.py/tests/client.py.
```

## Prompt for Chunk 13: Generate Mermaid Graph Tool

```
Building on Chunk 12 (entry points). Use builder.export_mermaid_graph.

TDD: Mock export_mermaid_graph() → "graph TD\n..."; assert MermaidResponse(graph=str).

Implement in tools.py:
- class MermaidRequest(BaseModel): fqns: list[str] = []; llm_optimized: bool = False.
- class MermaidResponse(BaseModel): graph: str.
- @register_tool("generate_mermaid_graph", input_model=MermaidRequest, output_model=MermaidResponse) async def generate_mermaid_graph(req: MermaidRequest, analyzer: MCPAnalyzer) -> MermaidResponse:
- graph = analyzer.builder.export_mermaid_graph(highlight_fqns=req.fqns, llm_optimized=req.llm_optimized); return MermaidResponse(graph=graph).
- server.py: Add to list_resources.

Integrate: Analyzer.
Tests: pytest (mock export; optimized flag passed).
Smoke test: Client.py call {}; assert "graph TD" in response.
Output: Updated tools.py/server.py/tests/client.py.
```
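For orientation, the string returned by export_mermaid_graph has this general shape — the "graph TD" prefix is what the smoke test asserts; the node names below are purely illustrative, not from the actual codebase:

```mermaid
graph TD
    main --> parse_config
    main --> run_server
    run_server --> handle_request
```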
## Prompt for Chunk 14: Context Exchange and Incremental Watcher

```
Building on Chunk 13 (mermaid tool; all 5 tools registered). Add simple context; watcher for incremental.

TDD: test_context: update_context({"key": "val"}) → merged state; get_context() → dict. test_watcher: Mock event → analyze file, update store.

Implement in server.py:
- self.context = {} # Per-session simple dict.
- @server.update_context async def update_context(params: dict): self.context.update(params); return {"version": len(self.context)}.
- @server.get_context async def get_context(): return self.context.
- list_resources: Add "update_context", "get_context" as methods (type="method").

For the watcher in analyzer.py:
- from watchdog.observers import Observer; from watchdog.events import FileSystemEventHandler.
- class WatcherHandler(FileSystemEventHandler): def on_modified(self, event): if event.src_path.endswith('.py'): asyncio.create_task(self.analyzer._incremental_update(event.src_path)).
- In startup: observer = Observer(); observer.schedule(WatcherHandler(analyzer=self.analyzer), config['watch_directories'][0], recursive=True); observer.start(); self.observer = observer.
- In analyzer.py: async def _incremental_update(self, file_path: str): elements = await asyncio.to_thread(self.extractor.extract_from_file, Path(file_path)); # Update builder/store incrementally (add new, skip same hash); if deleted, remove by fqn.

Refactor: Context versioned with len; watcher debounce stub (sleep 1s).
Integrate: Auto for methods; observer in startup, stop in shutdown.
Tests: pytest test_server (context merge/get); test_analyzer (mock event, assert update called).
Smoke test: Client.py update_context {"test": "val"}; get_context assert; touch test_file.py; re-call semantic_search, assert it reflects the change.
Output: Updated server.py/analyzer.py/tests/client.py. Core features complete.
```

## Prompt for Chunk 15: Full Testing, Polish, Docs

```
Building on Chunk 14 (context/watcher; 5 tools + methods). Finalize for local stdio testing.

TDD/Polish: In test_server.py: Add error tests (InvalidParams for bad tool params, ServerError for no store). Limit outputs (e.g., in tools, truncate results[:10]). Mock tiktoken for <1000 char efficiency.

Implement refinements:
- In tools: For search/graph, if len(results) > 10: results = results[:10]; log warning.
- Errors: Consistent code=4001 for params, 5001 for server; data={"hint": "..."}.

Full tests: Ensure >80% coverage (run pytest --cov=mcp_server).
Docs: mcp_server/README.md: "Setup: pip install -e .; Run: python -m ...; Test: Use client.py for handshake/tools. Config: Edit default.yaml for watch/chroma. Tools: semantic_search(query), etc. with JSON examples."
Refactor: Clean imports/logs.
Integrate: All wired; add --help in __main__.py.
Tests: pytest all (protocol, tools, context, watcher mock, errors).
Smoke test: Full client.py script: Handshake → list_resources (5 tools) → semantic_search → get_call_graph → context update/get → file touch → re-search. Assert all succeed.
Output: Updated tests (additions), README.md, client.py full. Project testable over stdio; ready for local use.
```
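One wrinkle in the Chunk 14 prompt: watchdog invokes handlers on its observer thread, where `asyncio.create_task` has no running event loop. A thread-safe sketch of the watcher using the real watchdog API (the `WatcherHandler`/`start_watcher` names and loop-passing design are this sketch's own, not prescribed by the prompt):

```python
# analyzer.py (watcher portion) — sketch of Chunk 14's watcher. watchdog fires
# callbacks on the observer's own thread, so run_coroutine_threadsafe hands the
# coroutine back to the server's event loop instead of create_task.
import asyncio

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class WatcherHandler(FileSystemEventHandler):
    def __init__(self, analyzer, loop: asyncio.AbstractEventLoop):
        self.analyzer = analyzer
        self.loop = loop  # event loop captured in startup, before observer.start()

    def on_modified(self, event):
        if not event.is_directory and event.src_path.endswith(".py"):
            asyncio.run_coroutine_threadsafe(
                self.analyzer._incremental_update(event.src_path), self.loop
            )


def start_watcher(analyzer, watch_dir: str) -> Observer:
    # Called from the async startup handler, so get_running_loop() is safe here.
    handler = WatcherHandler(analyzer, asyncio.get_running_loop())
    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=True)
    observer.start()  # stop() and join() belong in the shutdown handler
    return observer
```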
