Skip to main content
Glama

Gemini MCP Server

log_utils.pyโ€ข10.4 kB
""" Centralized log utility for simulator tests. This module provides common log reading and parsing functionality used across multiple simulator test files to reduce code duplication. """ import logging import re import subprocess from typing import Optional, Union class LogUtils: """Centralized logging utilities for simulator tests.""" # Log file paths MAIN_LOG_FILE = "logs/mcp_server.log" ACTIVITY_LOG_FILE = "logs/mcp_activity.log" @classmethod def get_server_logs_since(cls, since_time: Optional[str] = None) -> str: """ Get server logs from both main and activity log files. Args: since_time: Currently ignored, returns all available logs Returns: Combined logs from both log files """ try: main_logs = "" activity_logs = "" # Read main server log try: with open(cls.MAIN_LOG_FILE) as f: main_logs = f.read() except FileNotFoundError: pass # Read activity log try: with open(cls.ACTIVITY_LOG_FILE) as f: activity_logs = f.read() except FileNotFoundError: pass return main_logs + "\n" + activity_logs except Exception as e: logging.warning(f"Failed to read server logs: {e}") return "" @classmethod def get_recent_server_logs(cls, lines: int = 500) -> str: """ Get recent server logs from the main log file. Args: lines: Number of recent lines to retrieve (default: 500) Returns: Recent log content as string """ try: with open(cls.MAIN_LOG_FILE) as f: all_lines = f.readlines() recent_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines return "".join(recent_lines) except FileNotFoundError: logging.warning(f"Log file {cls.MAIN_LOG_FILE} not found") return "" except Exception as e: logging.warning(f"Failed to read recent server logs: {e}") return "" @classmethod def get_server_logs_subprocess(cls, lines: int = 500) -> str: """ Get server logs using subprocess (alternative method). Args: lines: Number of recent lines to retrieve Returns: Recent log content as string """ try: result = subprocess.run( ["tail", "-n", str(lines), cls.MAIN_LOG_FILE], capture_output=True, text=True, timeout=10 ) return result.stdout + result.stderr except Exception as e: logging.warning(f"Failed to get server logs via subprocess: {e}") return "" @classmethod def check_server_logs_for_errors(cls, lines: int = 500) -> list[str]: """ Check server logs for error messages. Args: lines: Number of recent lines to check Returns: List of error messages found """ logs = cls.get_recent_server_logs(lines) error_patterns = [r"ERROR.*", r"CRITICAL.*", r"Failed.*", r"Exception.*", r"Error:.*"] errors = [] for line in logs.split("\n"): for pattern in error_patterns: if re.search(pattern, line, re.IGNORECASE): errors.append(line.strip()) break return errors @classmethod def extract_conversation_usage_logs(cls, logs: str) -> list[dict[str, int]]: """ Extract token budget calculation information from logs. Args: logs: Log content to parse Returns: List of dictionaries containing token usage data """ usage_data = [] pattern = r"\[CONVERSATION_DEBUG\] Token budget calculation:" for line in logs.split("\n"): if re.search(pattern, line): # Parse the token usage information usage_info = {} # Extract total capacity capacity_match = re.search(r"Total capacity: ([\d,]+)", line) if capacity_match: usage_info["total_capacity"] = int(capacity_match.group(1).replace(",", "")) # Extract content allocation content_match = re.search(r"Content allocation: ([\d,]+)", line) if content_match: usage_info["content_allocation"] = int(content_match.group(1).replace(",", "")) # Extract conversation tokens conv_match = re.search(r"Conversation tokens: ([\d,]+)", line) if conv_match: usage_info["conversation_tokens"] = int(conv_match.group(1).replace(",", "")) # Extract remaining tokens remaining_match = re.search(r"Remaining tokens: ([\d,]+)", line) if remaining_match: usage_info["remaining_tokens"] = int(remaining_match.group(1).replace(",", "")) if usage_info: usage_data.append(usage_info) return usage_data @classmethod def extract_conversation_token_usage(cls, logs: str) -> list[int]: """ Extract conversation token usage values from logs. Args: logs: Log content to parse Returns: List of token usage values """ pattern = r"Conversation history token usage:\s*([\d,]+)" usage_values = [] for match in re.finditer(pattern, logs): usage_value = int(match.group(1).replace(",", "")) usage_values.append(usage_value) return usage_values @classmethod def extract_thread_creation_logs(cls, logs: str) -> list[dict[str, str]]: """ Extract thread creation logs with parent relationships. Args: logs: Log content to parse Returns: List of dictionaries with thread relationship data """ thread_data = [] pattern = r"\[THREAD\] Created new thread (\w+)(?: with parent (\w+))?" for match in re.finditer(pattern, logs): thread_info = {"thread_id": match.group(1), "parent_id": match.group(2) if match.group(2) else None} thread_data.append(thread_info) return thread_data @classmethod def extract_history_traversal_logs(cls, logs: str) -> list[dict[str, Union[str, int]]]: """ Extract conversation history traversal logs. Args: logs: Log content to parse Returns: List of dictionaries with traversal data """ traversal_data = [] pattern = r"\[THREAD\] Retrieved chain of (\d+) messages for thread (\w+)" for match in re.finditer(pattern, logs): traversal_info = {"chain_length": int(match.group(1)), "thread_id": match.group(2)} traversal_data.append(traversal_info) return traversal_data @classmethod def validate_file_deduplication_in_logs(cls, logs: str, tool_name: str, test_file: str) -> bool: """ Validate that logs show file deduplication behavior. Args: logs: Log content to parse tool_name: Name of the tool being tested test_file: Name of the test file to check for deduplication Returns: True if deduplication evidence is found, False otherwise """ # Look for embedding calculation embedding_pattern = f"Calculating embeddings for {test_file}" has_embedding = bool(re.search(embedding_pattern, logs)) # Look for filtering message filtering_pattern = f"Filtering {test_file} to prevent duplication" has_filtering = bool(re.search(filtering_pattern, logs)) # Look for skip message skip_pattern = f"Skipping {test_file} \\(already processed" has_skip = bool(re.search(skip_pattern, logs)) # Look for tool-specific processing tool_pattern = f"\\[{tool_name.upper()}\\].*{test_file}" has_tool_processing = bool(re.search(tool_pattern, logs, re.IGNORECASE)) # Deduplication is confirmed if we see evidence of processing and filtering/skipping return has_embedding and (has_filtering or has_skip) and has_tool_processing @classmethod def search_logs_for_pattern( cls, pattern: str, logs: Optional[str] = None, case_sensitive: bool = False ) -> list[str]: """ Search logs for a specific pattern. Args: pattern: Regex pattern to search for logs: Log content to search (if None, reads recent logs) case_sensitive: Whether the search should be case sensitive Returns: List of matching lines """ if logs is None: logs = cls.get_recent_server_logs() flags = 0 if case_sensitive else re.IGNORECASE matches = [] for line in logs.split("\n"): if re.search(pattern, line, flags): matches.append(line.strip()) return matches @classmethod def get_log_file_info(cls) -> dict[str, dict[str, Union[str, int, bool]]]: """ Get information about log files. Returns: Dictionary with file information for each log file """ import os file_info = {} for log_file in [cls.MAIN_LOG_FILE, cls.ACTIVITY_LOG_FILE]: if os.path.exists(log_file): stat = os.stat(log_file) file_info[log_file] = { "exists": True, "size_bytes": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "last_modified": stat.st_mtime, "readable": os.access(log_file, os.R_OK), } else: file_info[log_file] = { "exists": False, "size_bytes": 0, "size_mb": 0, "last_modified": 0, "readable": False, } return file_info

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BeehiveInnovations/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server