Skip to main content
Glama

CCGLM MCP Server

by nosolosoft
logging_utils.py14 kB
#!/usr/bin/env python3 """ Advanced logging utilities for CCGLM-MCP server Provides dual logging: stderr (human-readable) + JSONL file (structured) """ import json import logging import logging.handlers import os import re import hashlib import time import uuid import threading from datetime import datetime, timezone from logging.handlers import RotatingFileHandler from pathlib import Path from typing import Any, Dict, Optional from queue import Queue class SafeJSONFormatter(logging.Formatter): """ JSON formatter that sanitizes sensitive data and truncates long fields """ def __init__(self, max_preview_len: int = 512, max_trace_len: int = 4000): super().__init__() self.max_preview_len = max_preview_len self.max_trace_len = max_trace_len # Patterns to redact sensitive information self.sensitive_patterns = [ (re.compile(r'(token|api[_-]?key|secret|authorization|password|bearer)[\'"\s]*[:=][\'"\s]*([a-zA-Z0-9_-]+)', re.IGNORECASE), '***REDACTED***'), (re.compile(r'GLM_AUTH_TOKEN[\'"\s]*[:=][\'"\s]*([a-zA-Z0-9._-]+)', re.IGNORECASE), 'GLM_AUTH_TOKEN=***REDACTED***'), ] def format(self, record: logging.LogRecord) -> str: """Format log record as JSON line""" try: # Create base log entry log_entry = { "ts": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'), "level": record.levelname, "logger": record.name, } # Add structured data if present in record.__dict__ if hasattr(record, 'event'): log_entry['event'] = record.event # Add standard fields if present for field in ['request_id', 'session_id', 'instance_id', 'tool', 'method', 'prompt_preview', 'prompt_sha256', 'response_preview', 'latency_ms', 'exit_code', 'files_created', 'files_modified', 'new_files', 'modified_files', 'stderr_preview', 'model', 'transport', 'pid', 'error_type', 'error_message', 'traceback', 'cmd_preview', 'cwd']: if hasattr(record, field): log_entry[field] = getattr(record, field) # Handle message field if hasattr(record, 'msg') and record.msg: if isinstance(record.msg, dict): # If msg is already a dict, merge it log_entry.update(record.msg) else: # If msg is a string, add as message field log_entry['message'] = str(record.msg) # Sanitize sensitive data log_entry = self._sanitize_dict(log_entry) # Truncate long fields log_entry = self._truncate_fields(log_entry) return json.dumps(log_entry, ensure_ascii=False, default=str) except Exception as e: # Fallback to simple format if JSON formatting fails return json.dumps({ "ts": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'), "level": "ERROR", "logger": "ccglm-mcp", "event": "format_error", "error": f"Failed to format log: {str(e)}", "original_message": str(record.msg) if hasattr(record, 'msg') else "" }, ensure_ascii=False) def _sanitize_dict(self, data: Any) -> Any: """Recursively sanitize dictionary values""" if isinstance(data, dict): sanitized = {} for key, value in data.items(): # Check if key name looks sensitive if any(pattern[0].search(key) for pattern in self.sensitive_patterns): sanitized[key] = "***REDACTED***" else: sanitized[key] = self._sanitize_dict(value) return sanitized elif isinstance(data, (list, tuple)): return [self._sanitize_dict(item) for item in data] elif isinstance(data, str): # Check for sensitive patterns in string values for pattern, replacement in self.sensitive_patterns: data = pattern.sub(replacement, data) return data else: return data def _truncate_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """Truncate long text fields""" truncated = data.copy() # Text fields to truncate text_fields = ['prompt_preview', 'response_preview', 'stderr_preview', 'message'] for field in text_fields: if field in truncated and isinstance(truncated[field], str): if len(truncated[field]) > self.max_preview_len: truncated[field] = truncated[field][:self.max_preview_len] + "...[TRUNCATED]" # Traceback field if 'traceback' in truncated and isinstance(truncated['traceback'], str): if len(truncated['traceback']) > self.max_trace_len: truncated['traceback'] = truncated['traceback'][:self.max_trace_len] + "...[TRUNCATED]" # Limit array sizes array_fields = ['new_files', 'modified_files'] for field in array_fields: if field in truncated and isinstance(truncated[field], (list, tuple)): if len(truncated[field]) > 10: truncated[field] = list(truncated[field][:10]) + [f"...and {len(truncated[field]) - 10} more"] return truncated def hash_text(text: str) -> str: """Calculate SHA256 hash of text for correlation without exposing content""" return hashlib.sha256(text.encode('utf-8')).hexdigest()[:16] class CCGLMLogger: """Enhanced logger with dual sinks (stderr + JSONL file)""" def __init__(self, name: str = "ccglm-mcp"): self.name = name self.instance_id = str(uuid.uuid4()) self.pid = os.getpid() self.session_id = os.getenv('CLAUDE_SESSION') # Determine log directory and file path self.log_dir = self._get_log_directory() self.log_file = self._get_log_file_path() # Setup logging infrastructure self.logger = self._setup_logging() # Log startup self.logger.info({ "event": "startup", "log_file": str(self.log_file), "log_dir": str(self.log_dir) }) def _get_log_directory(self) -> Path: """Determine log directory based on environment variables""" # Priority: CCGLM_MCP_LOG_PATH > CCGLM_MCP_LOG_DIR > CLAUDE_LOG_DIR > ~/.claude/logs if os.getenv('CCGLM_MCP_LOG_PATH'): return Path(os.getenv('CCGLM_MCP_LOG_PATH')).parent log_dir = ( os.getenv('CCGLM_MCP_LOG_DIR') or os.getenv('CLAUDE_LOG_DIR') or Path.home() / '.claude' / 'logs' ) return Path(log_dir) def _get_log_file_path(self) -> Path: """Determine log file path""" # Check if full path is specified if os.getenv('CCGLM_MCP_LOG_PATH'): return Path(os.getenv('CCGLM_MCP_LOG_PATH')) # Determine per-process logging per_process = os.getenv('CCGLM_MCP_PER_PROCESS_LOGS', 'true').lower() == 'true' if per_process: filename = f"ccglm-mcp-{self.pid}.jsonl" else: filename = "ccglm-mcp.jsonl" return self.log_dir / filename def _setup_logging(self) -> logging.Logger: """Setup dual logging infrastructure""" logger = logging.getLogger(self.name) logger.setLevel(logging.INFO) # Clear any existing handlers logger.handlers.clear() # Determine log level from environment log_level = os.getenv('CCGLM_MCP_LOG_LEVEL', 'INFO').upper() logger.setLevel(getattr(logging, log_level, logging.INFO)) # Create log directory if it doesn't exist try: self.log_dir.mkdir(parents=True, exist_ok=True) except PermissionError: # Fallback to local logs directory fallback_dir = Path.cwd() / 'logs' fallback_dir.mkdir(exist_ok=True) self.log_dir = fallback_dir self.log_file = fallback_dir / self.log_file.name # Setup queue for non-blocking logging self.log_queue = Queue(maxsize=10000) queue_handler = logging.handlers.QueueHandler(self.log_queue) # Setup file handler with JSON formatter try: file_handler = RotatingFileHandler( self.log_file, maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, delay=True, encoding='utf-8' ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(SafeJSONFormatter()) # Setup stderr handler with human-readable format stderr_handler = logging.StreamHandler() stderr_handler.setLevel(logging.INFO) stderr_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) stderr_handler.setFormatter(stderr_formatter) # Setup queue listener self.queue_listener = logging.handlers.QueueListener( self.log_queue, file_handler, stderr_handler, respect_handler_level=True ) self.queue_listener.start() # Add queue handler to logger logger.addHandler(queue_handler) logger.propagate = False except Exception as e: # Fallback to basic stderr logging only print(f"Warning: Failed to setup file logging: {e}") basic_handler = logging.StreamHandler() basic_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) logger.addHandler(basic_handler) return logger def create_request_context(self, tool: str, args: Dict[str, Any]) -> Dict[str, Any]: """Create request context with sanitized data""" request_id = str(uuid.uuid4()) prompt = args.get('prompt', '') return { "request_id": request_id, "session_id": self.session_id, "instance_id": self.instance_id, "tool": tool, "method": "call_tool", "pid": self.pid, "prompt_preview": prompt[:self.max_preview_len] if hasattr(self, 'max_preview_len') else prompt[:512], "prompt_sha256": hash_text(prompt) if prompt else None, "args": self._sanitize_args(args) } def _sanitize_args(self, args: Dict[str, Any]) -> Dict[str, Any]: """Sanitize function arguments""" sanitized = args.copy() if 'prompt' in sanitized: prompt = sanitized['prompt'] if len(prompt) > 512: sanitized['prompt'] = prompt[:512] + "...[TRUNCATED]" return sanitized def log_request(self, context: Dict[str, Any]) -> None: """Log request event""" log_data = { "event": "request", **context } self.logger.info(log_data) def log_response(self, context: Dict[str, Any], result: Dict[str, Any], start_time: float) -> None: """Log response event""" latency_ms = (time.perf_counter() - start_time) * 1000 log_data = { "event": "response", **context, "latency_ms": round(latency_ms, 2), "model": result.get('model', 'glm-4.6'), "response_preview": str(result.get('response', ''))[:512], "files_created": result.get('files_created', 0), "files_modified": result.get('files_modified', 0), "new_files": result.get('new_files', [])[:10], "modified_files": result.get('modified_files', [])[:10] } if 'error' in result: log_data['error_type'] = 'MCPError' log_data['error_message'] = result['error'] self.logger.info(log_data) def log_error(self, context: Dict[str, Any], error: Exception, start_time: float) -> None: """Log error event""" latency_ms = (time.perf_counter() - start_time) * 1000 log_data = { "event": "error", **context, "latency_ms": round(latency_ms, 2), "error_type": type(error).__name__, "error_message": str(error) } # Add traceback if available import traceback log_data['traceback'] = traceback.format_exc() self.logger.error(log_data, exc_info=False) def log_process_event(self, context: Dict[str, Any], step: str, cmd_preview: str = None, cwd: str = None, **kwargs) -> None: """Log subprocess process events""" log_data = { "event": "process", **context, "step": step } if cmd_preview: log_data['cmd_preview'] = cmd_preview if cwd: log_data['cwd'] = cwd log_data.update(kwargs) self.logger.info(log_data) def shutdown(self) -> None: """Graceful shutdown""" try: self.logger.info({ "event": "shutdown", "instance_id": self.instance_id, "pid": self.pid }) # Stop queue listener if it exists if hasattr(self, 'queue_listener'): self.queue_listener.stop() except Exception: pass # Ignore errors during shutdown # Global logger instance _ccglm_logger = None def get_logger() -> CCGLMLogger: """Get or create global CCGLM logger instance""" global _ccglm_logger if _ccglm_logger is None: _ccglm_logger = CCGLMLogger() return _ccglm_logger

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nosolosoft/ccglm-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server