llm.py (10.7 kB)
import asyncio
import logging
import os
from typing import Optional, List
import json

from openai import AsyncOpenAI, DefaultAioHttpClient

from code_flow_graph.core.call_graph_builder import FunctionNode
from code_flow_graph.core.vector_store import CodeVectorStore

logger = logging.getLogger("mcp.llm")


class SummaryGenerator:
    """Generates natural language summaries for code elements using an LLM."""

    def __init__(self, config: dict):
        self.config = config.get('llm_config', {})
        self.api_key = os.environ.get(self.config.get('api_key_env_var', 'OPENAI_API_KEY'))
        # Priority: Env Var > Config > Default
        self.base_url = os.environ.get('OPENAI_BASE_URL', self.config.get('base_url', 'https://openrouter.ai/api/v1'))
        self.model = os.environ.get('OPENAI_SUMMARY_MODEL', self.config.get('model', 'x-ai/grok-4.1-fast'))
        self.max_tokens = self.config.get('max_tokens', 256)
        self.max_input_tokens = self.config.get('max_input_tokens', 2000)
        self.summary_depth = self.config.get('summary_depth', 'standard')

        # Smart filtering options
        self.min_complexity = self.config.get('min_complexity', 0)
        self.min_nloc = self.config.get('min_nloc', 0)
        self.skip_private = self.config.get('skip_private', False)
        self.skip_test = self.config.get('skip_test', False)

        if not self.api_key:
            logger.warning("No API key found for SummaryGenerator. Summarization will fail.")

        # Initialize OpenAI client with aiohttp for better concurrency
        self.client = AsyncOpenAI(
            api_key=self.api_key,
            base_url=self.base_url,
            http_client=DefaultAioHttpClient()
        )

    def should_summarize(self, node: FunctionNode) -> bool:
        """Determine if a function should be summarized based on filtering rules."""
        # Check complexity threshold
        if node.complexity is not None and node.complexity < self.min_complexity:
            return False

        # Check NLOC threshold
        if node.nloc is not None and node.nloc < self.min_nloc:
            return False

        # Skip private functions if configured
        if self.skip_private:
            # Python convention: names starting with _ (but not __)
            if node.name.startswith('_') and not node.name.startswith('__'):
                return False
            # TypeScript/Java convention: access_modifier field
            if node.access_modifier in ['private', 'protected']:
                return False

        # Skip test functions if configured
        if self.skip_test:
            name_lower = node.name.lower()
            if name_lower.startswith('test_') or name_lower.endswith('_test') or 'test' in name_lower:
                return False

        return True

    async def generate_summary(self, node: FunctionNode, source_code: str) -> Optional[str]:
        """
        Generate a concise summary for a function node.

        Args:
            node: The FunctionNode to summarize.
            source_code: The source code of the function.

        Returns:
            The generated summary string, or None if generation failed.
        """
        if not self.api_key:
            return None

        # Check if this function should be summarized
        if not self.should_summarize(node):
            logger.debug(f"Skipping {node.fully_qualified_name} based on filtering rules")
            return None

        try:
            prompt = self._construct_prompt(node, source_code)
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert software engineer. Generate a concise, natural language summary for the provided code component. Focus on WHAT it does and WHY, not just translating code to text."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=self.max_tokens,
                temperature=0.3
            )
            summary = response.choices[0].message.content.strip()
            return summary
        except Exception as e:
            logger.error(f"Error generating summary for {node.fully_qualified_name}: {e}")
            return None

    def _construct_prompt(self, node: FunctionNode, source_code: str) -> str:
        """Construct the prompt for the LLM based on depth setting."""
        signature = f"def {node.name}({', '.join(node.parameters)}) -> {node.return_type}"

        # Truncate source code if too long
        # Rough estimate: 1 token ≈ 4 characters
        max_chars = self.max_input_tokens * 4
        if len(source_code) > max_chars:
            source_code = source_code[:max_chars] + "\n... (truncated)"

        if self.summary_depth == "minimal":
            # Minimal: Just name, signature, and code
            return f"""
Component Type: {'Method' if node.is_method else 'Function'}
Name: {node.name}
Signature: {signature}

Code:
{source_code}

Summary:
"""
        elif self.summary_depth == "detailed":
            # Detailed: Include all available metadata
            docstring = node.docstring or "None"
            complexity = f"Complexity: {node.complexity}" if node.complexity else ""
            nloc = f"NLOC: {node.nloc}" if node.nloc else ""
            decorators = f"Decorators: {[d.get('name', '') for d in node.decorators]}" if node.decorators else ""
            return f"""
Component Type: {'Method' if node.is_method else 'Function'}
Name: {node.name}
Signature: {signature}
Docstring: {docstring}
{complexity}
{nloc}
{decorators}

Code:
{source_code}

Summary:
"""
        else:
            # Standard: Name, signature, docstring, code
            docstring = node.docstring or "None"
            return f"""
Component Type: {'Method' if node.is_method else 'Function'}
Name: {node.name}
Signature: {signature}
Docstring: {docstring}

Code:
{source_code}

Summary:
"""


class SummaryProcessor:
    """Manages background processing of summary generation."""

    def __init__(self, generator: SummaryGenerator, builder, vector_store: CodeVectorStore,
                 concurrency: int = 5, prioritize_entry_points: bool = False):
        self.generator = generator
        self.builder = builder
        self.vector_store = vector_store
        self.queue = asyncio.Queue()
        self.concurrency = concurrency
        self.workers = []
        self.running = False
        self.prioritize_entry_points = prioritize_entry_points

    def start(self):
        """Start the background worker tasks."""
        if self.running:
            return
        self.running = True

        # Check if we're in an async context or need to create one
        try:
            loop = asyncio.get_running_loop()
            # We're in an async context (MCP server)
            for _ in range(self.concurrency):
                task = asyncio.create_task(self._worker())
                self.workers.append(task)
        except RuntimeError:
            # No running loop (CLI tool) - we'll create tasks when needed
            # Workers will be created in the event loop when we call asyncio.run()
            pass

        logger.info(f"Started {self.concurrency} summary processor workers")

    async def start_async(self):
        """Start workers in an async context."""
        if self.running:
            return
        self.running = True
        for _ in range(self.concurrency):
            task = asyncio.create_task(self._worker())
            self.workers.append(task)
        logger.info(f"Started {self.concurrency} summary processor workers")

    async def stop(self):
        """Stop the background workers."""
        self.running = False
        # Send sentinel values to stop workers
        for _ in range(self.concurrency):
            await self.queue.put(None)
        if self.workers:
            await asyncio.gather(*self.workers)
        logger.info("Stopped summary processor workers")

    def enqueue(self, fqn: str):
        """Enqueue a function FQN for summarization."""
        self.queue.put_nowait(fqn)

    async def _worker(self):
        """Worker task to process the queue."""
        while self.running:
            try:
                fqn = await self.queue.get()
                if fqn is None:
                    self.queue.task_done()
                    break
                try:
                    # Get node from builder
                    node = self.builder.functions.get(fqn)
                    if not node:
                        logger.warning(f"Node {fqn} not found in builder, skipping summary")
                        continue

                    # Get source code (we need to read file again or have it cached)
                    # For now, read file. Optimization: Cache source in builder or pass it.
                    # Reading file is safer for latest content.
                    try:
                        with open(node.file_path, 'r', encoding='utf-8') as f:
                            full_source = f.read()
                        lines = full_source.split('\n')
                        start = max(0, node.line_start - 1)
                        end = node.line_end
                        func_source = '\n'.join(lines[start:end])
                    except Exception as e:
                        logger.warning(f"Could not read source for {fqn}: {e}")
                        continue

                    # Generate summary
                    summary = await self.generator.generate_summary(node, func_source)

                    if summary:
                        # Update node
                        node.summary = summary
                        # Update vector store
                        # We need to pass full source to update_function_node as it re-chunks
                        self.vector_store.update_function_node(node, full_source)
                        logger.info(f"Generated and stored summary for {fqn}")
                except Exception as e:
                    logger.error(f"Error processing summary for {fqn}: {e}")
                finally:
                    self.queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Worker error: {e}")
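
For reference, a sketch of the llm_config block this module reads. The key names are taken directly from __init__ above; the values shown are the code's defaults, except skip_private and skip_test, which are illustrative:

# Illustrative config for SummaryGenerator; keys match what __init__ reads.
config = {
    "llm_config": {
        "api_key_env_var": "OPENAI_API_KEY",  # env var holding the API key
        "base_url": "https://openrouter.ai/api/v1",
        "model": "x-ai/grok-4.1-fast",
        "max_tokens": 256,           # completion budget per summary
        "max_input_tokens": 2000,    # source truncated past ~4 chars/token
        "summary_depth": "standard", # "minimal" | "standard" | "detailed"
        # Smart filtering
        "min_complexity": 0,
        "min_nloc": 0,
        "skip_private": True,        # skip _helpers and private/protected members
        "skip_test": True,           # skip test_* / *_test / *test* names
    }
}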

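And a minimal wiring sketch, not the server's actual startup code, showing how the two classes fit together in an async context. It assumes the definitions above are in scope, that builder exposes a functions dict of FQN -> FunctionNode, and that store is a CodeVectorStore:

async def summarize_project(builder, store, config: dict):
    generator = SummaryGenerator(config)
    processor = SummaryProcessor(generator, builder, store, concurrency=5)

    await processor.start_async()    # spawn worker tasks
    for fqn in builder.functions:    # queue every known function
        processor.enqueue(fqn)

    await processor.queue.join()     # wait until every item is task_done()
    await processor.stop()           # send sentinels and join the workers

# asyncio.run(summarize_project(builder, store, config))

Calling queue.join() before stop() matters here: join() returns once the workers have acknowledged every enqueued FQN, and only then do the None sentinels go in to shut the workers down.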