Skip to main content
Glama

Knowledge Base MCP Server

by Leon4s4
MIT License
kb_server.py18.5 kB
#!/usr/bin/env python3
"""
Knowledge Base MCP Server

A mem0-like memory system for GitHub Copilot that provides persistent
knowledge storage and retrieval capabilities using local ChromaDB.

Knowledge entries ("memories") are stored as ChromaDB documents with
auto-generated embeddings plus a flat metadata dict (comma-joined string
fields, since ChromaDB metadata values must be scalars).
"""
import asyncio
import json
import os
import re
import subprocess
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("knowledge-base")

# Global variables for database connection (populated by init_database)
chroma_client: Optional[chromadb.PersistentClient] = None
collection: Optional[chromadb.Collection] = None

# Configuration (overridable via environment)
KB_DATA_DIR = os.getenv("KB_DATA_DIR", "./kb_data")
KB_INITIAL_FILE = os.getenv("KB_INITIAL_FILE", None)
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "knowledge_base"

# Optional Streamlit UI subprocess; never started in this file, but main()
# cleans it up defensively if some other code path assigns it.
ui_process: Optional[subprocess.Popen] = None


def init_database() -> None:
    """Initialize ChromaDB with built-in embeddings.

    Creates the data directory if needed, opens (or creates) the persistent
    collection, and — if KB_INITIAL_FILE is set — seeds it with initial
    knowledge. Raises on any ChromaDB initialization failure.
    """
    global chroma_client, collection

    # Create data directory if it doesn't exist
    os.makedirs(KB_DATA_DIR, exist_ok=True)

    try:
        # Initialize ChromaDB client
        chroma_client = chromadb.PersistentClient(
            path=KB_DATA_DIR,
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
                is_persistent=True,
                persist_directory=KB_DATA_DIR,
            ),
        )

        # Create embedding function (uses ChromaDB's default model)
        default_ef = embedding_functions.DefaultEmbeddingFunction()

        # Get or create collection with embedding function
        collection = chroma_client.get_or_create_collection(
            name=COLLECTION_NAME,
            embedding_function=default_ef,
            metadata={"description": "Knowledge base for development memories"},
        )
    except Exception as e:
        print(f"Error initializing ChromaDB: {e}")
        print("Please check permissions, disk space, and configuration settings.")
        raise

    print(f"Knowledge Base initialized at: {KB_DATA_DIR}")
    print(f"Collection: {COLLECTION_NAME}")
    print("Embedding: Default ChromaDB embeddings")

    # Load initial knowledge file if specified
    if KB_INITIAL_FILE:
        load_initial_knowledge(KB_INITIAL_FILE)


