Skip to main content
Glama
thhart
by thhart
server.py20.3 kB
"""MCP server for inspecting log files in XDG_RUNTIME_DIR/log.""" import os import re import sys import argparse from pathlib import Path from mcp.server import Server from mcp.types import Tool, TextContent, Prompt, PromptMessage, GetPromptResult import mcp.server.stdio # Global variable to store the log directory paths LOG_DIRECTORIES = [] app = Server("log-inspector") def get_log_directories() -> list[Path]: """Get the log directory paths based on priority: CLI args > env var > default.""" global LOG_DIRECTORIES if LOG_DIRECTORIES: return [Path(d) for d in LOG_DIRECTORIES] # Check environment variable (colon-separated like PATH) if env_dirs := os.getenv("LOG_MCP_DIR"): return [Path(d.strip()) for d in env_dirs.split(":") if d.strip()] # Default to XDG_RUNTIME_DIR/log xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR") if not xdg_runtime_dir: raise ValueError("XDG_RUNTIME_DIR not set and no log directory specified") return [Path(xdg_runtime_dir) / "log"] def resolve_log_file(filename: str) -> tuple[Path, Path]: """ Resolve a filename to a full path within allowed directories. Returns: (log_dir, log_file) tuple Raises: ValueError if file is not found or not in allowed directories """ directories = get_log_directories() # If filename is already an absolute path, validate it's in allowed dirs file_path = Path(filename) if file_path.is_absolute(): try: resolved = file_path.resolve() for log_dir in directories: if str(resolved).startswith(str(log_dir.resolve())): return log_dir, resolved except Exception: pass raise ValueError(f"File not in any allowed log directory: {filename}") # Try to find the file in each directory for log_dir in directories: log_file = log_dir / filename if log_file.exists(): return log_dir, log_file.resolve() # If not found, use the first directory (for error messages) if directories: return directories[0], (directories[0] / filename).resolve() raise ValueError("No log directories configured") @app.list_prompts() async def list_prompts() -> list[Prompt]: """List available prompts.""" return [ Prompt( name="runtime-logs", description="Information about runtime log inspection capabilities", ) ] @app.get_prompt() async def get_prompt(name: str, arguments: dict | None = None) -> GetPromptResult: """Get prompt content.""" if name == "runtime-logs": try: log_dirs = get_log_directories() log_dirs_str = "\n".join(f" - {d}" for d in log_dirs) except ValueError: log_dirs_str = " - /run/user/[UID]/log" message = f"""# Runtime Log Inspection Available This MCP server provides access to runtime logs stored in: {log_dirs_str} ## Important: When to Use Log Inspection **ALWAYS check runtime logs when:** - The user reports errors or problems with their code - There are runtime failures, crashes, or unexpected behavior - The user mentions something "not working" or "failing" - Debugging is needed for any application or service - You need to understand what happened during execution ## Available Tools 1. **list_log_files** - Lists all available log files in the runtime directory 2. **get_log_content** - Reads the content of a specific log file (for small files) 3. **read_log_paginated** - Reads a portion of a log file with pagination (for large files) 4. **search_log_file** - Searches log files with regex and shows matching lines with context ## Recommended Workflow When a user reports a problem: 1. First use `list_log_files` to see what logs are available 2. Then use `get_log_content` to inspect relevant log files 3. Analyze the logs to identify the root cause 4. Provide solutions based on the actual error messages found These logs contain real-time diagnostic information that is invaluable for troubleshooting.""" return GetPromptResult( messages=[ PromptMessage( role="user", content=TextContent( type="text", text=message ) ) ] ) else: raise ValueError(f"Unknown prompt: {name}") @app.list_tools() async def list_tools() -> list[Tool]: """List available tools.""" return [ Tool( name="list_log_files", description="Lists all log files in $XDG_RUNTIME_DIR/log. Use this FIRST when user reports errors or problems to see what logs are available for inspection.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_log_content", description="Returns the content of a specific log file from $XDG_RUNTIME_DIR/log. Use this to inspect runtime logs when debugging errors or investigating problems. For large files, use read_log_paginated instead.", inputSchema={ "type": "object", "properties": { "filename": { "type": "string", "description": "Name of the log file to read" } }, "required": ["filename"] } ), Tool( name="read_log_paginated", description="Reads a paginated portion of a log file. Useful for large log files. Returns specific line ranges.", inputSchema={ "type": "object", "properties": { "filename": { "type": "string", "description": "Name of the log file to read" }, "start_line": { "type": "integer", "description": "Starting line number (1-based, default: 1)", "default": 1 }, "num_lines": { "type": "integer", "description": "Number of lines to read (default: 100, max: 1000)", "default": 100 } }, "required": ["filename"] } ), Tool( name="search_log_file", description="Searches a log file using regex pattern and returns matching lines with surrounding context. Supports pagination of results.", inputSchema={ "type": "object", "properties": { "filename": { "type": "string", "description": "Name of the log file to search" }, "pattern": { "type": "string", "description": "Regex pattern to search for" }, "context_lines": { "type": "integer", "description": "Number of lines to show before and after each match (default: 2, max: 10)", "default": 2 }, "case_sensitive": { "type": "boolean", "description": "Whether the search should be case-sensitive (default: false)", "default": False }, "max_matches": { "type": "integer", "description": "Maximum number of matches to return (default: 50, max: 500)", "default": 50 }, "skip_matches": { "type": "integer", "description": "Number of matches to skip (for pagination, default: 0)", "default": 0 } }, "required": ["filename", "pattern"] } ) ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls.""" if name == "list_log_files": try: log_dirs = get_log_directories() except ValueError as e: return [TextContent( type="text", text=f"Error: {e}" )] all_log_files = [] errors = [] # Scan all log directories for log_dir in log_dirs: if not log_dir.exists(): errors.append(f"Directory does not exist: {log_dir}") continue if not log_dir.is_dir(): errors.append(f"Path exists but is not a directory: {log_dir}") continue try: for item in log_dir.iterdir(): if item.is_file(): all_log_files.append(str(item.absolute())) except PermissionError: errors.append(f"Permission denied accessing: {log_dir}") if not all_log_files and not errors: return [TextContent( type="text", text=f"No log files found in any directory" )] result = f"Scanning {len(log_dirs)} log director{'y' if len(log_dirs) == 1 else 'ies'}:\n" result += "\n".join(f" - {d}" for d in log_dirs) result += f"\n\nFound {len(all_log_files)} log file(s):\n\n" result += "\n".join(sorted(all_log_files)) if errors: result += "\n\nWarnings:\n" + "\n".join(f" - {e}" for e in errors) return [TextContent(type="text", text=result)] elif name == "get_log_content": filename = arguments.get("filename") if not filename: return [TextContent( type="text", text="Error: filename parameter is required" )] try: log_dir, log_file = resolve_log_file(filename) except ValueError as e: return [TextContent( type="text", text=f"Error: {e}" )] if not log_file.exists(): return [TextContent( type="text", text=f"Log file does not exist: {log_file}" )] if not log_file.is_file(): return [TextContent( type="text", text=f"Path exists but is not a file: {log_file}" )] try: content = log_file.read_text() return [TextContent( type="text", text=f"Content of {log_file}:\n\n{content}" )] except PermissionError: return [TextContent( type="text", text=f"Permission denied reading: {log_file}" )] except Exception as e: return [TextContent( type="text", text=f"Error reading file: {e}" )] elif name == "read_log_paginated": filename = arguments.get("filename") start_line = arguments.get("start_line", 1) num_lines = arguments.get("num_lines", 100) if not filename: return [TextContent( type="text", text="Error: filename parameter is required" )] # Validate parameters if start_line < 1: return [TextContent( type="text", text="Error: start_line must be >= 1" )] if num_lines < 1 or num_lines > 1000: return [TextContent( type="text", text="Error: num_lines must be between 1 and 1000" )] try: log_dir, log_file = resolve_log_file(filename) except ValueError as e: return [TextContent( type="text", text=f"Error: {e}" )] if not log_file.exists(): return [TextContent( type="text", text=f"Log file does not exist: {log_file}" )] if not log_file.is_file(): return [TextContent( type="text", text=f"Path exists but is not a file: {log_file}" )] try: with open(log_file, 'r') as f: # Read all lines to count total all_lines = f.readlines() total_lines = len(all_lines) # Calculate the slice end_line = min(start_line - 1 + num_lines, total_lines) lines = all_lines[start_line - 1:end_line] result = f"File: {log_file}\n" result += f"Total lines: {total_lines}\n" result += f"Showing lines {start_line}-{start_line - 1 + len(lines)}\n" result += f"\n{'=' * 60}\n\n" for i, line in enumerate(lines, start=start_line): result += f"{i:6d} | {line}" if end_line < total_lines: result += f"\n\n... {total_lines - end_line} more lines available ..." return [TextContent(type="text", text=result)] except PermissionError: return [TextContent( type="text", text=f"Permission denied reading: {log_file}" )] except Exception as e: return [TextContent( type="text", text=f"Error reading file: {e}" )] elif name == "search_log_file": filename = arguments.get("filename") pattern = arguments.get("pattern") context_lines = arguments.get("context_lines", 2) case_sensitive = arguments.get("case_sensitive", False) max_matches = arguments.get("max_matches", 50) skip_matches = arguments.get("skip_matches", 0) if not filename: return [TextContent( type="text", text="Error: filename parameter is required" )] if not pattern: return [TextContent( type="text", text="Error: pattern parameter is required" )] # Validate parameters if context_lines < 0 or context_lines > 10: return [TextContent( type="text", text="Error: context_lines must be between 0 and 10" )] if max_matches < 1 or max_matches > 500: return [TextContent( type="text", text="Error: max_matches must be between 1 and 500" )] if skip_matches < 0: return [TextContent( type="text", text="Error: skip_matches must be >= 0" )] try: log_dir, log_file = resolve_log_file(filename) except ValueError as e: return [TextContent( type="text", text=f"Error: {e}" )] if not log_file.exists(): return [TextContent( type="text", text=f"Log file does not exist: {log_file}" )] if not log_file.is_file(): return [TextContent( type="text", text=f"Path exists but is not a file: {log_file}" )] # Compile regex pattern try: flags = 0 if case_sensitive else re.IGNORECASE regex = re.compile(pattern, flags) except re.error as e: return [TextContent( type="text", text=f"Error: Invalid regex pattern: {e}" )] try: with open(log_file, 'r') as f: lines = f.readlines() total_lines = len(lines) # Find all matches matches = [] for i, line in enumerate(lines): if regex.search(line): matches.append(i) total_matches = len(matches) if total_matches == 0: return [TextContent( type="text", text=f"No matches found for pattern: {pattern}" )] # Apply pagination paginated_matches = matches[skip_matches:skip_matches + max_matches] if not paginated_matches: return [TextContent( type="text", text=f"No more matches (total: {total_matches}, skipped: {skip_matches})" )] result = f"File: {log_file}\n" result += f"Pattern: {pattern}\n" result += f"Total matches: {total_matches}\n" result += f"Showing matches {skip_matches + 1}-{skip_matches + len(paginated_matches)}\n" result += f"Context lines: {context_lines}\n" result += f"\n{'=' * 60}\n\n" for match_idx in paginated_matches: # Calculate context range start = max(0, match_idx - context_lines) end = min(total_lines, match_idx + context_lines + 1) # Show context for i in range(start, end): line_num = i + 1 marker = ">>>" if i == match_idx else " " result += f"{marker} {line_num:6d} | {lines[i]}" result += f"\n{'-' * 60}\n\n" if skip_matches + len(paginated_matches) < total_matches: remaining = total_matches - (skip_matches + len(paginated_matches)) result += f"... {remaining} more matches available (use skip_matches={skip_matches + len(paginated_matches)}) ..." return [TextContent(type="text", text=result)] except PermissionError: return [TextContent( type="text", text=f"Permission denied reading: {log_file}" )] except Exception as e: return [TextContent( type="text", text=f"Error searching file: {e}" )] else: return [TextContent( type="text", text=f"Unknown tool: {name}" )] async def main(): """Run the MCP server.""" async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) def run(): """Entry point for the log-mcp command.""" global LOG_DIRECTORIES parser = argparse.ArgumentParser( description="MCP server for inspecting log files", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Log directory priority (highest to lowest): 1. --log-dir command line arguments (can specify multiple) 2. LOG_MCP_DIR environment variable (colon-separated paths) 3. $XDG_RUNTIME_DIR/log (default) Examples: log-mcp # Use default $XDG_RUNTIME_DIR/log log-mcp --log-dir /var/log # Use single custom directory log-mcp --log-dir /var/log --log-dir /tmp/logs # Use multiple directories LOG_MCP_DIR=/var/log:/tmp/logs log-mcp # Use environment variable (colon-separated) """ ) parser.add_argument( "--log-dir", type=str, action="append", dest="log_dirs", help="Add a log directory to search (can be specified multiple times)" ) args = parser.parse_args() if args.log_dirs: LOG_DIRECTORIES = args.log_dirs # Validate directories exist for log_dir in LOG_DIRECTORIES: if not Path(log_dir).exists(): print(f"Warning: Log directory does not exist: {log_dir}", file=sys.stderr) import asyncio asyncio.run(main()) if __name__ == "__main__": run()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/thhart/log-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server