Skip to main content
Glama

MCP Document Indexer

by yairwein
llm.py11.6 kB
"""Local LLM integration for document processing.""" import json import logging from typing import Optional, Dict, Any, List import asyncio from concurrent.futures import ThreadPoolExecutor import ollama from ollama import AsyncClient logger = logging.getLogger(__name__) class LocalLLM: """Interface to local LLM via Ollama.""" def __init__(self, model: str, base_url: str): self.model = model self.base_url = base_url self.client = None self.async_client = None self.executor = ThreadPoolExecutor(max_workers=2) self._available = None async def initialize(self): """Initialize async client.""" self.async_client = AsyncClient(host=self.base_url) self._available = await self.check_availability() if self._available: logger.info(f"Ollama model {self.model} is available") else: logger.warning(f"Ollama model {self.model} is not available, using fallback") @property def is_available(self) -> bool: """Check if the LLM is available.""" return self._available if self._available is not None else False async def check_availability(self) -> bool: """Check if Ollama and the model are available.""" try: # Check if Ollama is running response = await self.async_client.list() # Handle both dict and list responses if isinstance(response, list): model_list = response elif isinstance(response, dict): model_list = response.get('models', []) else: model_list = [] # Extract model names safely model_names = [] for m in model_list: if isinstance(m, dict) and 'name' in m: model_names.append(m['name']) elif isinstance(m, str): model_names.append(m) # Handle model name variations (e.g., llama3.2:3b vs llama3.2) base_model = self.model.split(':')[0] available = any(base_model in name for name in model_names) if not available: logger.info(f"Model {self.model} not found. Available models: {model_names}") # Try to pull the model try: logger.info(f"Attempting to pull model {self.model}...") await self.async_client.pull(self.model) return True except Exception as e: logger.error(f"Failed to pull model: {e}") return False return available except Exception as e: logger.error(f"Ollama not available: {e}") return False async def summarize_document(self, text: str, max_length: int = 500) -> str: """Generate a summary of the document.""" if not self._available: return self._fallback_summary(text, max_length) try: # Truncate very long texts to avoid timeout max_input_chars = 8000 if len(text) > max_input_chars: text = text[:max_input_chars] + "..." prompt = f"""Summarize the following document in {max_length} characters or less. Focus on the main topics, key points, and overall purpose of the document. Document: {text} Summary:""" response = await self.async_client.generate( model=self.model, prompt=prompt, options={ "temperature": 0.3, "top_p": 0.9, "num_predict": max_length } ) summary = response['response'].strip() # Ensure summary is not too long if len(summary) > max_length: summary = summary[:max_length].rsplit(' ', 1)[0] + "..." return summary except Exception as e: logger.error(f"Error generating summary with LLM: {e}") return self._fallback_summary(text, max_length) async def extract_keywords(self, text: str, num_keywords: int = 10) -> List[str]: """Extract keywords from the document.""" if not self._available: return self._fallback_keywords(text, num_keywords) try: # Truncate for keyword extraction max_input_chars = 4000 if len(text) > max_input_chars: text = text[:max_input_chars] prompt = f"""Extract {num_keywords} important keywords or key phrases from this document. Return only the keywords/phrases as a comma-separated list, nothing else. Document: {text} Keywords:""" response = await self.async_client.generate( model=self.model, prompt=prompt, options={ "temperature": 0.2, "top_p": 0.9, "num_predict": 200 } ) keywords_text = response['response'].strip() keywords = [k.strip() for k in keywords_text.split(',')] # Clean and validate keywords keywords = [k for k in keywords if k and len(k) > 2 and len(k) < 50] return keywords[:num_keywords] except Exception as e: logger.error(f"Error extracting keywords with LLM: {e}") return self._fallback_keywords(text, num_keywords) async def generate_embedding_text(self, text: str) -> str: """Generate text optimized for embedding.""" if not self._available: return text[:1000] # Simple truncation as fallback try: # For embedding, we want a concise representation max_input_chars = 3000 if len(text) > max_input_chars: text = text[:max_input_chars] prompt = f"""Create a concise version of this text that captures its main meaning and topics. Keep important keywords and concepts. Maximum 500 characters. Text: {text} Concise version:""" response = await self.async_client.generate( model=self.model, prompt=prompt, options={ "temperature": 0.2, "num_predict": 500 } ) return response['response'].strip() except Exception as e: logger.error(f"Error generating embedding text: {e}") return text[:1000] def _fallback_summary(self, text: str, max_length: int) -> str: """Simple fallback summary when LLM is not available.""" # Take first few sentences sentences = text.split('. ') summary = "" for sentence in sentences: if len(summary) + len(sentence) < max_length: summary += sentence + ". " else: break if not summary: summary = text[:max_length] return summary.strip() def _fallback_keywords(self, text: str, num_keywords: int) -> List[str]: """Simple keyword extraction when LLM is not available.""" import re from collections import Counter # Simple word frequency based extraction words = re.findall(r'\b[a-z]+\b', text.lower()) # Filter out common stop words stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'} words = [w for w in words if w not in stop_words and len(w) > 3] # Get most common words word_counts = Counter(words) keywords = [word for word, _ in word_counts.most_common(num_keywords)] return keywords async def generate(self, prompt: str, max_tokens: int = 1000) -> str: """Generate text from a prompt.""" if not self.is_available: # Use fallback response when Ollama is not available return f"[Mock response for: {prompt[:50]}...]" try: response = await self.async_client.generate( model=self.model, prompt=prompt, stream=False ) # Handle both dict and direct response if isinstance(response, dict): return response.get('response', '') else: return str(response) except Exception as e: logger.error(f"Error generating text: {e}") return f"[Fallback response for: {prompt[:50]}...]" async def close(self): """Clean up resources.""" self.executor.shutdown(wait=False) class DocumentProcessor: """Process documents using local LLM.""" def __init__(self, llm: LocalLLM): self.llm = llm async def process_document(self, doc_data: Dict[str, Any]) -> Dict[str, Any]: """Process a document and add LLM-generated metadata.""" text = doc_data.get('text', '') file_name = doc_data.get('metadata', {}).get('file_name', 'unknown') if not text: logger.info(f" → Empty document, skipping LLM processing") return { **doc_data, 'summary': 'Empty document', 'keywords': [], 'embedding_text': '' } logger.info(f" → Generating summary for {len(text):,} characters...") # Generate summary, keywords, and embedding text summary_task = self.llm.summarize_document(text, max_length=500) keywords_task = self.llm.extract_keywords(text, num_keywords=10) summary, keywords = await asyncio.gather(summary_task, keywords_task) logger.info(f" → Generated {len(summary)} char summary and {len(keywords)} keywords: {', '.join(keywords[:3])}{'...' if len(keywords) > 3 else ''}") # For embedding, use summary + keywords for better representation embedding_text = f"{summary} Keywords: {', '.join(keywords)}" return { **doc_data, 'summary': summary, 'keywords': keywords, 'embedding_text': embedding_text } async def process_batch(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Process multiple documents in parallel.""" tasks = [self.process_document(doc) for doc in documents] return await asyncio.gather(*tasks) async def _generate_summary(self, text: str, max_length: int = 500) -> str: """Generate summary for text.""" return await self.llm.summarize_document(text, max_length=max_length) async def _extract_keywords(self, text: str, max_keywords: int = 10) -> List[str]: """Extract keywords from text.""" return await self.llm.extract_keywords(text, num_keywords=max_keywords)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yairwein/document-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server