def load_initial_knowledge(file_path: str) -> None:
    """Load initial knowledge from a text file.

    Entries are separated by blank lines (double newlines). Loading is
    idempotent: a marker file in KB_DATA_DIR records that the seed file
    was already imported, so repeated server starts do not duplicate data.

    Args:
        file_path: Path to a UTF-8 text file of knowledge entries.
    """
    if not collection:
        print("Cannot load initial knowledge: Database not initialized")
        return

    try:
        if not os.path.exists(file_path):
            print(f"Initial knowledge file not found: {file_path}")
            return

        # Check marker file to see if initial knowledge was already loaded
        marker_file = os.path.join(KB_DATA_DIR, ".initial_knowledge_loaded")
        if os.path.exists(marker_file):
            print(f"Initial knowledge already loaded (marker file exists), skipping...")
            return

        print(f"Loading initial knowledge from: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()

        if not content:
            print("Initial knowledge file is empty")
            return

        # Split content by double newlines to separate entries
        entries = [entry.strip() for entry in content.split('\n\n') if entry.strip()]

        if not entries:
            print("No knowledge entries found in file")
            return

        loaded_count = 0
        for i, entry in enumerate(entries):
            try:
                # Generate unique ID for each entry
                entry_id = f"initial_{i}_{uuid.uuid4().hex[:8]}"

                # Extract metadata and tag its provenance
                metadata = extract_metadata(entry)
                metadata["source"] = "initial_knowledge"
                metadata["initial_load"] = True

                # Store in ChromaDB
                collection.add(
                    ids=[entry_id],
                    documents=[entry],
                    metadatas=[metadata],
                )
                loaded_count += 1
            except Exception as e:
                # Best-effort import: a bad entry should not abort the rest
                print(f"Failed to load entry {i+1}: {str(e)}")
                continue

        print(f"Loaded {loaded_count} knowledge entries from initial file")

        # Create marker file to indicate initial knowledge has been loaded
        marker_file = os.path.join(KB_DATA_DIR, ".initial_knowledge_loaded")
        with open(marker_file, 'w') as f:
            f.write(f"Initial knowledge loaded on {datetime.now(timezone.utc).isoformat()}\n")
            f.write(f"Source file: {file_path}\n")
            f.write(f"Entries loaded: {loaded_count}\n")

    except Exception as e:
        print(f"Error loading initial knowledge: {str(e)}")


def extract_metadata(content: str) -> Dict[str, Any]:
    """Extract metadata from content automatically.

    Detects URLs, fenced code blocks (and their language), a fixed list of
    technology keywords, and classifies the entry into a memory_type
    (general / code_snippet / environment / architectural / operational).
    List-like values are comma-joined strings because ChromaDB metadata
    only accepts scalar values.

    Args:
        content: Raw memory text.

    Returns:
        Flat metadata dict ready to store alongside the document.
    """
    metadata = {
        "created_at": datetime.now(timezone.utc).isoformat(),
        "last_accessed": datetime.now(timezone.utc).isoformat(),
        "access_count": 0,
        "technologies": "",
        "urls": "",
        "language": "",
        "memory_type": "general",
    }

    # Extract URLs
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    urls = re.findall(url_pattern, content)
    if urls:
        metadata["urls"] = ",".join(urls)

    # Extract code blocks and detect language of the first tagged fence
    code_pattern = r'```(\w+)?\n(.*?)```'
    code_matches = re.findall(code_pattern, content, re.DOTALL)
    if code_matches:
        metadata["memory_type"] = "code_snippet"
        for lang, _ in code_matches:
            if lang:
                metadata["language"] = lang
                break

    # Extract technologies (common ones)
    tech_keywords = [
        "graphql", "sql", "postgresql", "mysql", "mongodb", "redis",
        "docker", "kubernetes", "aws", "azure", "gcp", "terraform",
        "react", "vue", "angular", "node", "python", "java", "csharp",
        "javascript", "typescript", "go", "rust", "c++", "splunk",
        "dynatrace", "grafana", "prometheus", "jenkins", "git", "github",
        "gitlab", "bitbucket", "jira", "confluence", "tanzu", "pcf",
        "openshift", "helm", "istio", "envoy"
    ]
    content_lower = content.lower()
    found_techs = [tech for tech in tech_keywords if tech in content_lower]
    metadata["technologies"] = ",".join(found_techs) if found_techs else ""

    # Determine memory type based on content keywords; earlier branches win,
    # so an explicit "config"/"architecture"/"error" hint overrides the
    # code_snippet classification above.
    if any(keyword in content_lower for keyword in ["config", "configuration", "environment", "env"]):
        metadata["memory_type"] = "environment"
    elif any(keyword in content_lower for keyword in ["architecture", "design", "decision", "pattern"]):
        metadata["memory_type"] = "architectural"
    elif any(keyword in content_lower for keyword in ["error", "fix", "troubleshoot", "debug", "restart"]):
        metadata["memory_type"] = "operational"
    elif urls or "dashboard" in content_lower:
        # Only upgrade plain "general" entries; don't demote code snippets
        if metadata["memory_type"] == "general":
            metadata["memory_type"] = "environment"

    return metadata


@mcp.tool(
    annotations={
        "title": "Save Knowledge",
        "description": "Save a memory or piece of knowledge to the knowledge base",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
    }
)
async def kb_save(
    content: str,
    memory_type: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> str:
    """Save knowledge to the persistent memory store.

    Args:
        content: The content/memory to save
        memory_type: Optional type of memory (environment, code_snippet, operational, architectural)
        tags: Optional list of tags for categorization
    """
    if not collection:
        return "Error: Knowledge base not initialized"

    try:
        # Generate unique ID
        memory_id = str(uuid.uuid4())

        # Extract metadata
        metadata = extract_metadata(content)

        # Override auto-detected memory type only with a known valid value
        if memory_type and memory_type in ["environment", "code_snippet", "operational", "architectural"]:
            metadata["memory_type"] = memory_type

        # Add tags if provided (comma-joined; ChromaDB metadata is scalar-only)
        if tags:
            metadata["tags"] = ",".join(tags)

        # Store in ChromaDB (embeddings auto-generated)
        collection.add(
            ids=[memory_id],
            documents=[content],
            metadatas=[metadata],
        )

        techs = metadata['technologies'].split(',') if metadata['technologies'] else []
        return f"Memory saved successfully!\nID: {memory_id}\nType: {metadata['memory_type']}\nTechnologies: {', '.join(techs) if techs else 'None'}"

    except Exception as e:
        return f"Error saving memory: {str(e)}"


@mcp.tool(
    annotations={
        "title": "Search Knowledge",
        "description": "Search the knowledge base for relevant memories",
        "readOnlyHint": True,
        "openWorldHint": False,
        "idempotentHint": True,
    }
)
async def kb_search(
    query: str,
    limit: int = 5,
    memory_type: Optional[str] = None,
    include_metadata: bool = True,
) -> str:
    """Search for relevant knowledge in the memory store.

    Args:
        query: Search query to find relevant memories
        limit: Maximum number of results to return (default: 5)
        memory_type: Filter by memory type (environment, code_snippet, operational, architectural)
        include_metadata: Whether to include metadata in results
    """
    if not collection:
        return "Error: Knowledge base not initialized"

    try:
        # Build where clause for filtering
        where_clause = {}
        if memory_type:
            where_clause["memory_type"] = memory_type

        # Search in ChromaDB (embeddings auto-generated for query)
        results = collection.query(
            query_texts=[query],
            n_results=min(limit, 10),  # Cap at 10 results
            where=where_clause if where_clause else None,
            include=["documents", "metadatas", "distances"],
        )

        if not results["documents"] or not results["documents"][0]:
            return "No relevant memories found for your query."

        # Format results and update access tracking
        formatted_results = []
        documents = results["documents"][0]
        metadatas = results["metadatas"][0] if results["metadatas"] else []
        distances = results["distances"][0] if results["distances"] else []
        ids = results["ids"][0] if results["ids"] else []

        for i, doc in enumerate(documents):
            result = f"**Memory {i+1}**:\n{doc}"

            # BUGFIX: bind metadata unconditionally — the original only bound
            # it inside the include_metadata branch, so with
            # include_metadata=False the access-count update below raised
            # NameError (silently swallowed) and access tracking never ran.
            metadata = metadatas[i] if i < len(metadatas) else {}

            if include_metadata and i < len(metadatas):
                result += f"\n\n**Details:**"
                result += f"\n- Type: {metadata.get('memory_type', 'Unknown')}"
                result += f"\n- Created: {metadata.get('created_at', 'Unknown')}"

                if metadata.get('technologies'):
                    techs = metadata['technologies'].split(',') if metadata['technologies'] else []
                    if techs:
                        result += f"\n- Technologies: {', '.join(techs)}"

                if metadata.get('urls'):
                    urls = metadata['urls'].split(',') if metadata['urls'] else []
                    if urls:
                        result += f"\n- URLs: {', '.join(urls)}"

                if i < len(distances):
                    similarity = 1 - distances[i]  # Convert distance to similarity
                    result += f"\n- Relevance: {similarity:.2%}"

            # Update access count and persist to ChromaDB (best-effort)
            try:
                if i < len(ids):
                    current_count = metadata.get('access_count', 0)
                    updated_metadata = metadata.copy()
                    updated_metadata['access_count'] = current_count + 1
                    updated_metadata['last_accessed'] = datetime.now(timezone.utc).isoformat()

                    # Update the metadata in ChromaDB
                    collection.update(
                        ids=[ids[i]],
                        metadatas=[updated_metadata],
                    )

                    # Show updated count in the result
                    result += f"\n- Access Count: {updated_metadata['access_count']}"
            except Exception:
                pass  # Ignore metadata update errors; search results still return

            formatted_results.append(result)

        # BUGFIX: the original appended "="*50 once before the first result
        # instead of separating every result (operator-precedence slip);
        # join with the full separator string.
        separator = "\n\n" + "=" * 50 + "\n\n"
        response = f"**Found {len(documents)} relevant memories:**\n\n"
        response += separator.join(formatted_results)
        return response

    except Exception as e:
        return f"Error searching memories: {str(e)}"


@mcp.tool(
    annotations={
        "title": "List Knowledge",
        "description": "List all saved memories with optional filtering",
        "readOnlyHint": True,
        "openWorldHint": False,
        "idempotentHint": True,
    }
)
async def kb_list(
    memory_type: Optional[str] = None,
    limit: int = 10,
    include_content: bool = False,
) -> str:
    """List saved memories in the knowledge base.

    Args:
        memory_type: Filter by memory type (environment, code_snippet, operational, architectural)
        limit: Maximum number of entries to return
        include_content: Whether to include full content (default: False, shows summary only)
    """
    if not collection:
        return "Error: Knowledge base not initialized"

    try:
        # Build where clause for filtering
        where_clause = {}
        if memory_type:
            where_clause["memory_type"] = memory_type

        # Get entries from ChromaDB
        results = collection.get(
            where=where_clause if where_clause else None,
            limit=min(limit, 50),  # Cap at 50 entries
            include=["documents", "metadatas"],
        )

        if not results["ids"]:
            filter_msg = f" with type '{memory_type}'" if memory_type else ""
            return f"No memories found{filter_msg}."

        # Format results
        formatted_entries = []
        for i, memory_id in enumerate(results["ids"]):
            doc = results["documents"][i] if i < len(results["documents"]) else ""
            metadata = results["metadatas"][i] if i < len(results["metadatas"]) else {}

            # Create summary (truncate to 100 chars unless full content asked)
            if include_content:
                content = doc
            else:
                content = doc[:100] + "..." if len(doc) > 100 else doc

            entry = f"**{i+1}.** `{memory_id[:8]}...`\n"
            entry += f"{content}\n"
            entry += f"Type: {metadata.get('memory_type', 'Unknown')}"

            if metadata.get('technologies'):
                techs = metadata['technologies'].split(',') if metadata['technologies'] else []
                if techs:
                    entry += f" | Tech: {', '.join(techs[:3])}"

            entry += f"\nCreated: {metadata.get('created_at', 'Unknown')[:10]}"

            if metadata.get('access_count', 0) > 0:
                entry += f" | Accessed: {metadata['access_count']} times"

            formatted_entries.append(entry)

        response = f"**Knowledge Base ({len(results['ids'])} entries"
        if memory_type:
            response += f", type: {memory_type}"
        response += "):**\n\n"
        response += "\n\n".join(formatted_entries)
        return response

    except Exception as e:
        return f"Error listing memories: {str(e)}"


@mcp.tool(
    annotations={
        "title": "Delete Knowledge",
        "description": "Delete a memory from the knowledge base",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": False,
    }
)
async def kb_delete(memory_id: str) -> str:
    """Delete a memory from the knowledge base.

    Args:
        memory_id: The ID of the memory to delete (can be partial ID from kb_list)
    """
    if not collection:
        return "Error: Knowledge base not initialized"

    try:
        # Get all IDs to find matches
        all_results = collection.get(include=["documents", "metadatas"])

        # Find matching IDs (support partial ID matching, case-insensitive)
        matching_ids = []
        for full_id in all_results["ids"]:
            if memory_id.lower() in full_id.lower():
                matching_ids.append(full_id)

        if not matching_ids:
            return f"No memory found with ID containing: {memory_id}"

        if len(matching_ids) > 1:
            # Ambiguous partial ID: refuse to delete, show candidates instead
            return f"Multiple memories match '{memory_id}'. Please use a more specific ID:\n" + \
                   "\n".join([f"- {mid[:16]}..." for mid in matching_ids[:5]])

        # Delete the memory
        memory_to_delete = matching_ids[0]
        collection.delete(ids=[memory_to_delete])

        return f"Memory deleted successfully: {memory_to_delete[:16]}..."

    except Exception as e:
        return f"Error deleting memory: {str(e)}"


def main() -> None:
    """Initialize and run the MCP server."""
    print("Initializing Knowledge Base MCP Server...")

    try:
        # Initialize database
        init_database()

        print("Knowledge Base MCP Server ready!")
        print("Running on stdio transport...")

        # Run the server (blocking call)
        mcp.run(transport="stdio")

    except KeyboardInterrupt:
        print("\nKnowledge Base MCP Server shutting down...")
    except Exception as e:
        print(f"Error starting server: {e}")
        raise
    finally:
        if ui_process:
            # BUGFIX: wait(timeout=5) can raise TimeoutExpired and mask the
            # real shutdown path; kill as a fallback so cleanup always completes.
            ui_process.terminate()
            try:
                ui_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                ui_process.kill()
            print("Streamlit UI stopped.")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Leon4s4/knowledge-base-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.