"""
Text processing utilities for ConceptNet MCP server.
This module provides text normalization, URI construction, and language
processing utilities specifically designed for ConceptNet concept handling. It implements
Unicode-safe text processing, robust URI parsing, and performance-optimized operations
following Python best practices.
Key Features:
- Unicode-safe text normalization using NFC/NFD forms
- ConceptNet URI construction and parsing with validation
- Language detection and validation against supported languages
- Text similarity calculations with multiple algorithms
- Performance-optimized operations with caching
- Security-focused text sanitization
- Comprehensive error handling and validation
- Thread-safe operations for concurrent usage
Security Considerations:
- All text inputs are properly validated and sanitized
- URI encoding prevents injection attacks
- Unicode normalization prevents homograph attacks
- Input length limits prevent DoS attacks
"""
import functools
import re
import unicodedata
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote
from .exceptions import (
InvalidConceptURIError,
InvalidLanguageError,
ValidationError,
TextValidationError,
URIValidationError,
ErrorCode
)
# Maximum lengths for security
MAX_CONCEPT_LENGTH = 200
MAX_URI_LENGTH = 500
MAX_TEXT_LENGTH = 1000
# Supported ConceptNet languages (ISO 639-1 codes)
SUPPORTED_LANGUAGES = {
'af', 'ar', 'be', 'bg', 'bn', 'ca', 'cs', 'cy', 'da', 'de',
'el', 'en', 'eo', 'es', 'et', 'eu', 'fa', 'fi', 'fr', 'ga',
'gd', 'gl', 'he', 'hi', 'hr', 'hu', 'hy', 'id', 'is', 'it',
'ja', 'ka', 'ko', 'la', 'lt', 'lv', 'mk', 'ms', 'mt', 'nl',
'no', 'pl', 'pt', 'ro', 'ru', 'sk', 'sl', 'sq', 'sv', 'sw',
'ta', 'te', 'th', 'tr', 'uk', 'ur', 'vi', 'zh'
}
# Common language mappings and aliases
LANGUAGE_ALIASES = {
'chinese': 'zh', 'english': 'en', 'spanish': 'es', 'french': 'fr',
'german': 'de', 'italian': 'it', 'portuguese': 'pt',
'russian': 'ru', 'japanese': 'ja', 'korean': 'ko',
'arabic': 'ar', 'hindi': 'hi'
}
# Extended language codes (including region variants)
EXTENDED_LANGUAGE_CODES = SUPPORTED_LANGUAGES | {
'zh-cn', 'zh-tw', 'en-us', 'en-gb', 'pt-br', 'es-es'
}
# Common ConceptNet relation patterns
RELATION_PATTERNS = {
'/r/RelatedTo': 'related to',
'/r/IsA': 'is a',
'/r/PartOf': 'part of',
'/r/HasA': 'has a',
'/r/UsedFor': 'used for',
'/r/CapableOf': 'capable of',
'/r/AtLocation': 'at location',
'/r/Causes': 'causes',
'/r/HasSubevent': 'has subevent',
'/r/HasFirstSubevent': 'has first subevent',
'/r/HasLastSubevent': 'has last subevent',
'/r/HasPrerequisite': 'has prerequisite',
'/r/HasProperty': 'has property',
'/r/MotivatedByGoal': 'motivated by goal',
'/r/ObstructedBy': 'obstructed by',
'/r/Desires': 'desires',
'/r/CreatedBy': 'created by',
'/r/Synonym': 'synonym',
'/r/Antonym': 'antonym',
'/r/DistinctFrom': 'distinct from',
'/r/DerivedFrom': 'derived from',
'/r/SymbolOf': 'symbol of',
'/r/DefinedAs': 'defined as',
'/r/MannerOf': 'manner of',
'/r/LocatedNear': 'located near',
'/r/HasContext': 'has context',
'/r/SimilarTo': 'similar to',
'/r/EtymologicallyRelatedTo': 'etymologically related to',
'/r/EtymologicallyDerivedFrom': 'etymologically derived from',
'/r/CausesDesire': 'causes desire',
'/r/MadeOf': 'made of',
'/r/ReceivesAction': 'receives action',
'/r/NotCapableOf': 'not capable of',
'/r/NotUsedFor': 'not used for',
'/r/NotHasProperty': 'not has property'
}
# Caches for expensive operations
@functools.lru_cache(maxsize=5000)
def _cached_normalize(text: str, form: str) -> str:
"""Cached Unicode normalization for performance."""
return unicodedata.normalize(form, text)
# Cached implementation backing normalize_concept_text
@functools.lru_cache(maxsize=10000)
def _cached_normalize_concept_text(text: str, language: str, max_length_val: int,
preserve_underscores: bool, strip_diacritics: bool, strict_validation: bool) -> str:
"""Cached concept text normalization for performance."""
# This is the actual implementation that will be called by normalize_concept_text
if not text:
return ""
# Use provided max_length or default
max_len = max_length_val
    # SECURITY: Validate input length BEFORE processing to prevent DoS (only in strict mode)
    if strict_validation and len(text) > max_len:
        raise ValidationError(
            field="text",
            value=f"text of length {len(text)}",
            expected=f"text with maximum length {max_len}"
        )
# In non-strict mode, truncate input if too long
if not strict_validation and len(text) > max_len:
text = text[:max_len]
    # Validate the language code (check without raising, then raise a precise error)
if not validate_language_code(language, raise_exception=False):
raise InvalidLanguageError(
language=language,
supported_languages=list(SUPPORTED_LANGUAGES)
)
# 1. Strip leading/trailing whitespace
normalized = text.strip()
# Handle whitespace-only input
if not normalized:
return ""
# 2. Normalize Unicode to NFC form for consistent representation
# Apply diacritics stripping if requested
normalized = normalize_unicode(normalized, 'NFC', strip_accents=strip_diacritics)
# 3. Convert to lowercase
normalized = normalized.lower()
# 4. Normalize whitespace: convert multiple whitespace to single spaces
normalized = re.sub(r'\s+', ' ', normalized)
    # 5. Replace spaces with underscores for URI compatibility
if not preserve_underscores:
normalized = normalized.replace(' ', '_')
else:
# Just replace spaces, keep existing underscores
normalized = re.sub(r' +', '_', normalized)
# 6. Remove or replace problematic characters for SECURITY
# Keep: letters, numbers, underscores, hyphens, and VERY LIMITED punctuation
# Remove: control characters, quotes, most punctuation, special symbols for security
normalized = re.sub(r'[^\w\-_]', '', normalized, flags=re.UNICODE)
# 7. Handle multiple consecutive underscores/hyphens
normalized = re.sub(r'[_-]+', '_', normalized)
# 8. Remove leading/trailing underscores and hyphens
normalized = normalized.strip('_-')
    # 9. SECURITY: Final length check after normalization (only in strict mode)
    if strict_validation and len(normalized) > max_len:
        raise ValidationError(
            field="normalized_text",
            value=f"normalized text of length {len(normalized)}",
            expected=f"normalized text with maximum length {max_len}"
        )
# In non-strict mode, truncate if still too long after normalization
if not strict_validation and len(normalized) > max_len:
normalized = normalized[:max_len].rstrip('_-')
# 10. Return empty string if nothing remains after normalization
if not normalized:
return ""
return normalized
def validate_text_length(text: str, max_length: int = MAX_TEXT_LENGTH, field_name: str = "text") -> None:
"""
Validate text length for security.
Args:
text: Text to validate
max_length: Maximum allowed length
field_name: Name of the field being validated
Raises:
ValidationError: If text exceeds maximum length
"""
if len(text) > max_length:
raise ValidationError(
field=field_name,
value=f"text of length {len(text)}",
expected=f"text with maximum length {max_length}"
)
# Cache the Unicode normalization with parameters for better performance
@functools.lru_cache(maxsize=5000)
def _cached_normalize_unicode(text: str, form: str, case_fold: bool, strip_accents: bool) -> str:
"""Cached Unicode normalization with all parameters."""
if not text:
return ""
# Use cached normalization for performance
normalized = _cached_normalize(text, form)
# Apply case folding if requested
if case_fold:
normalized = normalized.casefold()
# Strip accents if requested
if strip_accents:
# Use NFD normalization to decompose characters, then filter out combining marks
decomposed = unicodedata.normalize('NFD', normalized)
filtered = ''.join(char for char in decomposed if unicodedata.category(char) != 'Mn')
normalized = unicodedata.normalize('NFC', filtered)
return normalized
def normalize_unicode(text: str, form: str = 'NFC', case_fold: bool = False, strip_accents: bool = False) -> str:
"""
Normalize Unicode text using the specified normalization form.
This function provides robust Unicode normalization following Python best practices
for text processing. It handles various Unicode edge cases and ensures consistent
character representation.
Args:
text: Input text to normalize
form: Unicode normalization form ('NFC', 'NFD', 'NFKC', 'NFKD')
case_fold: Whether to apply case folding for case-insensitive comparison
strip_accents: Whether to remove accent marks from characters
Returns:
Normalized text
Raises:
ValidationError: If the normalization form is invalid
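    Example (with strip_accents, 'é' decomposes and the combining mark is removed):
        >>> normalize_unicode('café', strip_accents=True)
        'cafe'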
"""
if not text:
return ""
valid_forms = {'NFC', 'NFD', 'NFKC', 'NFKD'}
if form not in valid_forms:
raise ValidationError(
field="normalization_form",
value=form,
expected=f"one of {valid_forms}"
)
# Use cached normalization for performance
try:
return _cached_normalize_unicode(text, form, case_fold, strip_accents)
except Exception as e:
raise ValidationError(
field="text",
value=text[:50] + "..." if len(text) > 50 else text,
expected="valid Unicode text"
) from e
def normalize_concept_text(text: str, language: str = "en", max_length: Optional[int] = None,
preserve_underscores: bool = False, strip_diacritics: bool = False,
strict_validation: bool = False) -> str:
"""
Normalize text for ConceptNet concept representation.
This function applies ConceptNet-specific text normalization rules including
Unicode normalization, case conversion, whitespace handling, and character
sanitization following security best practices.
Args:
text: Input text to normalize
language: Language code for language-specific rules
max_length: Maximum allowed length (defaults to MAX_CONCEPT_LENGTH)
preserve_underscores: Whether to preserve existing underscores
strip_diacritics: Whether to remove accent marks
strict_validation: Whether to raise errors for length violations (False = truncate)
Returns:
Normalized text suitable for ConceptNet URIs
Raises:
AttributeError: If text is None
ValidationError: If text is invalid after normalization (strict_validation=True only)
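    Example (lowercased, whitespace collapsed to underscores, punctuation stripped):
        >>> normalize_concept_text("Hot  Dog!")
        'hot_dog'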
"""
    # Handle None explicitly so callers get the same AttributeError that str.strip() would raise
if text is None:
raise AttributeError("'NoneType' object has no attribute 'strip'")
# Convert non-string inputs to string
if not isinstance(text, str):
text = str(text).lower()
# Use cached implementation with hashable parameters
max_len = max_length if max_length is not None else MAX_CONCEPT_LENGTH
return _cached_normalize_concept_text(text, language, max_len, preserve_underscores, strip_diacritics, strict_validation)
# Add cache_info attribute to normalize_concept_text for monitoring
normalize_concept_text.cache_info = lambda: _cached_normalize_concept_text.cache_info()
def construct_concept_uri(term: str, language: str = "en", auto_normalize: bool = True, validate: bool = True) -> str:
"""
Construct a valid ConceptNet URI from a term and language.
This function creates properly formatted ConceptNet URIs with robust validation,
encoding, and error handling following ConceptNet API specifications.
Args:
term: Concept term to convert to URI
language: Language code for the concept
auto_normalize: Whether to automatically normalize the term
validate: Whether to validate the resulting URI
Returns:
Properly formatted ConceptNet URI
Raises:
InvalidLanguageError: If language code is not supported
ValidationError: If term is invalid
InvalidConceptURIError: If URI construction fails
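    Example (the term is normalized before the URI is assembled):
        >>> construct_concept_uri("hot dog", "en")
        '/c/en/hot_dog'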
"""
if not term:
raise ValidationError(
field="term",
value=term,
expected="non-empty string"
)
# Check for whitespace-only text
if not term.strip():
raise ValidationError(
field="term",
value=term,
expected="non-empty string"
)
# Validate and normalize language (only if validate=True)
if validate:
normalized_language = normalize_language_code(language)
else:
normalized_language = language.lower().strip()
# Normalize the term if auto_normalize is enabled
if auto_normalize:
if validate:
normalized_term = normalize_concept_text(term, normalized_language, strict_validation=True)
# Check if text becomes empty after normalization
if not normalized_term or not normalized_term.strip():
raise ValidationError(
field="term",
value=term,
expected="text that remains non-empty after normalization"
)
else:
# Simplified normalization when validation is disabled
normalized_term = term.strip().lower().replace(' ', '_')
else:
normalized_term = term.strip()
# Check for spaces when auto_normalize is False
if ' ' in normalized_term and validate:
raise ValidationError(
field="term",
value=term,
expected="term without spaces (use auto_normalize=True or replace spaces with underscores)"
)
# For ConceptNet URIs, preserve Unicode characters - minimal encoding
# Only encode characters that are truly problematic for URIs (like spaces)
# Don't encode Unicode letters/characters - they should be preserved as-is
encoded_term = normalized_term
# Construct the URI
uri = f"/c/{normalized_language}/{encoded_term}"
# Validate the constructed URI if validation is enabled
if validate:
validate_concept_uri(uri)
return uri
def parse_concept_uri(uri: str, validate: bool = True, normalize_term: bool = True) -> Dict[str, str]:
"""
Parse a ConceptNet URI into its components with comprehensive validation.
This function safely parses ConceptNet URIs and extracts their components
with proper error handling and validation.
Args:
uri: ConceptNet URI to parse
validate: Whether to validate language codes and URI format
normalize_term: Whether to normalize the extracted term
Returns:
Dictionary containing URI components: language, term, original_uri, normalized_term
Raises:
InvalidConceptURIError: If URI format is invalid
ValidationError: If URI is malformed
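    Example:
        >>> parsed = parse_concept_uri('/c/en/hot_dog')
        >>> parsed['language'], parsed['term']
        ('en', 'hot_dog')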
"""
if not uri:
if validate:
raise InvalidConceptURIError(
uri=uri,
validation_errors=["empty URI not allowed"]
)
else:
return {'language': '', 'term': '', 'original_uri': uri, 'normalized_term': ''}
# Validate URI length (only if validating)
if validate:
validate_text_length(uri, MAX_URI_LENGTH, "concept_uri")
# Basic format validation (only if validating)
if validate and not uri.startswith('/c/'):
raise InvalidConceptURIError(
uri=uri,
validation_errors=["URI must start with '/c/'"]
)
# Split URI into parts
parts = uri.split('/')
if validate and len(parts) < 4:
raise InvalidConceptURIError(
uri=uri,
validation_errors=["URI must have at least 4 parts: ['', 'c', 'language', 'term']"]
)
# Additional validation for invalid URI formats when validating
if validate:
# Check for empty language or empty term
if len(parts) >= 3 and not parts[2]: # Empty language
raise InvalidConceptURIError(
uri=uri,
validation_errors=["Language code cannot be empty"]
)
if len(parts) >= 4 and not parts[3]: # Empty term
raise InvalidConceptURIError(
uri=uri,
validation_errors=["Term cannot be empty"]
)
        # Check for a wrong prefix: the concept namespace must be 'c'
if len(parts) >= 2 and parts[1] != 'c':
raise InvalidConceptURIError(
uri=uri,
validation_errors=[f"Invalid prefix '{parts[1]}', expected 'c'"]
)
# Extract components
try:
language = parts[2] if len(parts) > 2 else ''
# Handle terms with slashes (compound terms)
encoded_term = '/'.join(parts[3:]) if len(parts) > 3 else ''
# URL decode the term
try:
term = unquote(encoded_term) if encoded_term else ''
except Exception as e:
if validate:
raise InvalidConceptURIError(
uri=uri,
validation_errors=[f"Failed to decode term: {e}"]
) from e
else:
term = encoded_term
# Validate extracted language if validation is enabled
if validate and language and not validate_language_code(language, raise_exception=False):
raise InvalidConceptURIError(
uri=uri,
reason=f"Invalid language code: {language}"
)
        # Generate normalized term
        if normalize_term and term:
            try:
                if validate and language:
                    normalized_term = normalize_concept_text(term, language, strict_validation=True)
                else:
                    normalized_term = term.lower().replace(' ', '_')
            except Exception:
                normalized_term = term
else:
normalized_term = term
return {
'language': language,
'term': term,
'original_uri': uri,
'normalized_term': normalized_term
}
except Exception as e:
if isinstance(e, (InvalidConceptURIError, ValidationError)):
raise
if validate:
raise InvalidConceptURIError(
uri=uri,
reason=f"Parse error: {e}"
) from e
else:
# Return best-effort parsing when validation is disabled
return {
'language': parts[2] if len(parts) > 2 else '',
'term': '/'.join(parts[3:]) if len(parts) > 3 else '',
'original_uri': uri,
'normalized_term': '/'.join(parts[3:]) if len(parts) > 3 else ''
}
def validate_concept_uri(uri: str) -> bool:
"""
Validate a ConceptNet URI format and structure.
Args:
uri: URI to validate
Returns:
        True if the URI is valid, False otherwise
"""
try:
parse_concept_uri(uri, validate=True)
return True
    except (InvalidConceptURIError, ValidationError):
        return False
def normalize_language_code(language: str) -> str:
"""
Normalize and validate a language code.
Args:
language: Language code to normalize
Returns:
Normalized language code
Raises:
InvalidLanguageError: If language is not supported
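    Example (full language names are resolved through LANGUAGE_ALIASES):
        >>> normalize_language_code('English')
        'en'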
"""
if not language:
raise InvalidLanguageError(
language=language,
supported_languages=list(SUPPORTED_LANGUAGES)
)
# Convert to lowercase and strip whitespace
normalized = language.lower().strip()
# Check aliases first
if normalized in LANGUAGE_ALIASES:
normalized = LANGUAGE_ALIASES[normalized]
# Check if it's in extended language codes (includes region variants)
if normalized in EXTENDED_LANGUAGE_CODES:
return normalized
# Check base language codes
if normalized in SUPPORTED_LANGUAGES:
return normalized
# Find similar languages for suggestions
similar = find_similar_languages(normalized)
raise InvalidLanguageError(
language=language,
supported_languages=list(EXTENDED_LANGUAGE_CODES),
suggested_languages=similar
)
def validate_language_code(language: str, raise_exception: bool = True) -> bool:
"""
Validate a language code.
Args:
language: Language code to validate
raise_exception: Whether to raise exception for invalid codes (default: True)
Returns:
True if language code is valid for ConceptNet
Raises:
InvalidLanguageError: If language code is invalid and raise_exception=True
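    Example:
        >>> validate_language_code('en')
        True
        >>> validate_language_code('xx', raise_exception=False)
        False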
"""
# Check for obviously invalid cases first
if not language or not isinstance(language, str):
if raise_exception:
raise InvalidLanguageError(
language=language or "",
supported_languages=list(EXTENDED_LANGUAGE_CODES)
)
return False
normalized = language.lower().strip()
# Check for invalid formats
if len(normalized) < 2 or len(normalized) > 6:
if raise_exception:
raise InvalidLanguageError(
language=language,
supported_languages=list(EXTENDED_LANGUAGE_CODES)
)
return False
    # Reject codes that are not already lowercase; ConceptNet language codes must be lowercase
if language != normalized:
if raise_exception:
raise InvalidLanguageError(
language=language,
supported_languages=list(EXTENDED_LANGUAGE_CODES)
)
return False
try:
normalize_language_code(language)
return True
except InvalidLanguageError:
if raise_exception:
raise
return False
def find_similar_languages(language: str, max_suggestions: int = 3) -> List[str]:
"""
Find similar language codes for suggestions.
Args:
language: Invalid language code
max_suggestions: Maximum number of suggestions to return
Returns:
List of similar language codes
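    Example (matches both the 'en' code and the 'english' alias):
        >>> find_similar_languages('eng')
        ['en']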
"""
if not language:
return []
language = language.lower().strip()
suggestions = []
# Check for partial matches
for supported in SUPPORTED_LANGUAGES:
if language in supported or supported in language:
suggestions.append(supported)
# Check aliases
for alias, canonical in LANGUAGE_ALIASES.items():
if language in alias or alias in language:
suggestions.append(canonical)
# Remove duplicates and limit
suggestions = list(dict.fromkeys(suggestions)) # Preserve order while removing duplicates
return suggestions[:max_suggestions]
def clean_text_for_uri(text: str) -> str:
"""
Clean text specifically for URI generation with security considerations.
Args:
text: Text to clean
Returns:
Cleaned text suitable for URI components
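    Example (note that case is preserved, unlike normalize_concept_text):
        >>> clean_text_for_uri('Hello World!')
        'Hello_World'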
"""
if not text:
return ""
# Validate length
validate_text_length(text, MAX_CONCEPT_LENGTH, "text_for_uri")
# Normalize Unicode
cleaned = normalize_unicode(text, 'NFC')
# Remove potentially dangerous characters
# Keep only alphanumeric, spaces, underscores, hyphens
cleaned = re.sub(r'[^\w\s\-_]', '', cleaned)
# Normalize whitespace
cleaned = re.sub(r'\s+', '_', cleaned)
# Remove multiple underscores/hyphens
cleaned = re.sub(r'[_-]+', '_', cleaned)
# Strip leading/trailing separators
cleaned = cleaned.strip('_-')
return cleaned
def validate_concept_text(text: str, min_length: int = 1, max_length: int = 200, allowed_chars: Optional[str] = None) -> bool:
"""
Validate concept text format and content.
Args:
text: Text to validate
min_length: Minimum allowed length
max_length: Maximum allowed length
allowed_chars: Additional allowed characters beyond defaults
Returns:
True if text is valid
Raises:
TextValidationError: If text is invalid
"""
if not isinstance(text, str):
raise TextValidationError("Invalid text type", text, "string type")
if len(text) < min_length:
raise TextValidationError("Text too short", text, f"minimum {min_length} characters")
if len(text) > max_length:
raise TextValidationError("Text too long", text, f"maximum {max_length} characters")
# If allowed_chars is None (default), use permissive validation
# If allowed_chars is explicitly set (even to ""), use strict validation
if allowed_chars is None:
# Default behavior - allow underscores and basic alphanumeric + some chars
# Check for obviously invalid characters
invalid_chars = ['/', '?', '&', '=', '%', '#', '\t', '\n', '\r']
for char in invalid_chars:
if char in text:
raise TextValidationError("Invalid character found", text, f"text without '{char}'")
# Check for spaces (not allowed by default)
if ' ' in text:
raise TextValidationError("Spaces not allowed", text, "text without spaces")
else:
# Explicit allowed_chars - strict validation
# Check for spaces and invalid characters unless explicitly allowed
if ' ' in text and ' ' not in allowed_chars:
raise TextValidationError("Spaces not allowed", text, "text without spaces")
# Check for other invalid characters
invalid_chars = ['/', '?', '&', '=', '%', '#', '\t', '\n', '\r']
for char in invalid_chars:
if char in text and char not in allowed_chars:
raise TextValidationError("Invalid character found", text, f"text without '{char}'")
# Check for underscore when explicitly restricted
if '_' in text and '_' not in allowed_chars:
raise TextValidationError("Invalid character found", text, "text without '_'")
# Check for valid concept format
try:
normalized = normalize_concept_text(text)
if not normalized:
raise TextValidationError("Empty after normalization", text, "valid concept text")
return True
except Exception as e:
raise TextValidationError("Normalization failed", text, "valid concept text") from e
def get_text_language_hints(text: str) -> List[str]:
"""
Detect potential language of text with confidence scores.
Args:
text: Input text for language detection
Returns:
List of language codes ordered by confidence (most likely first)
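    Example (Cyrillic script maps to Russian):
        >>> get_text_language_hints('Привет мир')
        ['ru']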
"""
if not text:
return ["en"] # Default fallback
# Normalize text for analysis
normalized = normalize_unicode(text.lower(), 'NFC')
total_chars = len(normalized)
if total_chars == 0:
return ["en"]
hints = {}
# Count characters from different scripts
latin_chars = sum(1 for c in normalized if 'a' <= c <= 'z')
cyrillic_chars = sum(1 for c in normalized if '\u0400' <= c <= '\u04ff')
arabic_chars = sum(1 for c in normalized if '\u0600' <= c <= '\u06ff')
chinese_chars = sum(1 for c in normalized if '\u4e00' <= c <= '\u9fff')
japanese_hiragana = sum(1 for c in normalized if '\u3040' <= c <= '\u309f')
japanese_katakana = sum(1 for c in normalized if '\u30a0' <= c <= '\u30ff')
# Calculate confidence scores based on character distribution
if cyrillic_chars > 0:
hints["ru"] = cyrillic_chars / total_chars
if arabic_chars > 0:
hints["ar"] = arabic_chars / total_chars
if chinese_chars > 0:
hints["zh"] = chinese_chars / total_chars
if japanese_hiragana > 0 or japanese_katakana > 0:
hints["ja"] = (japanese_hiragana + japanese_katakana) / total_chars
if latin_chars > 0:
# For Latin scripts, check for common patterns and words
base_confidence = latin_chars / total_chars
# Simple heuristics for different Latin-script languages
if any(word in normalized for word in ["hola", "mundo", "español", "gracias"]):
hints["es"] = base_confidence * 0.9
elif "ñ" in text or "¿" in text or "¡" in text:
hints["es"] = base_confidence * 0.8
elif any(word in normalized for word in ["bonjour", "merci", "français", "monde"]):
hints["fr"] = base_confidence * 0.9
elif "ç" in text or "è" in text or "à" in text or "ù" in text:
hints["fr"] = base_confidence * 0.7
elif any(word in normalized for word in ["hallo", "deutsch", "danke", "welt", "guten", "tag"]):
hints["de"] = base_confidence * 0.9
elif "ä" in text or "ö" in text or "ü" in text or "ß" in text:
hints["de"] = base_confidence * 0.7
elif any(word in normalized for word in ["olá", "obrigado", "português", "mundo"]):
hints["pt"] = base_confidence * 0.9
elif "ç" in text or "ã" in text or "õ" in text:
hints["pt"] = base_confidence * 0.6
else:
# Default to English for Latin characters
hints["en"] = base_confidence * 0.6
# Ensure we have at least some confidence score
if not hints:
hints["en"] = 1.0
# Normalize scores to sum to 1.0
total_confidence = sum(hints.values())
if total_confidence > 0:
hints = {lang: score / total_confidence for lang, score in hints.items()}
# Sort by confidence and return list of language codes
sorted_hints = sorted(hints.items(), key=lambda x: x[1], reverse=True)
return [lang for lang, score in sorted_hints]
def truncate_text_safely(text: str, max_length: int, preserve_words: bool = True,
truncate_indicator: str = "") -> str:
"""
Safely truncate text without breaking words.
Args:
text: Text to truncate
max_length: Maximum allowed length
preserve_words: Whether to avoid breaking words
truncate_indicator: String to append when text is truncated
Returns:
Truncated text
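    Example (the cut falls inside 'wonderful', so the partial word is dropped):
        >>> truncate_text_safely('hello wonderful world', 10)
        'hello'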
"""
if not text or max_length <= 0:
return ""
if len(text) <= max_length:
return text
# Account for truncation indicator length
effective_length = max_length - len(truncate_indicator)
if effective_length <= 0:
return truncate_indicator[:max_length]
if not preserve_words:
return text[:effective_length] + truncate_indicator
# Find the last word boundary within the limit
truncated = text[:effective_length]
# Look for the last space to avoid breaking words
last_space = truncated.rfind(' ')
if last_space > 0:
truncated = truncated[:last_space]
return truncated + truncate_indicator
def sanitize_text_for_uri(text: str, max_length: int = MAX_CONCEPT_LENGTH) -> str:
"""
Clean text for safe URI generation.
Args:
text: Text to sanitize
max_length: Maximum allowed length
Returns:
Sanitized text suitable for URI components
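    Example (lowercased; punctuation and spaces collapse to single underscores):
        >>> sanitize_text_for_uri('Hello, World!')
        'hello_world'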
"""
if not text:
return ""
# Validate length - but handle gracefully instead of throwing exception
if len(text) > max_length:
# Truncate the text rather than failing
text = text[:max_length]
# Normalize Unicode
cleaned = normalize_unicode(text, 'NFC')
# Convert to lowercase for consistency
cleaned = cleaned.lower()
# Remove potentially dangerous characters and control characters
cleaned = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', cleaned)
# Remove potentially dangerous URI patterns for SECURITY
cleaned = re.sub(r'javascript:', '', cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r'data:', '', cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r'vbscript:', '', cleaned, flags=re.IGNORECASE)
# Replace problematic characters with underscores (NOT remove them)
    # Convert spaces, slashes, question marks, @, periods, etc. to underscores
cleaned = re.sub(r'[/\\?@.,;:!+=()\[\]{}\'\"<>&%|]', '_', cleaned)
# Remove completely problematic characters that should not be converted to underscores
cleaned = re.sub(r'[#$^*]', '', cleaned)
# Replace whitespace with underscores
cleaned = re.sub(r'\s+', '_', cleaned)
# Replace multiple separators/underscores with single underscore
cleaned = re.sub(r'[_\-]+', '_', cleaned)
# Strip leading/trailing separators
cleaned = cleaned.strip('_-')
# Handle case where everything gets stripped out (like "!@#$%^&*()")
if not cleaned:
return ""
return cleaned
def clean_whitespace(text: str, normalize_newlines: bool = True) -> str:
"""
Normalize whitespace in text.
Args:
text: Text to clean
normalize_newlines: Whether to normalize different newline types
Returns:
Text with normalized whitespace
"""
if not text:
return ""
cleaned = text
# Normalize different types of whitespace characters
if normalize_newlines:
# Normalize different newline types to \n
cleaned = re.sub(r'\r\n|\r', '\n', cleaned)
# Replace tabs and other whitespace with spaces
cleaned = re.sub(r'[\t\v\f]+', ' ', cleaned)
# Replace multiple spaces with single spaces
cleaned = re.sub(r'[ ]+', ' ', cleaned)
# Replace multiple newlines with single newlines
if normalize_newlines:
cleaned = re.sub(r'\n+', '\n', cleaned)
# Convert newlines to spaces for single-line output
cleaned = re.sub(r'\n', ' ', cleaned)
# Handle Unicode whitespace characters
cleaned = re.sub(r'[\u00A0\u1680\u2000-\u200B\u202F\u205F\u3000]+', ' ', cleaned)
# Final cleanup: multiple spaces to single space
cleaned = re.sub(r' +', ' ', cleaned)
# Strip leading and trailing whitespace
cleaned = cleaned.strip()
return cleaned
def is_valid_concept_format(text: str) -> bool:
"""
Check if text is valid concept format.
Args:
text: Text to validate
Returns:
True if text is valid concept format
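    Example (spaces are rejected; underscores are the expected separator):
        >>> is_valid_concept_format('hot_dog')
        True
        >>> is_valid_concept_format('hot dog')
        False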
"""
if not text or not isinstance(text, str):
return False
# Check length
if len(text) > MAX_CONCEPT_LENGTH:
return False
# Check for spaces and other invalid characters BEFORE normalization
if ' ' in text:
return False
if '\t' in text or '\n' in text or '\r' in text:
return False
# Check for other problematic characters
if any(char in text for char in ['/', '?', '&', '=', '%', '#']):
return False
    # Must contain at least one word character; with re.UNICODE this accepts
    # international text such as "東京" or "Москва"
if not re.search(r'[\w]', text, re.UNICODE):
return False
    # Should contain only word characters (including Unicode letters), underscores, and hyphens
if not re.match(r'^[\w\-_]+$', text, re.UNICODE):
return False
try:
# Try to normalize the concept text to make sure it's processable
normalized = normalize_concept_text(text, strict_validation=False)
return bool(normalized)
except Exception:
return False
def normalize_text_for_display(text: str) -> str:
"""
Normalize text for human-readable display.
This function converts ConceptNet-style text (with underscores) into readable
format by replacing underscores with spaces and applying proper formatting.
Args:
text: Input text to normalize
Returns:
Human-readable text with spaces instead of underscores
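    Example (short lowercase phrases are title-cased):
        >>> normalize_text_for_display('hot_dog')
        'Hot Dog'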
"""
if not text:
return ""
# Convert underscores to spaces
normalized = text.replace('_', ' ')
# Normalize multiple spaces
normalized = re.sub(r'\s+', ' ', normalized)
# Clean up whitespace
normalized = normalized.strip()
# Optionally apply title case for better readability
# (only if the text is all lowercase)
if normalized.islower() and len(normalized.split()) <= 5:
# Apply title case for short phrases
normalized = normalized.title()
return normalized
def normalize_uri_to_text(uri: str) -> str:
"""
Convert a ConceptNet URI to readable text with error handling.
Extracts the term from URIs like '/c/en/hot_dog' and normalizes
it to readable format like 'Hot Dog'.
Args:
uri: ConceptNet URI to convert
Returns:
Human-readable text extracted from the URI
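    Example:
        >>> normalize_uri_to_text('/c/en/hot_dog')
        'Hot Dog'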
"""
if not uri:
return ""
try:
# Parse the URI to extract the term
parsed = parse_concept_uri(uri)
term = parsed['term']
# Normalize for display
return normalize_text_for_display(term)
except (InvalidConceptURIError, ValidationError):
# Fallback: try to extract term manually
try:
parts = uri.split('/')
if len(parts) >= 4:
term = '/'.join(parts[3:]) # Everything after /c/lang/
term = unquote(term) # URL decode
return normalize_text_for_display(term)
except Exception:
pass
# Final fallback: return the original URI
return uri
def normalize_relation_text(relation_uri: str) -> str:
"""
Convert a relation URI to readable text with comprehensive mapping.
Converts URIs like '/r/IsA' to readable format like 'is a' using a
comprehensive mapping of ConceptNet relations.
Args:
relation_uri: ConceptNet relation URI
Returns:
Human-readable relation text
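    Example (known relations use RELATION_PATTERNS; unknown ones fall back to camelCase splitting):
        >>> normalize_relation_text('/r/IsA')
        'is a'
        >>> normalize_relation_text('/r/FormOf')
        'form of'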
"""
if not relation_uri:
return ""
# Check if it's in our predefined patterns
if relation_uri in RELATION_PATTERNS:
return RELATION_PATTERNS[relation_uri]
# Extract relation name from URI
if relation_uri.startswith('/r/'):
relation_name = relation_uri[3:] # Remove '/r/' prefix
else:
relation_name = relation_uri
# Convert camelCase to readable format
# IsA -> Is A, RelatedTo -> Related To, etc.
readable = re.sub(r'([a-z])([A-Z])', r'\1 \2', relation_name)
# Convert to lowercase
readable = readable.lower()
return readable
def extract_language_from_uri(uri: str) -> Optional[str]:
"""
Extract language code from a ConceptNet URI with error handling.
Args:
uri: ConceptNet URI (e.g., '/c/en/dog')
Returns:
Language code if found and valid, None otherwise
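    Example:
        >>> extract_language_from_uri('/c/en/dog')
        'en'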
"""
if not uri:
return None
try:
parsed = parse_concept_uri(uri)
return parsed['language']
except (InvalidConceptURIError, ValidationError):
# Fallback: extract manually with validation
try:
parts = uri.split('/')
if len(parts) >= 3 and parts[1] == 'c':
language = parts[2]
return language if validate_language_code(language, raise_exception=False) else None
except Exception:
pass
return None
def split_compound_terms(text: str, language: str = "en") -> List[str]:
"""
Split compound terms into constituent parts with language awareness.
Args:
text: Compound term to split
language: Language for language-specific splitting rules
Returns:
List of constituent terms
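    Example (splits on hyphens, underscores, whitespace, and slashes):
        >>> split_compound_terms('ice-cream_cone')
        ['ice', 'cream', 'cone']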
"""
if not text:
return []
# Validate language
if not validate_language_code(language, raise_exception=False):
language = "en" # Fallback to English
# Basic splitting on common separators
parts = re.split(r'[-_\s/]+', text)
# Filter out empty parts and normalize
result = []
for part in parts:
part = part.strip()
if part:
# Apply basic normalization
normalized = normalize_concept_text(part, language, strict_validation=False)
if normalized:
result.append(normalized)
return result
@functools.lru_cache(maxsize=2000)
def calculate_text_similarity(text1: str, text2: str, method: str = "sequence") -> float:
"""
Calculate similarity between two text strings using various algorithms.
This function provides multiple similarity calculation methods with caching
for performance optimization.
Args:
text1: First text string
text2: Second text string
method: Similarity method ('sequence', 'jaccard', 'character')
Returns:
Similarity score between 0.0 and 1.0
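    Example (identical texts short-circuit to 1.0; otherwise SequenceMatcher is used):
        >>> calculate_text_similarity('dog', 'dog')
        1.0
        >>> round(calculate_text_similarity('cat', 'cats'), 2)
        0.86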
"""
if not text1 or not text2:
return 0.0
# Normalize texts for comparison
norm1 = normalize_unicode(text1.lower(), 'NFC')
norm2 = normalize_unicode(text2.lower(), 'NFC')
if norm1 == norm2:
return 1.0
if method == "sequence":
# Use SequenceMatcher for sequence-based similarity
return SequenceMatcher(None, norm1, norm2).ratio()
elif method == "jaccard":
# Jaccard similarity using character sets
set1 = set(norm1)
set2 = set(norm2)
intersection = len(set1.intersection(set2))
union = len(set1.union(set2))
return intersection / union if union > 0 else 0.0
elif method == "character":
# Character overlap similarity
set1 = set(norm1)
set2 = set(norm2)
intersection = len(set1.intersection(set2))
return intersection / max(len(set1), len(set2)) if max(len(set1), len(set2)) > 0 else 0.0
else:
# Default to sequence method
return SequenceMatcher(None, norm1, norm2).ratio()
def fuzzy_match_concepts(
query: str,
candidates: List[str],
threshold: float = 0.6,
max_results: int = 10
) -> List[Tuple[str, float]]:
"""
Find fuzzy matches for a concept query against candidate concepts.
Args:
query: Query concept to match
candidates: List of candidate concepts
threshold: Minimum similarity threshold
max_results: Maximum number of results to return
Returns:
List of (concept, similarity_score) tuples sorted by similarity
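    Example ('cat' falls below the 0.6 default threshold and is excluded):
        >>> [c for c, _ in fuzzy_match_concepts('dog', ['dog', 'dogs', 'cat'])]
        ['dog', 'dogs']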
"""
if not query or not candidates:
return []
matches = []
normalized_query = normalize_concept_text(query, strict_validation=False)
for candidate in candidates:
if not candidate:
continue
try:
normalized_candidate = normalize_concept_text(candidate, strict_validation=False)
similarity = calculate_text_similarity(normalized_query, normalized_candidate)
if similarity >= threshold:
matches.append((candidate, similarity))
except Exception:
# Skip candidates that cause errors
continue
# Sort by similarity (descending) and limit results
matches.sort(key=lambda x: x[1], reverse=True)
return matches[:max_results]
def sanitize_search_query(query: str, max_length: int = 200) -> str:
"""
Sanitize a search query for safe processing with security considerations.
Args:
query: Raw search query text
max_length: Maximum allowed query length
Returns:
Sanitized query suitable for ConceptNet search
Raises:
ValidationError: If query is invalid or too long
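    Example (whitespace is collapsed and trimmed):
        >>> sanitize_search_query('  hello   world  ')
        'hello world'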
"""
if not query:
return ""
# Validate length
validate_text_length(query.strip(), max_length, "search_query")
# Normalize Unicode
sanitized = normalize_unicode(query, 'NFC')
# Remove control characters and potentially dangerous content
sanitized = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', sanitized)
# Normalize whitespace
sanitized = re.sub(r'\s+', ' ', sanitized)
# Strip and ensure we have content
sanitized = sanitized.strip()
return sanitized
def is_valid_concept_text(text: str) -> bool:
"""
Check if text is valid for use as a concept without raising exceptions.
Args:
text: Text to validate
Returns:
True if text is valid for concept usage
"""
if not text:
return False
try:
# Try to normalize the text
normalized = normalize_concept_text(text, strict_validation=False)
return bool(normalized)
except Exception:
return False
def estimate_text_language(text: str) -> str:
"""
Attempt to estimate the language of input text using simple heuristics.
This is a basic implementation that uses character patterns and common words
to make language estimates. For production use, consider integrating with
proper language detection libraries.
Args:
text: Input text for language detection
Returns:
Estimated language code (defaults to "en" if uncertain)
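    Example (a majority of Cyrillic characters yields Russian):
        >>> estimate_text_language('Привет')
        'ru'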
"""
if not text:
return "en"
# Normalize text for analysis
normalized = normalize_unicode(text.lower(), 'NFC')
# Simple character-based detection
# Count characters from different scripts
latin_chars = sum(1 for c in normalized if 'a' <= c <= 'z')
cyrillic_chars = sum(1 for c in normalized if '\u0400' <= c <= '\u04ff')
arabic_chars = sum(1 for c in normalized if '\u0600' <= c <= '\u06ff')
chinese_chars = sum(1 for c in normalized if '\u4e00' <= c <= '\u9fff')
japanese_chars = sum(1 for c in normalized if '\u3040' <= c <= '\u309f' or '\u30a0' <= c <= '\u30ff')
total_chars = len(normalized)
if total_chars == 0:
return "en"
# Determine language based on character distribution
if cyrillic_chars / total_chars > 0.5:
return "ru"
elif arabic_chars / total_chars > 0.5:
return "ar"
elif chinese_chars / total_chars > 0.3:
return "zh"
elif japanese_chars / total_chars > 0.3:
return "ja"
elif latin_chars / total_chars > 0.7:
# For Latin scripts, we default to English
# In a production system, you'd use more sophisticated detection
return "en"
# Default fallback
return "en"
# Performance and caching utilities
def clear_text_caches():
"""Clear all text processing caches to free memory."""
_cached_normalize.cache_clear()
_cached_normalize_concept_text.cache_clear()
_cached_normalize_unicode.cache_clear()
calculate_text_similarity.cache_clear()
def get_cache_info() -> Dict[str, Any]:
"""Get information about text processing cache usage."""
return {
'normalize_cache': _cached_normalize.cache_info()._asdict(),
'normalize_concept_cache': _cached_normalize_concept_text.cache_info()._asdict(),
'normalize_unicode_cache': _cached_normalize_unicode.cache_info()._asdict(),
'similarity_cache': calculate_text_similarity.cache_info()._asdict()
}