Okay, acting as a senior developer, here's a detailed explanation of the NeoCoder framework and a refactoring plan designed for an AI assistant like Claude to implement.
## NeoCoder Framework Explanation
**1. Overall Goal:**
NeoCoder is designed as an extensible, asynchronous server that bridges AI assistants (communicating via MCP - Model Context Protocol, presumably using `FastMCP`) with a Neo4j knowledge graph. The core idea is that the Neo4j graph guides the AI's workflows, and the server can adapt its functionality through different "incarnations" (like plugins) without modifying the core server code. The system aims for easy extensibility, ideally allowing new functionalities (incarnations with associated tools) to be added simply by placing new Python files in a specific directory.
**2. Core Components:**
* **`Neo4jWorkflowServer` (`server.py`):**
* The main application class orchestrating the system.
* Integrates with `FastMCP` for communication with the AI assistant.
* Manages the Neo4j `AsyncDriver` connection.
* Handles the overall asynchronous initialization sequence.
* Inherits functionality from various mixins.
* **Incarnations (`mcp_neocoder/incarnations/`):**
* Represent different operational modes or functionalities (e.g., `knowledge_graph`, `coding`, `research`).
* **`BaseIncarnation` (`base_incarnation.py`):** An abstract base class defining the interface for all incarnations. It includes:
* Common properties (`name`, `description`).
* Methods for schema initialization (`initialize_schema`), providing guidance (`get_guidance_hub`), discovering tools (`list_tool_methods`), and registering tools (`register_tools`).
* Basic Neo4j helper methods (`_read_query`, `_write`).
* **Specific Incarnations (e.g., `KnowledgeGraphIncarnation`):** Concrete subclasses of `BaseIncarnation`. They:
* Define a unique `name` (string identifier).
* Provide a `description`.
* Optionally define `schema_queries` for Neo4j setup and `hub_content` for user guidance.
* Implement specific tool methods (async functions usually returning `List[types.TextContent]`).
* Crucially, they can define an `_tool_methods` list to explicitly declare which methods should be registered as tools, overriding or supplementing automatic inspection.
* **Registries:**
* **`IncarnationRegistry` (`incarnation_registry.py`):**
* Manages the discovery and loading of `BaseIncarnation` subclasses.
* Uses filesystem scanning (`discover`, `discover_dynamic_types`) and `importlib` to find and import incarnation modules.
* Keeps track of loaded incarnation *classes* (`self.incarnations`) and manages shared *instances* (`self.instances`).
* Provides a `create_template_incarnation` helper.
* **`ToolRegistry` (`tool_registry.py`):**
* Acts as a central catalog for all discovered tool *functions*.
* Stores tool functions mapped by name (`self.tools`), categorized (`self.tool_categories`), and described (`self.tool_descriptions`).
* Provides methods to populate the registry from incarnation instances (`register_class_tools`).
* Includes logic to register the collected tools with the MCP server (`register_tools_with_server`) and prevent duplicates (`_mcp_registered_tools`).
* **`PolymorphicAdapterMixin` (`polymorphic_adapter.py`):**
* A mixin added to `Neo4jWorkflowServer`.
* Manages the *currently active* incarnation (`self.current_incarnation`).
* Provides the `switch_incarnation` tool logic (via helper functions) to change the active mode. It uses the `IncarnationRegistry` to get instances.
* Its `set_incarnation` method handles activating an instance and initializing its schema. (Note: It *also* previously called `instance.register_tools`, contributing to registration complexity).
* **Supporting Modules & Mixins:**
* `ActionTemplateMixin`, `ToolProposalMixin`, `CypherSnippetMixin`: Provide specific, potentially shared functionalities (like managing predefined workflows, tool suggestions, Cypher snippets) using Neo4j.
* `event_loop_manager.py`: Attempts to manage asyncio event loop consistency, crucial for `neo4j` async driver stability.
* `generators.py`: Provides high-level tools (callable via MCP) to generate boilerplate code for new incarnations and tools, facilitating easy extension.
* `init_db.py` (Assumed): Handles initial Neo4j database setup (constraints, indexes, potentially initial data).
* **Integration:**
* `FastMCP` (Assumed): Handles the protocol communication, tool discovery (via `mcp.add_tool`), and execution requests from the AI assistant.
* `neo4j` Driver: Provides the asynchronous interface to the Neo4j database.
**3. Startup Sequence:**
1. `main()` function is executed.
2. `create_server()` sets up the event loop, Neo4j driver, and environment variables.
3. `Neo4jWorkflowServer.__init__()` initializes basic attributes, the `FastMCP` instance, and **crucially starts `_initialize_async` in a separate task**. Basic MCP handlers are registered synchronously.
4. `_initialize_async()` runs asynchronously:
* Initializes mixins.
* Checks/initializes the database (`_initialize_database` potentially calling `init_db`).
* Registers *core* tools (defined directly in `server.py` or its mixins).
* Loads incarnations (`_load_incarnations` -> `IncarnationRegistry.discover`).
* Registers **all tools from all incarnations** (`_register_all_incarnation_tools`). This is the complex step involving loops, `instance.register_tools`, and `tool_registry.register_incarnation_tools`.
* **(Needs Improvement):** Sets a signal/event indicating initialization is complete.
**4. Runtime Operation:**
1. The `FastMCP` server listens for requests from the AI assistant.
2. Incoming requests might be for core tools or incarnation-specific tools.
3. The `Neo4jWorkflowServer` (or its mixins/active incarnation) handles the request by executing the corresponding tool method.
4. Tool methods interact with Neo4j via the `AsyncDriver` (ideally using safe patterns) and return results formatted as `List[types.TextContent]`.
5. The `switch_incarnation` tool uses the `PolymorphicAdapterMixin` to change the `self.current_incarnation` instance.
---
## Refactoring Plan (for Claude)
**Goal:** Simplify the tool registration process to improve clarity and robustness, while adhering to the requirement that **all tools from all incarnations are registered at startup**. Enhance async initialization stability and standardize Neo4j error handling.
**Constraint:** All tools from all discoverable incarnations MUST be registered with the MCP server during the initial startup sequence.
**Refactoring Steps:**
**Step 1: Centralize MCP Registration via `ToolRegistry`**
* **Objective:** Make the `ToolRegistry` the single point of contact for registering tools with the MCP server (`server.mcp.add_tool`). `BaseIncarnation` will only be responsible for *identifying* its tools and adding them *to* the `ToolRegistry`.
* **File:** `mcp_neocoder/incarnations/base_incarnation.py`
* **Action:** Modify the `BaseIncarnation.register_tools` method.
* **Remove** the loop that iterates through `tool_methods` to call `server.mcp.add_tool`.
* **Remove** the management and checking of the `self.__class__._registered_tool_methods` set.
* Ensure the method still correctly identifies tool methods using `self.list_tool_methods()`.
* Ensure the method still calls `tool_registry.register_class_tools(self, self.name)` to add the identified tools and their metadata to the central registry.
* Modify the logging to reflect that it's adding tools *to the registry*, not directly to the server.
* **Conceptual Code Change (`BaseIncarnation.register_tools`):**
```python
async def register_tools(self, server):
"""Identify tool methods and register them with the central ToolRegistry."""
# Get all tool methods from this incarnation
tool_methods = self.list_tool_methods()
logger.info(f"Identified tool methods in {self.name}: {tool_methods}")
# Register these tools with the central tool registry for tracking/listing
# Ensure tool_registry is imported correctly
from ..tool_registry import registry as tool_registry
# Let register_class_tools handle adding to the registry's internal structures
# It already prevents duplicates within the registry itself if called multiple times for the same obj
tools_added_to_registry_count = tool_registry.register_class_tools(self, self.name)
# Log based on tools found and added to the registry
logger.info(f"{self.name} incarnation: {tools_added_to_registry_count} tools added to ToolRegistry")
# Return the count of tools identified/added to registry
return len(tool_methods) # Or potentially tools_added_to_registry_count
```
**Step 2: Streamline Server Startup Registration (`_register_all_incarnation_tools`)**
* **Objective:** Modify the server's startup routine to first populate the `ToolRegistry` using the (now modified) `instance.register_tools`, and *then* make a single call to register everything from the `ToolRegistry` with the MCP server.
* **File:** `mcp_neocoder/server.py`
* **Action:** Modify the `Neo4jWorkflowServer._register_all_incarnation_tools` method.
* Keep the main loop iterating through `self.incarnation_registry.items()`.
* Inside the loop, keep the logic to get or create the `instance`.
* Inside the `try` block for each instance:
* **Only call `await instance.register_tools(self)`**. This now solely populates the `ToolRegistry`. Log the count returned (number of tools identified for the registry).
* **Remove** the line `tool_registry_count = tool_registry.register_incarnation_tools(instance, self)`.
* **Remove** the fallback logic that uses `instance.list_tool_methods()` and calls `self.mcp.add_tool` directly – registration now happens centrally *after* the loop.
* **After** the loop finishes:
* Add logging indicating the total number of unique tools collected in `tool_registry.tools`.
* Make a single call: `final_mcp_registered_count = tool_registry.register_tools_with_server(self)`. This method handles iterating through the registry's tools and calling `self.mcp.add_tool` using the registry's own duplicate prevention (`_mcp_registered_tools`).
* Log `final_mcp_registered_count`.
* Return `final_mcp_registered_count`.
* **Conceptual Code Change (`_register_all_incarnation_tools`):**
```python
async def _register_all_incarnation_tools(self):
"""Populate ToolRegistry from all incarnations, then register all with MCP."""
logger.info("Registering tools from all incarnations by populating ToolRegistry...")
if not self.incarnation_registry:
logger.warning("No incarnations registered, skipping tool registration")
return 0
total_tools_identified = 0
# Import the global tool registry
from .tool_registry import registry as tool_registry
for incarnation_type, incarnation_class in list(self.incarnation_registry.items()):
try:
logger.info(f"Processing incarnation for ToolRegistry: {incarnation_type}")
# ... (get or create instance) ...
if instance:
# Populate the ToolRegistry via the instance's register_tools method
# This method no longer calls server.mcp.add_tool directly
count = await instance.register_tools(self)
total_tools_identified += count # Count may vary depending on implementation
logger.info(f"Added {count} tools from {incarnation_type} to ToolRegistry")
else:
logger.error(f"Failed to get instance for {incarnation_type}")
except Exception as e:
logger.error(f"Error processing incarnation {incarnation_type} for ToolRegistry: {e}")
# ... (traceback logging) ...
# After iterating through all incarnations and populating the registry:
logger.info(f"ToolRegistry population complete. Found {len(tool_registry.tools)} unique tools.")
logger.info("Registering all collected tools with MCP server...")
# Perform the actual registration with the MCP server using the central registry method
final_mcp_registered_count = tool_registry.register_tools_with_server(self)
logger.info(f"Successfully registered {final_mcp_registered_count} unique tools with MCP server.")
return final_mcp_registered_count
```
**Step 3: Implement Robust Async Startup Synchronization**
* **Objective:** Prevent the server from processing tool requests before the entire initialization (including tool registration) is complete.
* **File:** `mcp_neocoder/server.py`
* **Actions:**
1. In `Neo4jWorkflowServer.__init__`: Add `self.initialized_event = asyncio.Event()`.
2. In `Neo4jWorkflowServer._initialize_async`:
* Inside the `try` block, *after* the call to `_register_all_incarnation_tools` completes successfully, add the line: `self.initialized_event.set()`.
* Consider if you want `self.initialized_event.set()` inside the `except` block as well (allowing the server to run partially) or *only* on full success. Assuming full success is required: only set it in the `try` block.
* **File:** `mcp_neocoder/server.py` (or wherever `create_server` is called if external)
* **Actions:**
1. Modify `create_server` (or the calling code):
* Remove `asyncio.sleep(0.5)`.
* After the `server = Neo4jWorkflowServer(...)` line (within the `async_server_setup` function):
* Add `logger.info("Waiting for server initialization to complete...")`
* Add `await asyncio.wait_for(server.initialized_event.wait(), timeout=60.0)` (Adjust timeout as needed).
* Add a `try...except asyncio.TimeoutError:` block around the `wait_for` call to handle cases where initialization takes too long. Log an error and potentially raise an exception if it times out.
* Add `logger.info("Server initialization complete.")` after the wait succeeds.
**Step 4: Standardize Neo4j Error Handling (Adopt Safe Patterns)**
* **Objective:** Prevent Neo4j transaction scope errors consistently and remove the need for error suppression.
* **Files:** All files performing Neo4j queries (incarnations, mixins like `ActionTemplateMixin`, `ToolProposalMixin`, etc.).
* **Actions:**
1. Review all methods that execute Neo4j queries using `session.execute_read` or `session.execute_write`.
2. Identify places where results are processed *after* the `await session.execute_read(...)` or `await session.execute_write(...)` call. These are prone to scope errors.
3. Refactor these methods to use the pattern seen in `KnowledgeGraphIncarnation`: define a nested `async def execute_and_process_in_tx(tx): ...` function that performs the `tx.run()`, result processing (e.g., `await result.values()`, converting data, maybe even `json.dumps`), and returns the final, processed data. Pass this nested function to `session.execute_read` or `session.execute_write`.
4. Update the `_read_query` and `_write` methods in `BaseIncarnation` (and ensure mixins use them or similar safe patterns) to follow this principle of processing results *within* the transaction scope. The `_read_query` in `BaseIncarnation` already uses `to_eager_result()`, which is one way to achieve safety. Ensure this is used consistently or replaced by the nested function pattern if more complex processing is needed.
* **File:** `mcp_neocoder/server.py`
* **Action:**
1. **Remove** the `_register_error_suppression_handler` method entirely.
2. **Remove** the call to `self._register_error_suppression_handler()` from `_register_basic_handlers`.
**Step 5: Verification**
* **Objective:** Ensure the refactoring didn't break existing functionality and that the improvements work as expected.
* **Actions:**
1. Run the existing verification scripts (`verify_tools.py`, `verify_incarnations.py`). Update them if necessary to reflect the new registration flow (e.g., they might now primarily check the `ToolRegistry` content rather than direct MCP registration status).
2. Manually start the server and check the logs carefully:
* Confirm all incarnations are discovered.
* Confirm tools are added to the `ToolRegistry`.
* Confirm the `ToolRegistry.register_tools_with_server` logs the final count of tools registered with MCP.
* Confirm the `initialized_event` is set and the server finishes initialization cleanly.
3. Connect an AI assistant (or test client) and verify that tools from *all* discovered incarnations are available and executable immediately after startup.
4. Test Neo4j-interacting tools thoroughly to confirm that transaction scope errors are gone.
5. Test the `switch_incarnation` tool – it should still switch the active context, but tool *registration* should no longer happen during the switch.
**Final Note for Claude:** Apply these steps sequentially. After each major step (especially Step 1 & 2), test the startup sequence and tool availability. Pay close attention to log messages to understand the flow. Ensure imports are correct after moving logic between modules. The goal is a clearer separation of concerns: Incarnations know *their* tools, the `ToolRegistry` *catalogs* all tools, and the server startup sequence uses the registry to *register* all tools with MCP.