Skip to main content
Glama

MCP Brain Service

by jomapps
database.py•3.09 kB
"""Database connection and configuration.""" import logging import os from typing import AsyncContextManager, Optional from neo4j import AsyncGraphDatabase, AsyncDriver logger = logging.getLogger(__name__) class Neo4jConnection: """Neo4j database connection manager.""" def __init__( self, uri: str = "neo4j://localhost:7687", user: str = "neo4j", password: str = "password" ): """Initialize Neo4j connection. Args: uri: Neo4j connection URI user: Database username password: Database password """ self.uri = uri self.user = user self.password = password self._driver: Optional[AsyncDriver] = None async def connect(self) -> AsyncDriver: """Connect to Neo4j database. Returns: Async Neo4j driver instance """ if self._driver is None: try: self._driver = AsyncGraphDatabase.driver( self.uri, auth=(self.user, self.password) ) # Test connection async with self._driver.session() as session: await session.run("RETURN 1") logger.info(f"Connected to Neo4j at {self.uri}") except Exception as e: logger.error(f"Failed to connect to Neo4j: {e}") raise return self._driver async def close(self) -> None: """Close database connection.""" if self._driver: await self._driver.close() self._driver = None logger.info("Neo4j connection closed") def session(self) -> AsyncContextManager: """Get database session context manager.""" if self._driver is None: raise RuntimeError("Not connected to database. Call connect() first.") return self._driver.session() def get_neo4j_config() -> dict: """Get Neo4j configuration from environment variables. Returns: Neo4j configuration dictionary """ return { "uri": os.getenv("NEO4J_URI", "neo4j://localhost:7687"), "user": os.getenv("NEO4J_USER", "neo4j"), "password": os.getenv("NEO4J_PASSWORD", "password") } # Global connection instance _neo4j_connection: Optional[Neo4jConnection] = None async def get_neo4j_connection() -> Neo4jConnection: """Get or create Neo4j connection instance. Returns: Neo4j connection instance """ global _neo4j_connection if _neo4j_connection is None: config = get_neo4j_config() _neo4j_connection = Neo4jConnection(**config) await _neo4j_connection.connect() return _neo4j_connection async def close_neo4j_connection() -> None: """Close global Neo4j connection.""" global _neo4j_connection if _neo4j_connection: await _neo4j_connection.close() _neo4j_connection = None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jomapps/mcp-brain-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server