Skip to main content
Glama
parser.py9.92 kB
"""
JSONL parser for Shannon MCP Server.

This module provides JSONL parsing with:
- Strict JSON validation
- Error recovery
- Message type validation
- Schema checking
"""

import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
import structlog

from ..utils.logging import get_logger
from ..utils.errors import ValidationError


logger = get_logger("shannon-mcp.parser")


class ParseError(Exception):
    """JSONL parsing error."""
    pass


@dataclass
class MessageSchema:
    """Schema for message validation."""
    # Fields that must be present in a message of this type.
    required_fields: List[str]
    # Fields that may be present without triggering an "unknown_fields" warning.
    optional_fields: List[str]
    # Expected Python type for each field that is type-checked when present.
    field_types: Dict[str, type]


class JSONLParser:
    """
    Parses JSONL messages from Claude Code.

    Each parser instance keeps two counters: the number of lines passed to
    ``parse_line`` and the number of those lines that failed. They are
    reported and cleared by ``reset_stats``.
    """

    # Message schemas, keyed by the value of the message's "type" field.
    SCHEMAS = {
        "partial": MessageSchema(
            required_fields=["type", "content"],
            optional_fields=["id", "timestamp"],
            field_types={"type": str, "content": str}
        ),
        "response": MessageSchema(
            required_fields=["type", "content"],
            optional_fields=["id", "timestamp", "token_count", "metadata"],
            field_types={"type": str, "content": str, "token_count": int}
        ),
        "error": MessageSchema(
            required_fields=["type", "error_type", "message"],
            optional_fields=["id", "timestamp", "details", "stack_trace"],
            field_types={"type": str, "error_type": str, "message": str}
        ),
        "notification": MessageSchema(
            required_fields=["type", "notification_type", "content"],
            optional_fields=["id", "timestamp", "priority"],
            field_types={"type": str, "notification_type": str, "content": str}
        ),
        "metric": MessageSchema(
            required_fields=["type", "data"],
            optional_fields=["id", "timestamp"],
            field_types={"type": str, "data": dict}
        ),
        "debug": MessageSchema(
            required_fields=["type", "data"],
            optional_fields=["id", "timestamp", "level"],
            field_types={"type": str, "data": dict}
        ),
        "status": MessageSchema(
            required_fields=["type", "status"],
            optional_fields=["id", "timestamp", "details", "progress"],
            field_types={"type": str, "status": str}
        ),
        "checkpoint": MessageSchema(
            required_fields=["type", "checkpoint_id"],
            optional_fields=["id", "timestamp", "data"],
            field_types={"type": str, "checkpoint_id": str}
        )
    }

    def __init__(self, strict: bool = False):
        """
        Initialize parser.

        Args:
            strict: Enable strict schema validation
        """
        self.strict = strict
        self._line_count = 0
        self._error_count = 0

    def parse_line(self, line: str) -> Dict[str, Any]:
        """
        Parse a JSONL line.

        Every call increments the line counter; every failure (including an
        empty line) increments the error counter.

        Args:
            line: JSONL line to parse

        Returns:
            Parsed message dictionary

        Raises:
            ParseError: If the line is empty, is not valid JSON, does not
                decode to a JSON object, or (in strict mode) fails schema
                validation. Any exception raised while decoding or
                validating is re-raised as ParseError with the original
                exception chained as its cause.
        """
        self._line_count += 1

        # Strip whitespace
        line = line.strip()
        if not line:
            # Count the failure so reset_stats() agrees with the errors that
            # parse_batch() and validate_jsonl_file() report for blank lines.
            self._error_count += 1
            raise ParseError("Empty line")

        try:
            # Parse JSON
            message = json.loads(line)

            # Validate type: only JSON objects are accepted as messages.
            if not isinstance(message, dict):
                raise ParseError(f"Expected dict, got {type(message).__name__}")

            # Validate schema if strict
            if self.strict:
                self._validate_schema(message)

            return message

        except json.JSONDecodeError as e:
            self._error_count += 1
            raise ParseError(f"Invalid JSON at position {e.pos}: {e.msg}") from e
        except Exception as e:
            # Also catches the ParseError raised above and any schema
            # validation failure, so callers only ever see ParseError.
            self._error_count += 1
            raise ParseError(f"Parse error: {str(e)}") from e

    @staticmethod
    def _matches_type(value: Any, expected_type: type) -> bool:
        """
        Check whether a decoded JSON value satisfies a schema field type.

        ``bool`` is a subclass of ``int`` in Python, so a plain ``isinstance``
        check would let JSON ``true``/``false`` satisfy an ``int`` field.
        Booleans are rejected explicitly for ``int`` fields.

        Args:
            value: Decoded JSON value
            expected_type: Type declared in the schema's ``field_types``

        Returns:
            True if the value is acceptable for the field
        """
        if expected_type is int and isinstance(value, bool):
            return False
        return isinstance(value, expected_type)

    def _validate_schema(self, message: Dict[str, Any]) -> None:
        """
        Validate message against schema.

        Messages whose type has no entry in ``SCHEMAS`` are accepted and a
        warning is logged. Fields not listed in the schema are logged as a
        warning but do not fail validation.

        Args:
            message: Message to validate

        Raises:
            ValidationError: If the "type" field is missing or falsy, a
                required field is missing, or a typed field has the wrong type
        """
        # Get message type
        msg_type = message.get("type")
        if not msg_type:
            raise ValidationError("type", None, "Message missing 'type' field")

        # Get schema
        schema = self.SCHEMAS.get(msg_type)
        if not schema:
            # Unknown type is allowed but logged
            logger.warning(
                "unknown_message_type",
                message_type=msg_type,
                line_number=self._line_count
            )
            return

        # Check required fields
        for field in schema.required_fields:
            if field not in message:
                raise ValidationError(
                    field,
                    None,
                    f"Required field '{field}' missing for message type '{msg_type}'"
                )

        # Check field types (only for fields actually present)
        for field, expected_type in schema.field_types.items():
            if field in message:
                value = message[field]
                if not self._matches_type(value, expected_type):
                    raise ValidationError(
                        field,
                        value,
                        f"Field '{field}' must be {expected_type.__name__}, got {type(value).__name__}"
                    )

        # Check for unknown fields
        all_fields = set(schema.required_fields) | set(schema.optional_fields)
        unknown_fields = set(message.keys()) - all_fields
        if unknown_fields and self.strict:
            logger.warning(
                "unknown_fields",
                message_type=msg_type,
                fields=list(unknown_fields)
            )

    def parse_batch(self, lines: List[str]) -> List[Dict[str, Any]]:
        """
        Parse multiple JSONL lines.

        Lines that fail to parse are skipped; a single warning summarising
        the failures (with details of at most the first five) is logged.

        Args:
            lines: Lines to parse

        Returns:
            List of parsed messages, in input order, excluding failed lines
        """
        messages = []
        errors = []

        for line_number, line in enumerate(lines, start=1):
            try:
                messages.append(self.parse_line(line))
            except ParseError as e:
                errors.append({
                    "line_number": line_number,
                    "line": line[:100],  # First 100 chars
                    "error": str(e)
                })

        if errors:
            logger.warning(
                "batch_parse_errors",
                error_count=len(errors),
                total_lines=len(lines),
                errors=errors[:5]  # First 5 errors
            )

        return messages

    def reset_stats(self) -> Dict[str, float]:
        """
        Reset and return parser statistics.

        Returns:
            Statistics dictionary with "lines_parsed", "errors_encountered"
            and "error_rate" (errors divided by lines, or 0 when no lines
            have been parsed), captured before the counters are cleared
        """
        stats = {
            "lines_parsed": self._line_count,
            "errors_encountered": self._error_count,
            "error_rate": self._error_count / self._line_count if self._line_count > 0 else 0
        }

        self._line_count = 0
        self._error_count = 0

        return stats

    @staticmethod
    def format_message(
        msg_type: str,
        content: Optional[str] = None,
        **kwargs
    ) -> str:
        """
        Format a message for sending to Claude.

        Args:
            msg_type: Message type
            content: Message content; omitted from the output when None
            **kwargs: Additional fields, merged into the message last

        Returns:
            JSONL formatted string (compact separators, no trailing newline)
        """
        message = {"type": msg_type}

        if content is not None:
            message["content"] = content

        message.update(kwargs)

        return json.dumps(message, separators=(',', ':'))

    @staticmethod
    def validate_jsonl_file(file_path: str) -> Dict[str, Any]:
        """
        Validate an entire JSONL file.

        Each line is parsed with a strict parser. Processing stops once 100
        errors have been collected; a final marker entry with line -1 is
        appended in that case. Failures to open or read the file are
        reported as an error entry with line 0 rather than raised.

        Args:
            file_path: Path to JSONL file

        Returns:
            Validation results with keys "valid", "total_lines",
            "valid_lines" and "errors"
        """
        parser = JSONLParser(strict=True)
        results = {
            "valid": True,
            "total_lines": 0,
            "valid_lines": 0,
            "errors": []
        }

        try:
            # Decode explicitly as UTF-8 so results do not depend on the
            # platform's locale encoding.
            with open(file_path, 'r', encoding='utf-8') as f:
                for i, line in enumerate(f):
                    results["total_lines"] += 1

                    try:
                        parser.parse_line(line)
                        results["valid_lines"] += 1
                    except Exception as e:
                        results["valid"] = False
                        results["errors"].append({
                            "line": i + 1,
                            "error": str(e),
                            "content": line.strip()[:100]
                        })

                        # Limit errors
                        if len(results["errors"]) >= 100:
                            results["errors"].append({
                                "line": -1,
                                "error": "Too many errors, stopped processing",
                                "content": ""
                            })
                            break

        except Exception as e:
            results["valid"] = False
            results["errors"].append({
                "line": 0,
                "error": f"File error: {str(e)}",
                "content": ""
            })

        return results


# Export public API
__all__ = ['JSONLParser', 'ParseError', 'MessageSchema']

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server