Skip to main content
Glama
brockwebb

Open Census MCP Server

by brockwebb
llm_json_fixer.py12.2 kB
#!/usr/bin/env python3
"""
LLM JSON Fixer Module
A battle-tested JSON repair utility for handling malformed LLM outputs
"""

import json
import re
import logging
from typing import Any, Dict, Optional, Tuple

logger = logging.getLogger(__name__)

# A double quote that is not preceded by a backslash.
_UNESCAPED_QUOTE_RE = re.compile(r'(?<!\\)"')
# A colon followed by the opening quote of a string value.
_VALUE_OPEN_RE = re.compile(r':\s*"')
# What may legitimately follow the closing quote of a string value inside an
# object: a closing bracket, a comma introducing the next quoted key, or the
# end of the input (optionally after a dangling comma).
_VALUE_CLOSE_RE = re.compile(r'\s*(?:[}\]]|,\s*"|,?\s*\Z)')


class LLMJSONFixer:
    """Robust JSON fixer for LLM outputs.

    All methods are static; the class is only a namespace. The repairs are
    heuristics: they aim to leave valid JSON untouched and to turn the most
    common kinds of malformed LLM output into something ``json.loads``
    accepts.
    """

    @staticmethod
    def fix_json(json_str: str, aggressive: bool = True) -> str:
        """
        Fix common JSON issues in LLM outputs

        Args:
            json_str: Potentially malformed JSON string
            aggressive: If True, applies more aggressive fixes

        Returns:
            Fixed JSON string ("{}" when the input is empty or whitespace).
            The result is NOT guaranteed to be valid JSON.
        """
        if not json_str.strip():
            return "{}"

        # Apply fixes in order of least to most aggressive.
        # Level 1: Basic fixes
        fixed = LLMJSONFixer._extract_json_object(json_str)
        fixed = LLMJSONFixer._fix_basic_issues(fixed)

        # Level 2: Structural fixes
        fixed = LLMJSONFixer._fix_missing_commas(fixed)
        fixed = LLMJSONFixer._fix_quotes_and_escapes(fixed)

        if aggressive:
            # Level 3: Aggressive fixes
            fixed = LLMJSONFixer._fix_newlines_in_strings(fixed)
            fixed = LLMJSONFixer._fix_unterminated_strings(fixed)

        return LLMJSONFixer._ensure_proper_closure(fixed)

    @staticmethod
    def parse_with_fallbacks(content: str) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Try multiple strategies to parse JSON

        Strategies, in order: direct parse, extract-then-parse, conservative
        fix, aggressive fix, and finally a regex salvage of flat key/value
        pairs.

        Returns:
            Tuple of (parsed_dict, error_message). On a clean parse the
            message is None; on a salvage it is a warning; when every
            strategy fails the first element is None and the message lists
            one line per failed strategy.
        """
        errors = []
        attempts = (
            ("Direct parse failed", lambda: content),
            ("Extracted parse failed",
             lambda: LLMJSONFixer._extract_json_object(content)),
            ("Fixed parse failed",
             lambda: LLMJSONFixer.fix_json(content, aggressive=False)),
            ("Aggressive fix failed",
             lambda: LLMJSONFixer.fix_json(content, aggressive=True)),
        )
        for label, prepare in attempts:
            try:
                return json.loads(prepare()), None
            except json.JSONDecodeError as e:
                errors.append(f"{label}: {e}")

        # Last resort - try to salvage what we can.
        try:
            salvaged = LLMJSONFixer._salvage_json(content)
        except Exception as e:
            errors.append(f"Salvage failed: {e}")
        else:
            if salvaged:
                return salvaged, "Warning: JSON was salvaged, may be incomplete"
            # Salvage finding nothing is a failure too; record it so the
            # combined error message can always be built.
            errors.append("Salvage failed: no key-value pairs found")

        return None, "\n".join(errors)

    @staticmethod
    def _find_string_end(text: str, start: int) -> int:
        """Return the index of the quote closing the string opened at *start*.

        ``text[start]`` must be the opening double quote. A backslash skips
        the character after it, so ``\\"`` does not close the string while
        ``\\\\"`` does. Returns -1 when the string is never closed.
        """
        i = start + 1
        n = len(text)
        while i < n:
            ch = text[i]
            if ch == '\\':
                i += 2
                continue
            if ch == '"':
                return i
            i += 1
        return -1

    @staticmethod
    def _find_balanced_object(text: str) -> Optional[str]:
        """Return the brace-balanced object starting at the first ``{``.

        Braces inside double-quoted strings are ignored. Returns None when
        there is no ``{``, when the braces never balance, or when a string
        is left unterminated (the quote structure is then unreliable).
        """
        start = text.find('{')
        if start == -1:
            return None
        depth = 0
        i, n = start, len(text)
        while i < n:
            ch = text[i]
            if ch == '"':
                end = LLMJSONFixer._find_string_end(text, i)
                if end == -1:
                    return None
                i = end + 1
                continue
            if ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    return text[start:i + 1]
            i += 1
        return None

    @staticmethod
    def _extract_json_object(text: str) -> str:
        """Extract JSON object from surrounding text.

        Tries, in order: the body of the first markdown code fence, the
        first brace-balanced object (any nesting depth), the original
        one-level-nesting regex, and finally the stripped input.
        """
        # Try to find JSON in markdown code blocks
        code_block_match = re.search(r'```(?:json)?\s*(.*?)\s*```', text, re.DOTALL)
        if code_block_match:
            return code_block_match.group(1).strip()

        # A balanced scan copes with arbitrary nesting and with braces that
        # appear inside string values.
        balanced = LLMJSONFixer._find_balanced_object(text)
        if balanced is not None:
            return balanced

        # Fall back to the purely textual match when the quote structure is
        # too damaged for the balanced scan.
        json_match = re.search(r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})', text, re.DOTALL)
        if json_match:
            return json_match.group(1)

        return text.strip()

    @staticmethod
    def _strip_comments(json_str: str) -> str:
        """Remove ``//`` and ``/* */`` comments that lie outside strings.

        A double-quoted string is assumed to end at its closing quote or at
        the end of the line, whichever comes first, so one unterminated
        string cannot hide every later comment. A ``/*`` without a matching
        ``*/`` is left in place.
        """
        out = []
        i, n = 0, len(json_str)
        in_string = False
        while i < n:
            ch = json_str[i]
            if in_string:
                if ch == '\\' and i + 1 < n and json_str[i + 1] != '\n':
                    # Keep the escape pair together so \" does not end the string.
                    out.append(json_str[i:i + 2])
                    i += 2
                    continue
                if ch == '"' or ch == '\n':
                    in_string = False
                out.append(ch)
                i += 1
                continue
            if ch == '"':
                in_string = True
            elif json_str.startswith('//', i):
                eol = json_str.find('\n', i)
                i = n if eol == -1 else eol  # the newline itself is kept
                continue
            elif json_str.startswith('/*', i):
                close = json_str.find('*/', i + 2)
                if close != -1:
                    i = close + 2
                    continue
            out.append(ch)
            i += 1
        return ''.join(out)

    @staticmethod
    def _fix_basic_issues(json_str: str) -> str:
        """Fix basic JSON formatting issues.

        Converts single-quoted keys and values to double quotes, removes
        comments, then removes trailing commas. The order matters: quoting
        first protects URLs in single-quoted values from the comment
        stripper, and stripping comments first lets a trailing comma that
        was followed by a comment be recognised.
        """
        # Fix single quotes (JSON requires double quotes)
        # This is tricky - only fix quotes that are clearly field/value delimiters
        json_str = re.sub(r"'([^']*)'(\s*:)", r'"\1"\2', json_str)  # Field names
        json_str = re.sub(r":\s*'([^']*)'", r': "\1"', json_str)  # String values

        # Remove comments (not valid in JSON) without touching string contents
        json_str = LLMJSONFixer._strip_comments(json_str)

        # Remove trailing commas
        json_str = re.sub(r',\s*}', '}', json_str)
        json_str = re.sub(r',\s*]', ']', json_str)

        return json_str

    @staticmethod
    def _fix_missing_commas(json_str: str) -> str:
        """Add missing commas between elements that are separated by a newline."""
        # Add commas between consecutive strings
        json_str = re.sub(r'"\s*\n\s*"', '",\n"', json_str)

        # Add commas between array elements
        json_str = re.sub(r'}\s*\n\s*{', '},\n{', json_str)
        json_str = re.sub(r']\s*\n\s*\[', '],\n[', json_str)

        # Add commas after values before new fields
        value_patterns = (
            r'"[^"]*"',                             # String values
            r'-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?',    # Number values
            r'(?:true|false)',                      # Boolean values
            r'null',                                # Null values
            r'\[[^\]]*\]',                          # Array values
            r'\{[^}]*\}',                           # Object values (not nested)
        )
        for value in value_patterns:
            json_str = re.sub(r'(":\s*' + value + r')\s*\n\s*"', r'\1,\n"', json_str)

        return json_str

    @staticmethod
    def _fix_quotes_and_escapes(json_str: str) -> str:
        """Escape unescaped double quotes inside string values.

        Only strings that follow a colon are repaired; keys and array
        elements are copied verbatim. Inside a value, a quote is taken as
        the closing quote when it is followed by ``}``, ``]``, a comma
        introducing the next quoted key, or the end of the input; any other
        quote is treated as part of the text and escaped. When no closing
        quote can be identified the rest of the input is left untouched.
        Whitespace between the colon and the value is normalised to one
        space.
        """
        out = []
        i, n = 0, len(json_str)
        while i < n:
            ch = json_str[i]

            if ch == '"':
                # Key or array-element string: skip over it so a colon inside
                # it is not mistaken for a key/value separator.
                end = LLMJSONFixer._find_string_end(json_str, i)
                if end == -1:
                    out.append(json_str[i:])
                    break
                out.append(json_str[i:end + 1])
                i = end + 1
                continue

            opener = _VALUE_OPEN_RE.match(json_str, i) if ch == ':' else None
            if opener is None:
                out.append(ch)
                i += 1
                continue

            # Scan the value, escaping interior quotes until the real
            # closing quote is found.
            j = opener.end()
            body = []
            closed = False
            while j < n:
                c = json_str[j]
                if c == '\\' and j + 1 < n:
                    body.append(json_str[j:j + 2])
                    j += 2
                elif c == '"':
                    if _VALUE_CLOSE_RE.match(json_str, j + 1):
                        closed = True
                        break
                    body.append('\\"')
                    j += 1
                else:
                    body.append(c)
                    j += 1

            if not closed:
                out.append(json_str[i:])
                break
            out.append(': "' + ''.join(body) + '"')
            i = j + 1

        return ''.join(out)

    @staticmethod
    def _fix_newlines_in_strings(json_str: str) -> str:
        """Replace newlines inside strings with spaces.

        Every run of whitespace inside a double-quoted string is collapsed
        to a single space. If any string is unterminated the input is
        returned unchanged, because string boundaries cannot be trusted.
        """
        pieces = []
        pos = 0  # everything before this index has already been emitted
        while True:
            start = json_str.find('"', pos)
            if start == -1:
                pieces.append(json_str[pos:])
                break
            end = LLMJSONFixer._find_string_end(json_str, start)
            if end == -1:
                # Unterminated string, return original
                return json_str
            pieces.append(json_str[pos:start])
            pieces.append('"' + re.sub(r'\s+', ' ', json_str[start + 1:end]) + '"')
            pos = end + 1
        return ''.join(pieces)

    @staticmethod
    def _fix_unterminated_strings(json_str: str) -> str:
        """Fix unterminated strings.

        Works line by line. A line with an odd number of unescaped quotes is
        joined with the following lines (stripped, separated by one space)
        until the quote count becomes even. If that never happens, a closing
        quote is appended to the line instead.
        """
        lines = json_str.split('\n')
        fixed_lines = []
        i = 0
        while i < len(lines):
            line = lines[i]
            if len(_UNESCAPED_QUOTE_RE.findall(line)) % 2 == 0:
                fixed_lines.append(line)
                i += 1
                continue

            # Odd number of quotes - look ahead for the terminating quote.
            combined = line
            for j in range(i + 1, len(lines)):
                combined += ' ' + lines[j].strip()
                if len(_UNESCAPED_QUOTE_RE.findall(combined)) % 2 == 0:
                    fixed_lines.append(combined)
                    i = j + 1
                    break
            else:
                # No closing quote anywhere below (or this is the last line).
                fixed_lines.append(line + '"')
                i += 1

        return '\n'.join(fixed_lines)

    @staticmethod
    def _ensure_proper_closure(json_str: str) -> str:
        """Ensure all brackets and braces are properly closed.

        Tracks the open ``{`` / ``[`` outside strings on a stack and appends
        the missing closers innermost-first, so ``{"a": [1`` becomes
        ``{"a": [1]}``. Closers that do not match the innermost open bracket
        are ignored. Unterminated strings are not closed here.
        """
        closer_for = {'{': '}', '[': ']'}
        pending = []  # closers still owed, outermost first
        in_string = False
        escaped = False
        for ch in json_str:
            if in_string:
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                in_string = True
            elif ch in closer_for:
                pending.append(closer_for[ch])
            elif pending and ch == pending[-1]:
                pending.pop()
        return json_str + ''.join(reversed(pending))

    @staticmethod
    def _salvage_json(text: str) -> Optional[Dict[str, Any]]:
        """Last resort - try to salvage what we can from the text.

        Collects flat ``"key": value`` pairs with regexes. The first
        occurrence of a key wins. Quoted values always stay strings;
        unquoted numbers, booleans and null are converted to the matching
        Python type. Returns None when nothing was found.
        """
        def to_number(raw: str) -> Any:
            return float(raw) if '.' in raw else int(raw)

        # Each pattern carries its own converter, so a quoted "123" or "true"
        # is not mistaken for a number or a boolean.
        extractors = (
            (r'"([^"]+)"\s*:\s*"([^"]*)"', lambda raw: raw),                   # String values
            (r'"([^"]+)"\s*:\s*(-?\d+(?:\.\d+)?)', to_number),                 # Number values
            (r'"([^"]+)"\s*:\s*(\btrue\b|\bfalse\b)', lambda raw: raw == 'true'),  # Boolean values
            (r'"([^"]+)"\s*:\s*(\bnull\b)', lambda raw: None),                 # Null values
        )

        result: Dict[str, Any] = {}
        for pattern, convert in extractors:
            for key, raw in re.findall(pattern, text):
                if key not in result:  # Don't overwrite existing keys
                    result[key] = convert(raw)

        return result if result else None


# Convenience function for quick fixes
def fix_llm_json(content: str) -> Dict[str, Any]:
    """
    Convenience function to fix and parse LLM JSON

    Args:
        content: Potentially malformed JSON string from LLM

    Returns:
        Parsed dictionary

    Raises:
        ValueError: If JSON cannot be parsed after all fix attempts
    """
    result, error = LLMJSONFixer.parse_with_fallbacks(content)
    if result is None:
        raise ValueError(f"Could not parse JSON after all attempts:\n{error}")
    return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.