Skip to main content
Glama

MCP Brain Service

by jomapps
neo4j_client.py6.31 kB
"""Async Neo4j access layer for the Brain service.

Exposes :class:`Neo4jClient`, a small wrapper around the Neo4j async driver,
and :func:`get_neo4j_client`, which lazily builds one shared, verified client.
"""

import asyncio
import logging
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

try:
    from neo4j import AsyncGraphDatabase
    NEO4J_AVAILABLE = True
except ImportError:
    NEO4J_AVAILABLE = False

logger = logging.getLogger(__name__)

# Labels that match this pattern are spliced into Cypher unquoted, which keeps
# the generated query text identical to what it was for ordinary labels.
_PLAIN_LABEL = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


class Neo4jClient:
    """Async client that reads its connection settings from the environment."""

    def __init__(self):
        """Initialize Neo4j client - REQUIRES valid Neo4j connection.

        Reads ``NEO4J_URI``, ``NEO4J_USER`` and ``NEO4J_PASSWORD`` from the
        environment and creates the async driver.

        Raises:
            ValueError: one of the three environment variables is unset/empty.
            ImportError: the ``neo4j`` package could not be imported.
            ConnectionError: the driver constructor raised.
        """
        self.uri = os.getenv("NEO4J_URI")
        self.user = os.getenv("NEO4J_USER")
        self.password = os.getenv("NEO4J_PASSWORD")

        # Validate required environment variables
        if not self.uri:
            raise ValueError(
                "NEO4J_URI environment variable is required. "
                "Neo4j is critical for Brain service operation. "
                "Please set NEO4J_URI (e.g., bolt://localhost:7687 or neo4j://host:7687)"
            )
        if not self.user:
            raise ValueError(
                "NEO4J_USER environment variable is required. "
                "Neo4j is critical for Brain service operation."
            )
        if not self.password:
            raise ValueError(
                "NEO4J_PASSWORD environment variable is required. "
                "Neo4j is critical for Brain service operation."
            )

        # Check if Neo4j driver is available
        if not NEO4J_AVAILABLE:
            raise ImportError(
                "Neo4j driver is not installed. "
                "Neo4j is critical for Brain service operation. "
                "Install with: pip install neo4j"
            )

        # Initialize driver
        try:
            logger.info("Connecting to Neo4j at %s", self.uri)
            self.driver = AsyncGraphDatabase.driver(
                self.uri,
                auth=(self.user, self.password),
            )
            logger.info("Neo4j driver initialized successfully")
        except Exception as e:
            # The password is deliberately left out of the message.
            error_msg = (
                f"Failed to initialize Neo4j driver: {str(e)}. "
                f"Neo4j is critical for Brain service operation. "
                f"Please verify NEO4J_URI={self.uri}, NEO4J_USER={self.user}, "
                f"and NEO4J_PASSWORD are correct and Neo4j is running."
            )
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format ``datetime.utcnow().isoformat()`` did (no
        offset suffix) without calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _labels_clause(labels: List[str]) -> str:
        """Build the ``:A:B`` label suffix for a Cypher node pattern.

        Plain identifier labels are emitted unchanged. Any other label is
        wrapped in backticks with embedded backticks doubled, so a label can
        never terminate the pattern and inject Cypher. An empty list yields
        an empty string (an unlabeled node).

        Raises:
            ValueError: a label is not a string or is empty.
        """
        parts = []
        for label in labels or []:
            if not isinstance(label, str) or not label:
                raise ValueError(f"Invalid Neo4j label: {label!r}")
            if _PLAIN_LABEL.fullmatch(label):
                parts.append(label)
            else:
                parts.append("`" + label.replace("`", "``") + "`")
        return "".join(f":{part}" for part in parts)

    async def _ping(self) -> bool:
        """Run ``RETURN 1 as test`` and report whether the reply was 1."""
        async with self.driver.session() as session:
            result = await session.run("RETURN 1 as test")
            record = await result.single()
            return bool(record and record["test"] == 1)

    async def verify_connection(self):
        """Verify Neo4j connection on startup.

        Returns:
            ``True`` when the probe query answers as expected.

        Raises:
            ConnectionError: the probe failed or returned an unexpected value.
        """
        try:
            if not await self._ping():
                raise Exception("Invalid response from Neo4j")
        except Exception as e:
            error_msg = (
                f"Neo4j connection verification failed: {str(e)}. "
                f"Neo4j is critical for Brain service operation. "
                f"Please verify Neo4j is running at {self.uri} and credentials are correct."
            )
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e
        logger.info("✅ Neo4j connection verified successfully")
        return True

    async def create_node(self, labels: List[str], properties: Dict[str, Any]) -> str:
        """Create a node with given labels and properties.

        Args:
            labels: labels to attach; may be empty for an unlabeled node.
            properties: property map assigned to the node via ``SET n = ...``.

        Returns:
            The node's ``id`` property as returned by the query. If the query
            yields no record, falls back to ``properties["id"]`` or
            ``"unknown"``.

        Raises:
            Exception: any failure (including an invalid label), chained to
                the underlying error.
        """
        try:
            # Labels cannot be passed as query parameters here, so they are
            # validated/escaped before being placed in the query text.
            query = (
                f"CREATE (n{self._labels_clause(labels)}) "
                "SET n = $properties RETURN n.id as id"
            )
            async with self.driver.session() as session:
                result = await session.run(query, properties=properties)
                record = await result.single()
                node_id = record["id"] if record else properties.get("id", "unknown")
                logger.debug("Created node %s with labels %s", node_id, labels)
                return node_id
        except Exception as e:
            logger.error("Failed to create node: %s", e)
            raise Exception(f"Neo4j create_node failed: {str(e)}") from e

    async def run_query(
        self, query: str, parameters: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Run a Cypher query and return results.

        Args:
            query: Cypher text to execute.
            parameters: query parameters; ``None`` is treated as no parameters.

        Returns:
            One ``dict`` per result record, in result order.

        Raises:
            Exception: any failure, chained to the underlying error.
        """
        try:
            async with self.driver.session() as session:
                result = await session.run(query, parameters or {})
                records = [dict(record) async for record in result]
                logger.debug("Query returned %d records", len(records))
                return records
        except Exception as e:
            logger.error("Failed to run query: %s", e)
            raise Exception(f"Neo4j query failed: {str(e)}") from e

    async def health_check(self) -> Dict[str, Any]:
        """Check Neo4j connection health.

        Never raises: failures are reported in the returned mapping.

        Returns:
            A dict with ``status`` (``"healthy"``/``"unhealthy"``), ``uri``,
            ``timestamp`` and, when unhealthy, ``error``.
        """
        try:
            healthy = await self._ping()
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "uri": self.uri,
                "timestamp": self._utc_timestamp(),
            }
        if healthy:
            return {
                "status": "healthy",
                "uri": self.uri,
                "timestamp": self._utc_timestamp(),
            }
        return {
            "status": "unhealthy",
            "error": "Invalid response from Neo4j",
            "uri": self.uri,
            "timestamp": self._utc_timestamp(),
        }

    async def close(self):
        """Close the Neo4j connection, if a driver was created."""
        driver = getattr(self, "driver", None)
        if driver:
            await driver.close()
            logger.info("Neo4j connection closed")


# Global instance
_neo4j_client = None


async def _close_quietly(client: Neo4jClient) -> None:
    """Close *client*, logging (not raising) any error from the close itself."""
    try:
        await client.close()
    except Exception:
        logger.warning("Failed to close Neo4j client during cleanup", exc_info=True)


async def get_neo4j_client() -> Neo4jClient:
    """Get or create Neo4j client instance.

    The shared instance is published only after its connection has been
    verified, so a failed first attempt leaves nothing cached and the next
    call retries from scratch.

    Raises:
        ValueError, ImportError, ConnectionError: propagated from
            ``Neo4jClient()`` or ``verify_connection()``.
    """
    global _neo4j_client
    if _neo4j_client is not None:
        return _neo4j_client

    client = Neo4jClient()
    try:
        # Verify connection on first initialization
        await client.verify_connection()
    except Exception:
        # Do not leak the driver of a client that will never be used.
        await _close_quietly(client)
        raise

    if _neo4j_client is None:
        _neo4j_client = client
    else:
        # Another caller finished initializing while this one was awaiting;
        # keep theirs and discard the duplicate.
        await _close_quietly(client)
    return _neo4j_client

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jomapps/mcp-brain-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.