Skip to main content
Glama

MCP Brain Service

by jomapps
mcp_server.py•19.9 kB
import asyncio import logging from typing import Any, Dict, List from mcp.server import Server from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types from .services.knowledge_service import KnowledgeService from .services.batch_service import BatchService from .lib.embeddings import JinaEmbeddingService from .lib.neo4j_client import Neo4jClient from .models.knowledge import Document, WorkflowData, AgentMemory # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("mcp-brain-service") # Initialize services jina_service = JinaEmbeddingService() neo4j_client = Neo4jClient() knowledge_service = KnowledgeService(jina_service, neo4j_client) batch_service = BatchService(knowledge_service) # Create MCP server server = Server("mcp-brain-service") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available MCP tools""" return [ # Original tools types.Tool( name="create_character", description="Create a new character with embedding", inputSchema={ "type": "object", "properties": { "name": {"type": "string"}, "description": {"type": "string"}, "traits": {"type": "array", "items": {"type": "string"}}, "project_id": {"type": "string"} }, "required": ["name", "description", "project_id"] } ), types.Tool( name="find_similar_characters", description="Find characters similar to a description", inputSchema={ "type": "object", "properties": { "description": {"type": "string"}, "project_id": {"type": "string"}, "limit": {"type": "integer", "default": 5} }, "required": ["description", "project_id"] } ), # New embedding tools types.Tool( name="embed_text", description="Generate embedding for a single text", inputSchema={ "type": "object", "properties": { "text": {"type": "string"}, "project_id": {"type": "string"} }, "required": ["text", "project_id"] } ), types.Tool( name="search_by_embedding", description="Search for similar content by embedding", inputSchema={ "type": "object", "properties": { "embedding": {"type": "array", "items": {"type": "number"}}, "project_id": {"type": "string"}, "limit": {"type": "integer", "default": 10} }, "required": ["embedding", "project_id"] } ), types.Tool( name="store_document", description="Store document with automatic embedding", inputSchema={ "type": "object", "properties": { "content": {"type": "string"}, "metadata": {"type": "object"}, "project_id": {"type": "string"} }, "required": ["content", "metadata", "project_id"] } ), # Knowledge graph tools types.Tool( name="create_relationship", description="Create relationship between nodes", inputSchema={ "type": "object", "properties": { "from_id": {"type": "string"}, "to_id": {"type": "string"}, "relationship_type": {"type": "string"}, "properties": {"type": "object", "default": {}} }, "required": ["from_id", "to_id", "relationship_type"] } ), types.Tool( name="query_graph", description="Execute Cypher query on knowledge graph", inputSchema={ "type": "object", "properties": { "cypher_query": {"type": "string"}, "project_id": {"type": "string"}, "parameters": {"type": "object", "default": {}} }, "required": ["cypher_query", "project_id"] } ), types.Tool( name="get_node_neighbors", description="Get neighbors and relationships of a node", inputSchema={ "type": "object", "properties": { "node_id": {"type": "string"}, "project_id": {"type": "string"} }, "required": ["node_id", "project_id"] } ), # Batch processing tools types.Tool( name="batch_embed_texts", description="Batch embed multiple texts efficiently", inputSchema={ "type": "object", "properties": { "texts": {"type": "array", "items": {"type": "string"}}, "project_id": {"type": "string"} }, "required": ["texts", "project_id"] } ), types.Tool( name="bulk_store_documents", description="Bulk store multiple documents with embeddings", inputSchema={ "type": "object", "properties": { "documents": { "type": "array", "items": { "type": "object", "properties": { "content": {"type": "string"}, "metadata": {"type": "object"}, "document_type": {"type": "string"} }, "required": ["content", "metadata", "document_type"] } }, "project_id": {"type": "string"} }, "required": ["documents", "project_id"] } ), # Workflow and agent memory tools types.Tool( name="store_workflow_data", description="Store LangGraph workflow execution data", inputSchema={ "type": "object", "properties": { "workflow_id": {"type": "string"}, "agent_id": {"type": "string"}, "step_name": {"type": "string"}, "input_data": {"type": "object"}, "output_data": {"type": "object"}, "execution_time_ms": {"type": "number"}, "project_id": {"type": "string"} }, "required": ["workflow_id", "agent_id", "step_name", "input_data", "output_data", "execution_time_ms", "project_id"] } ), types.Tool( name="search_similar_workflows", description="Find similar workflow patterns", inputSchema={ "type": "object", "properties": { "query": {"type": "string"}, "project_id": {"type": "string"}, "limit": {"type": "integer", "default": 5} }, "required": ["query", "project_id"] } ), types.Tool( name="store_agent_memory", description="Store agent conversation/decision memory", inputSchema={ "type": "object", "properties": { "agent_id": {"type": "string"}, "memory_type": {"type": "string", "enum": ["conversation", "decision", "context"]}, "content": {"type": "string"}, "metadata": {"type": "object"}, "project_id": {"type": "string"} }, "required": ["agent_id", "memory_type", "content", "metadata", "project_id"] } ), # Batch processing tools types.Tool( name="process_document_batch", description="Process large batches of documents efficiently", inputSchema={ "type": "object", "properties": { "documents": { "type": "array", "items": { "type": "object", "properties": { "content": {"type": "string"}, "metadata": {"type": "object"}, "document_type": {"type": "string"} }, "required": ["content", "metadata", "document_type"] } }, "project_id": {"type": "string"} }, "required": ["documents", "project_id"] } ), types.Tool( name="batch_similarity_search", description="Perform batch similarity searches", inputSchema={ "type": "object", "properties": { "queries": {"type": "array", "items": {"type": "string"}}, "project_id": {"type": "string"}, "limit_per_query": {"type": "integer", "default": 10} }, "required": ["queries", "project_id"] } ), # Health check tool types.Tool( name="health_check", description="Check service health and connectivity", inputSchema={ "type": "object", "properties": {}, "required": [] } ) ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: """Handle tool calls""" try: if name == "create_character": # Original character creation logic character_data = { "name": arguments["name"], "description": arguments["description"], "traits": arguments.get("traits", []), "project_id": arguments["project_id"] } # Store as document document_id = await knowledge_service.store_document( content=f"Character: {character_data['name']} - {character_data['description']}", metadata={ "document_type": "character", "name": character_data["name"], "traits": character_data["traits"] }, project_id=arguments["project_id"] ) return [types.TextContent( type="text", text=f"Character created with ID: {document_id}" )] elif name == "find_similar_characters": # Find similar characters using embedding search query_embedding = await knowledge_service.embed_text( arguments["description"], arguments["project_id"] ) results = await knowledge_service.search_by_embedding( query_embedding.embedding, arguments["project_id"], arguments.get("limit", 5) ) return [types.TextContent( type="text", text=f"Found {len(results.results)} similar characters in {results.query_time_ms}ms" )] elif name == "embed_text": result = await knowledge_service.embed_text( arguments["text"], arguments["project_id"] ) return [types.TextContent( type="text", text=f"Text embedded with document ID: {result.document_id}" )] elif name == "search_by_embedding": results = await knowledge_service.search_by_embedding( arguments["embedding"], arguments["project_id"], arguments.get("limit", 10) ) return [types.TextContent( type="text", text=f"Found {results.total_count} results in {results.query_time_ms}ms" )] elif name == "store_document": document_id = await knowledge_service.store_document( arguments["content"], arguments["metadata"], arguments["project_id"] ) return [types.TextContent( type="text", text=f"Document stored with ID: {document_id}" )] elif name == "create_relationship": success = await knowledge_service.create_relationship( arguments["from_id"], arguments["to_id"], arguments["relationship_type"], arguments.get("properties", {}) ) return [types.TextContent( type="text", text=f"Relationship created: {success}" )] elif name == "query_graph": results = await knowledge_service.query_graph( arguments["cypher_query"], arguments["project_id"], arguments.get("parameters", {}) ) return [types.TextContent( type="text", text=f"Query executed: {len(results.records)} records in {results.query_time_ms}ms" )] elif name == "get_node_neighbors": results = await knowledge_service.get_node_neighbors( arguments["node_id"], arguments["project_id"] ) return [types.TextContent( type="text", text=f"Found {len(results.neighbors)} neighbors and {len(results.relationships)} relationships" )] elif name == "batch_embed_texts": results = await knowledge_service.batch_embed_texts( arguments["texts"], arguments["project_id"] ) return [types.TextContent( type="text", text=f"Batch embedded {len(results)} texts" )] elif name == "bulk_store_documents": documents = [ Document( content=doc["content"], metadata=doc["metadata"], document_type=doc["document_type"], project_id=arguments["project_id"] ) for doc in arguments["documents"] ] document_ids = await knowledge_service.bulk_store_documents(documents, arguments["project_id"]) return [types.TextContent( type="text", text=f"Bulk stored {len(document_ids)} documents" )] elif name == "store_workflow_data": from datetime import datetime workflow_data = WorkflowData( workflow_id=arguments["workflow_id"], agent_id=arguments["agent_id"], step_name=arguments["step_name"], input_data=arguments["input_data"], output_data=arguments["output_data"], execution_time_ms=arguments["execution_time_ms"], project_id=arguments["project_id"], timestamp=datetime.utcnow() ) workflow_id = await knowledge_service.store_workflow_data(workflow_data) return [types.TextContent( type="text", text=f"Workflow data stored with ID: {workflow_id}" )] elif name == "search_similar_workflows": results = await knowledge_service.search_similar_workflows( arguments["query"], arguments["project_id"], arguments.get("limit", 5) ) return [types.TextContent( type="text", text=f"Found {len(results.results)} similar workflows" )] elif name == "store_agent_memory": from datetime import datetime agent_memory = AgentMemory( agent_id=arguments["agent_id"], memory_type=arguments["memory_type"], content=arguments["content"], metadata=arguments["metadata"], project_id=arguments["project_id"], timestamp=datetime.utcnow() ) memory_id = await knowledge_service.store_agent_memory(agent_memory) return [types.TextContent( type="text", text=f"Agent memory stored with ID: {memory_id}" )] elif name == "process_document_batch": documents = [ Document( content=doc["content"], metadata=doc["metadata"], document_type=doc["document_type"], project_id=arguments["project_id"] ) for doc in arguments["documents"] ] result = await batch_service.process_document_batch(documents, arguments["project_id"]) return [types.TextContent( type="text", text=f"Batch processed: {result['processed_count']}/{result['total_documents']} documents in {result['processing_time_seconds']:.2f}s" )] elif name == "batch_similarity_search": result = await batch_service.batch_similarity_search( arguments["queries"], arguments["project_id"], arguments.get("limit_per_query", 10) ) return [types.TextContent( type="text", text=f"Batch search: {result['successful_searches']}/{result['total_queries']} queries in {result['processing_time_seconds']:.2f}s" )] elif name == "health_check": jina_health = await jina_service.health_check() neo4j_health = await neo4j_client.health_check() return [types.TextContent( type="text", text=f"Health Check - Jina: {jina_health['status']}, Neo4j: {neo4j_health['status']}" )] else: return [types.TextContent( type="text", text=f"Unknown tool: {name}" )] except Exception as e: logger.error(f"Tool call failed: {str(e)}") return [types.TextContent( type="text", text=f"Error: {str(e)}" )] async def main(): # Run the server using stdin/stdout streams async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="mcp-brain-service", server_version="1.0.0", capabilities=server.get_capabilities( notification_options=None, experimental_capabilities=None, ), ), ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jomapps/mcp-brain-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server