MCP Code Expert System
by tomsiwik
Verified
"""Knowledge Graph implementation for MCP Code Expert System.
This module provides a simple graph database for storing code reviews,
code snippets, and relationships between them.
"""
import json
import os
from typing import Dict, List, Optional, Any, Set, Tuple, Union
import datetime
from uuid import uuid4
from pathlib import Path
class Entity:
def __init__(
self,
id: str,
name: str,
entity_type: str,
properties: Dict[str, Any] = None,
):
self.id = id
self.name = name
self.type = entity_type
self.properties = properties or {}
self.relationships: Dict[str, List[str]] = {} # relationship_type -> [entity_id]
def add_relationship(self, relationship_type: str, target_entity_id: str) -> None:
"""Add a relationship to another entity"""
if relationship_type not in self.relationships:
self.relationships[relationship_type] = []
if target_entity_id not in self.relationships[relationship_type]:
self.relationships[relationship_type].append(target_entity_id)
def to_dict(self) -> Dict[str, Any]:
"""Convert entity to dictionary representation"""
return {
"id": self.id,
"name": self.name,
"type": self.type,
"properties": self.properties,
"relationships": self.relationships
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Entity':
"""Create entity from dictionary representation"""
entity = cls(
id=data["id"],
name=data["name"],
entity_type=data["type"],
properties=data["properties"]
)
entity.relationships = data.get("relationships", {})
return entity
class KnowledgeGraphManager:
def __init__(self, storage_path: str = None):
self.entities: Dict[str, Entity] = {}
self.storage_path = storage_path
if storage_path and os.path.exists(storage_path):
self.load()
def add_entity(
self,
name: str,
entity_type: str,
properties: Dict[str, Any] = None
) -> Entity:
"""Add an entity to the knowledge graph"""
entity_id = f"{uuid4()}"
entity = Entity(id=entity_id, name=name, entity_type=entity_type, properties=properties)
self.entities[entity_id] = entity
if self.storage_path:
self.save()
return entity
def get_entity(self, entity_id: str) -> Optional[Entity]:
"""Get an entity from the knowledge graph by ID"""
return self.entities.get(entity_id)
def get_entities_by_type(self, entity_type: str) -> List[Entity]:
"""Get all entities of a specific type"""
return [entity for entity in self.entities.values() if entity.type == entity_type]
def add_relationship(
self,
source_entity_id: str,
relationship_type: str,
target_entity_id: str,
bidirectional: bool = False,
inverse_relationship_type: str = None
) -> None:
"""Add a relationship between two entities"""
source_entity = self.get_entity(source_entity_id)
target_entity = self.get_entity(target_entity_id)
if not source_entity or not target_entity:
raise ValueError("Both source and target entities must exist")
source_entity.add_relationship(relationship_type, target_entity_id)
if bidirectional:
inverse_type = inverse_relationship_type or f"inverse_{relationship_type}"
target_entity.add_relationship(inverse_type, source_entity_id)
if self.storage_path:
self.save()
def search_entities(self, query: str) -> List[Entity]:
"""Search for entities in the knowledge graph"""
results = []
query = query.lower()
for entity in self.entities.values():
# Search in name and type
if query in entity.name.lower() or query in entity.type.lower():
results.append(entity)
continue
# Search in properties
found = False
for key, value in entity.properties.items():
if isinstance(value, str) and query in value.lower():
results.append(entity)
found = True
break
elif isinstance(value, (int, float)) and query in str(value).lower():
results.append(entity)
found = True
break
if found:
continue
# Search in relationships
for rel_type, target_ids in entity.relationships.items():
if query in rel_type.lower():
results.append(entity)
break
return results
def save(self) -> None:
"""Save the knowledge graph to storage"""
os.makedirs(os.path.dirname(self.storage_path), exist_ok=True)
data = {
"entities": {entity_id: entity.to_dict() for entity_id, entity in self.entities.items()},
"version": 1,
"updated_at": datetime.datetime.now().isoformat()
}
with open(self.storage_path, 'w') as f:
json.dump(data, f, indent=2)
def load(self) -> None:
"""Load the knowledge graph from storage"""
if not os.path.exists(self.storage_path):
return
with open(self.storage_path, 'r') as f:
data = json.load(f)
self.entities = {}
for entity_id, entity_data in data.get("entities", {}).items():
self.entities[entity_id] = Entity.from_dict(entity_data)
def clear(self) -> None:
"""Clear all entities from the knowledge graph"""
self.entities = {}
if self.storage_path and os.path.exists(self.storage_path):
os.remove(self.storage_path)
def get_related_entities(self, entity_id: str, relationship_type: Optional[str] = None) -> List[Entity]:
"""Get entities related to the given entity"""
entity = self.get_entity(entity_id)
if not entity:
return []
related_entity_ids: Set[str] = set()
if relationship_type:
# Get entities with specific relationship type
related_entity_ids.update(entity.relationships.get(relationship_type, []))
else:
# Get all related entities
for rel_ids in entity.relationships.values():
related_entity_ids.update(rel_ids)
return [self.get_entity(related_id) for related_id in related_entity_ids if related_id in self.entities]
class KnowledgeGraph:
"""Simple in-memory graph database with JSON persistence."""
def __init__(self, file_path: str = "data/knowledge_graph.json"):
"""Initialize the knowledge graph.
Args:
file_path: Path to the JSON file for persistence
"""
self.file_path = file_path
self.nodes: Dict[str, Dict[str, Any]] = {}
self.edges: List[Dict[str, Any]] = []
self.load()
def load(self) -> None:
"""Load the knowledge graph from the JSON file."""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
# Load the graph if the file exists
if os.path.exists(self.file_path):
try:
with open(self.file_path, 'r') as f:
data = json.load(f)
self.nodes = data.get('nodes', {})
self.edges = data.get('edges', [])
except (json.JSONDecodeError, IOError) as e:
print(f"Error loading knowledge graph: {e}")
# Initialize with empty graph on error
self.nodes = {}
self.edges = []
def save(self) -> None:
"""Save the knowledge graph to the JSON file."""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
# Save to file
with open(self.file_path, 'w') as f:
json.dump({
'nodes': self.nodes,
'edges': self.edges
}, f, indent=2)
except IOError as e:
print(f"Error saving knowledge graph: {e}")
def add_node(self, name: str, node_type: str, properties: Dict[str, Any]) -> str:
"""Add a node to the graph.
Args:
name: Node name (unique identifier)
node_type: Type of node (e.g., 'code', 'review')
properties: Node properties
Returns:
Node name (identifier)
"""
# Create node with metadata
self.nodes[name] = {
'type': node_type,
'created_at': datetime.now().isoformat(),
'properties': properties
}
self.save()
return name
def add_edge(self, source: str, target: str, edge_type: str, properties: Dict[str, Any] = None) -> None:
"""Add an edge between two nodes.
Args:
source: Source node name
target: Target node name
edge_type: Type of edge (e.g., 'reviewed_by')
properties: Edge properties
"""
if properties is None:
properties = {}
# Check if nodes exist
if source not in self.nodes or target not in self.nodes:
raise ValueError(f"Cannot create edge between non-existent nodes: {source} -> {target}")
# Create edge with metadata
self.edges.append({
'source': source,
'target': target,
'type': edge_type,
'created_at': datetime.now().isoformat(),
'properties': properties
})
self.save()
def get_node(self, name: str) -> Optional[Dict[str, Any]]:
"""Get a node by name.
Args:
name: Node name
Returns:
Node data or None if not found
"""
return self.nodes.get(name)
def get_nodes_by_type(self, node_type: str) -> List[Dict[str, Any]]:
"""Get all nodes of a specific type.
Args:
node_type: Type of nodes to get
Returns:
List of nodes
"""
return [
{'name': name, **data}
for name, data in self.nodes.items()
if data.get('type') == node_type
]
def search_nodes(self, query: str) -> List[Dict[str, Any]]:
"""Search for nodes by text in their properties.
Args:
query: Text to search for
Returns:
List of matching nodes
"""
results = []
query = query.lower()
for name, data in self.nodes.items():
# Search in name
if query in name.lower():
results.append({'name': name, **data})
continue
# Search in properties
props = data.get('properties', {})
for prop_value in props.values():
if isinstance(prop_value, str) and query in prop_value.lower():
results.append({'name': name, **data})
break
return results
def get_related_nodes(self, node_name: str, edge_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get nodes related to a specific node.
Args:
node_name: Name of the node
edge_type: Optional filter for edge type
Returns:
List of related nodes
"""
related = []
# Find edges where the node is source or target
for edge in self.edges:
if edge['source'] == node_name:
if edge_type is None or edge['type'] == edge_type:
target = edge['target']
if target in self.nodes:
related.append({
'name': target,
**self.nodes[target],
'relation': {
'type': edge['type'],
'direction': 'outgoing',
'properties': edge.get('properties', {})
}
})
elif edge['target'] == node_name:
if edge_type is None or edge['type'] == edge_type:
source = edge['source']
if source in self.nodes:
related.append({
'name': source,
**self.nodes[source],
'relation': {
'type': edge['type'],
'direction': 'incoming',
'properties': edge.get('properties', {})
}
})
return related
def get_all(self) -> Dict[str, Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""Get the entire graph.
Returns:
Dictionary containing all nodes and edges
"""
# Format nodes to include their names
formatted_nodes = [
{'name': name, **data}
for name, data in self.nodes.items()
]
return {
'nodes': formatted_nodes,
'edges': self.edges
}
def clear(self) -> None:
"""Clear the graph."""
self.nodes = {}
self.edges = []
self.save()