server.py

"""ickyMCP - RAG MCP Server for Document Search.

Multi-tenant architecture:
- Each user has their own database (user_id required for all operations)
- Documents are isolated per user
- Search can be filtered to specific document IDs for chat/matter isolation
"""

import asyncio
import logging
from pathlib import Path
from typing import Optional
from datetime import datetime

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from .config import (
    CHUNK_SIZE,
    CHUNK_OVERLAP,
    SUPPORTED_EXTENSIONS,
    DB_PATH,
    EMBEDDING_PROVIDER,
    VOYAGE_MODEL,
    LOCAL_EMBEDDING_MODEL,
    get_user_db_path
)
from .database import VectorDatabase
from .parsers import parse_document, is_supported
from .chunker import TextChunker
from .embedder import get_embedder

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ickyMCP")

# Initialize server
server = Server("ickyMCP")

# Global instances - chunker is shared, but DB is per-user
chunker: Optional[TextChunker] = None

# Cache of user databases to avoid reconnecting
_user_dbs: dict[str, VectorDatabase] = {}


def get_db(user_id: Optional[str] = None) -> VectorDatabase:
    """Get or create database connection for a user.

    Args:
        user_id: User identifier. If None, uses legacy single-DB mode.

    Returns:
        VectorDatabase instance for the user
    """
    if user_id is None:
        # Legacy mode - single shared database
        if "_legacy" not in _user_dbs:
            db = VectorDatabase(DB_PATH)
            db.connect()
            _user_dbs["_legacy"] = db
        return _user_dbs["_legacy"]

    # Multi-tenant mode - per-user database
    if user_id not in _user_dbs:
        db_path = get_user_db_path(user_id)
        db = VectorDatabase(db_path)
        db.connect()
        _user_dbs[user_id] = db
        logger.info(f"Initialized database for user {user_id} at {db_path}")
    return _user_dbs[user_id]


def get_chunker() -> TextChunker:
    """Get or create chunker instance."""
    global chunker
    if chunker is None:
        chunker = TextChunker(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    return chunker


@server.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools."""
    return [
        Tool(
            name="index",
            description="Index documents from a file or directory for semantic search. Supports: .txt, .md, .pdf, .docx, .pptx, .xlsx",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "File or directory path to index"
                    },
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation. Each user has their own database."
                    },
                    "patterns": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Glob patterns to match (e.g., ['*.pdf', '*.docx']). Default: all supported types",
                        "default": ["*"]
                    },
                    "exclude": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Patterns to exclude",
                        "default": []
                    },
                    "force": {
                        "type": "boolean",
                        "description": "Force re-index even if file unchanged",
                        "default": False
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="search",
            description="Semantic search across indexed documents. Returns relevant text chunks. Use document_ids to filter to specific documents (e.g., for chat/matter isolation).",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Natural language search query"
                    },
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    },
                    "document_ids": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "Filter to only search within specific document IDs. Use this to limit search to documents selected for a particular chat or matter."
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "Number of results to return",
                        "default": 10
                    },
                    "path_filter": {
                        "type": "string",
                        "description": "Filter results to paths starting with this prefix"
                    },
                    "file_types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by file types (e.g., ['pdf', 'docx'])"
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="similar",
            description="Find chunks similar to a given text. Useful for finding related content.",
            inputSchema={
                "type": "object",
                "properties": {
                    "chunk_text": {
                        "type": "string",
                        "description": "Text to find similar chunks for"
                    },
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    },
                    "document_ids": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "Filter to only search within specific document IDs"
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "Number of results to return",
                        "default": 10
                    },
                    "exclude_same_doc": {
                        "type": "boolean",
                        "description": "Exclude chunks from the same document",
                        "default": True
                    }
                },
                "required": ["chunk_text"]
            }
        ),
        Tool(
            name="refresh",
            description="Re-index only files that have changed since last indexing.",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    },
                    "path": {
                        "type": "string",
                        "description": "Optional: refresh only this path/directory"
                    }
                }
            }
        ),
        Tool(
            name="list",
            description="List all indexed documents. Returns document IDs that can be used for filtering searches.",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    },
                    "path_filter": {
                        "type": "string",
                        "description": "Filter to paths starting with this prefix"
                    }
                }
            }
        ),
        Tool(
            name="delete",
            description="Remove documents from the index.",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    },
                    "path": {
                        "type": "string",
                        "description": "Path to delete (file or directory prefix)"
                    },
                    "document_ids": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "Delete specific documents by ID"
                    },
                    "all": {
                        "type": "boolean",
                        "description": "Delete entire index (requires confirmation)",
                        "default": False
                    }
                }
            }
        ),
        Tool(
            name="status",
            description="Get server status and index statistics.",
            inputSchema={
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User identifier for multi-tenant isolation"
                    }
                }
            }
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls."""
    try:
        if name == "index":
            result = await handle_index(arguments)
        elif name == "search":
            result = await handle_search(arguments)
        elif name == "similar":
            result = await handle_similar(arguments)
        elif name == "refresh":
            result = await handle_refresh(arguments)
        elif name == "list":
            result = await handle_list(arguments)
        elif name == "delete":
            result = await handle_delete(arguments)
        elif name == "status":
            result = await handle_status(arguments)
        else:
            result = f"Unknown tool: {name}"

        return [TextContent(type="text", text=str(result))]
    except Exception as e:
        logger.exception(f"Error in tool {name}")
        return [TextContent(type="text", text=f"Error: {str(e)}")]


async def handle_index(args: dict) -> dict:
    """Index documents from a path (file or directory).

    When given a directory, recursively indexes all supported files within it.
    This is useful for indexing a "matter" folder containing multiple documents.
    Returns document IDs that can be used for filtering searches.
    """
    path = Path(args["path"])
    user_id = args.get("user_id")
    patterns = args.get("patterns", ["*"])
    exclude = args.get("exclude", [])
    force = args.get("force", False)

    if not path.exists():
        return {"error": f"Path does not exist: {path}"}

    database = get_db(user_id)
    text_chunker = get_chunker()
    embedder = get_embedder()

    # Ensure model is loaded (only for local embedder)
    if hasattr(embedder, 'load_model'):
        embedder.load_model()

    indexed = 0
    skipped = 0
    errors = []
    indexed_documents = []  # Track indexed docs with their IDs

    # Collect files to process
    files_to_process = []
    if path.is_file():
        if is_supported(path):
            files_to_process.append(path)
    else:
        # Directory - recursively find all matching files
        for pattern in patterns:
            for file_path in path.rglob(pattern):
                if file_path.is_file() and is_supported(file_path):
                    # Check exclusions
                    excluded = any(file_path.match(ex) for ex in exclude)
                    if not excluded and file_path not in files_to_process:
                        files_to_process.append(file_path)

    # Process each file
    for file_path in files_to_process:
        try:
            stat = file_path.stat()

            # Check if needs re-indexing
            if not force and not database.document_needs_reindex(
                str(file_path), stat.st_mtime, stat.st_size
            ):
                skipped += 1
                continue

            # Delete existing if re-indexing
            database.delete_document(str(file_path))

            # Parse document
            parsed = parse_document(file_path)
            if parsed is None:
                errors.append(f"Could not parse: {file_path}")
                continue

            # Chunk the text
            chunks = text_chunker.chunk_text(parsed.text)
            if not chunks:
                errors.append(f"No content extracted: {file_path}")
                continue

            # Generate embeddings for all chunks
            chunk_texts = [c.text for c in chunks]
            embeddings = embedder.embed_documents(chunk_texts, show_progress=False)

            # Store document
            doc_id = database.add_document(
                path=str(file_path),
                file_type=parsed.file_type,
                file_size=stat.st_size,
                modified_time=stat.st_mtime,
                page_count=parsed.page_count
            )

            # Store chunks with embeddings
            for chunk, embedding in zip(chunks, embeddings):
                database.add_chunk(
                    document_id=doc_id,
                    chunk_index=chunk.chunk_index,
                    chunk_text=chunk.text,
                    token_count=chunk.token_count,
                    embedding=embedding,
                    page_number=chunk.page_number,
                    start_char=chunk.start_char,
                    end_char=chunk.end_char
                )

            database.update_document_chunk_count(doc_id, len(chunks))
            indexed += 1

            # Track indexed document with its ID for return
            indexed_documents.append({
                "id": doc_id,
                "path": str(file_path),
                "file_type": parsed.file_type,
                "chunks": len(chunks),
                "pages": parsed.page_count
            })

            logger.info(f"Indexed {file_path}: {len(chunks)} chunks (doc_id={doc_id})")

        except Exception as e:
            errors.append(f"{file_path}: {str(e)}")
            logger.exception(f"Error indexing {file_path}")

    return {
        "indexed": indexed,
        "skipped": skipped,
        "errors": errors if errors else None,
        "total_files_found": len(files_to_process),
        "documents": indexed_documents  # List of indexed docs with IDs
    }


async def handle_search(args: dict) -> dict:
    """Search indexed documents."""
    query = args["query"]
    user_id = args.get("user_id")
    document_ids = args.get("document_ids")
    top_k = args.get("top_k", 10)
    path_filter = args.get("path_filter")
    file_types = args.get("file_types")

    database = get_db(user_id)
    embedder = get_embedder()

    # Ensure model is loaded (only for local embedder)
    if hasattr(embedder, 'load_model'):
        embedder.load_model()

    # Generate query embedding
    query_embedding = embedder.embed_query(query)

    # Search with document_ids filter
    results = database.search(
        query_embedding=query_embedding,
        top_k=top_k,
        path_filter=path_filter,
        file_types=file_types,
        document_ids=document_ids
    )

    return {
        "query": query,
        "results": results,
        "count": len(results),
        "filtered_to_documents": document_ids if document_ids else "all"
    }


async def handle_similar(args: dict) -> dict:
    """Find similar chunks to given text."""
    chunk_text = args["chunk_text"]
    user_id = args.get("user_id")
    document_ids = args.get("document_ids")
    top_k = args.get("top_k", 10)
    exclude_same_doc = args.get("exclude_same_doc", True)

    database = get_db(user_id)
    embedder = get_embedder()

    # Ensure model is loaded (only for local embedder)
    if hasattr(embedder, 'load_model'):
        embedder.load_model()

    # Generate embedding for the input text
    text_embedding = embedder.embed_for_similarity(chunk_text)

    # Search for similar (get extra results if excluding same doc)
    search_k = top_k * 2 if exclude_same_doc else top_k
    results = database.search(
        query_embedding=text_embedding,
        top_k=search_k,
        document_ids=document_ids
    )

    # Filter out same document if requested
    if exclude_same_doc and results:
        # Try to identify source document from the input text
        # (this is a heuristic - exact match on first few chars)
        filtered = []
        for r in results:
            # Simple check: if chunk text is too similar, skip
            if chunk_text[:100] not in r["chunk_text"][:100]:
                filtered.append(r)
            if len(filtered) >= top_k:
                break
        results = filtered

    return {
        "input_preview": chunk_text[:200] + "..." if len(chunk_text) > 200 else chunk_text,
        "results": results[:top_k],
        "count": len(results[:top_k])
    }


async def handle_refresh(args: dict) -> dict:
    """Refresh index for changed files."""
    user_id = args.get("user_id")
    path = args.get("path")

    database = get_db(user_id)

    # Get all indexed documents
    docs = database.list_documents(path_filter=path)

    updated = 0
    added = 0
    removed = 0

    for doc in docs:
        doc_path = Path(doc["path"])

        if not doc_path.exists():
            # File was deleted
            database.delete_document(doc["path"])
            removed += 1
        elif database.document_needs_reindex(
            doc["path"], doc_path.stat().st_mtime, doc_path.stat().st_size
        ):
            # File was modified - re-index it
            result = await handle_index({"path": doc["path"], "user_id": user_id, "force": True})
            if result.get("indexed", 0) > 0:
                updated += 1

    return {
        "updated": updated,
        "added": added,
        "removed": removed,
        "checked": len(docs)
    }


async def handle_list(args: dict) -> dict:
    """List indexed documents with their IDs for document selection."""
    user_id = args.get("user_id")
    path_filter = args.get("path_filter")

    database = get_db(user_id)
    docs = database.list_documents(path_filter=path_filter)

    return {
        "documents": [
            {
                "id": d["id"],  # Document ID for use with document_ids filter
                "path": d["path"],
                "file_type": d["file_type"],
                "chunks": d["chunk_count"],
                "pages": d["page_count"],
                "indexed_at": d["indexed_at"]
            }
            for d in docs
        ],
        "count": len(docs)
    }


async def handle_delete(args: dict) -> dict:
    """Delete documents from index."""
    user_id = args.get("user_id")
    path = args.get("path")
    document_ids = args.get("document_ids")
    delete_all = args.get("all", False)

    database = get_db(user_id)

    if delete_all:
        count = database.delete_all()
        return {"deleted_chunks": count, "message": "Deleted entire index"}

    # Delete by document IDs if provided
    if document_ids:
        deleted_docs = 0
        deleted_chunks = 0
        for doc_id in document_ids:
            doc = database.get_document_by_id(doc_id)
            if doc:
                chunks = database.delete_document(doc["path"])
                deleted_chunks += chunks
                deleted_docs += 1
        return {
            "deleted_documents": deleted_docs,
            "deleted_chunks": deleted_chunks
        }

    if not path:
        return {"error": "Must specify 'path', 'document_ids', or 'all: true'"}

    # Check if it's a prefix (directory) or exact file
    docs = database.list_documents(path_filter=path)
    deleted = 0
    for doc in docs:
        chunks = database.delete_document(doc["path"])
        deleted += chunks

    return {
        "deleted_documents": len(docs),
        "deleted_chunks": deleted
    }


async def handle_status(args: dict) -> dict:
    """Get server status."""
    user_id = args.get("user_id")
    database = get_db(user_id)
    embedder = get_embedder()

    stats = database.get_stats()
    model_info = embedder.get_model_info()

    return {
        **stats,
        "user_id": user_id if user_id else "legacy_mode",
        "provider": model_info.get("provider", EMBEDDING_PROVIDER),
        "model": model_info.get(
            "model_name",
            VOYAGE_MODEL if EMBEDDING_PROVIDER == "voyage" else LOCAL_EMBEDDING_MODEL
        ),
        "chunk_size": CHUNK_SIZE,
        "chunk_overlap": CHUNK_OVERLAP,
        "supported_types": list(SUPPORTED_EXTENSIONS.keys()),
        "embedding_dimensions": embedder.dimensions
    }


async def main():
    """Run the MCP server."""
    logger.info("Starting ickyMCP server...")

    # Initialize embedder
    embedder = get_embedder()
    model_info = embedder.get_model_info()
    logger.info(f"Embedder: {model_info.get('provider')} - {model_info.get('model_name')}")

    # Pre-load local model if using local embeddings
    if hasattr(embedder, 'load_model'):
        embedder.load_model()
        logger.info("Local model loaded successfully")

    # Initialize database
    database = get_db()
    logger.info(f"Database initialized at: {DB_PATH}")

    # Run server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())

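To show how the user_id and document_ids parameters defined above fit together, here is a minimal client-side sketch (not part of server.py). It uses the MCP Python SDK's stdio client; the launch command (python -m ickymcp.server), the paths, the user ID, and the document IDs are illustrative assumptions, not values taken from this repository.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Assumption: the server module is importable as `ickymcp.server`; adjust to your layout.
SERVER = StdioServerParameters(command="python", args=["-m", "ickymcp.server"])


async def demo():
    async with stdio_client(SERVER) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Index a matter folder for one tenant; the returned text lists doc IDs.
            indexed = await session.call_tool("index", arguments={
                "path": "/data/matters/acme",  # placeholder path
                "user_id": "user-123",         # tenant identifier
            })
            print(indexed.content[0].text)

            # Search only within documents selected for this chat/matter.
            hits = await session.call_tool("search", arguments={
                "query": "termination clause notice period",
                "user_id": "user-123",
                "document_ids": [1, 2],        # IDs from the index/list output
                "top_k": 5,
            })
            print(hits.content[0].text)


if __name__ == "__main__":
    asyncio.run(demo())

Because each user_id maps to its own database, the same document_ids can exist for different tenants without colliding; omitting user_id falls back to the legacy single-database mode shown in get_db().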