"""Advanced OCR (Optical Character Recognition) engine for visual automation.
This module implements sophisticated text extraction capabilities using multiple OCR
backends and techniques. Provides high-accuracy text recognition with language support,
confidence scoring, and comprehensive result metadata.
Security: Sensitive content detection and privacy-aware text processing.
Performance: Optimized text extraction with caching and batch processing.
Accuracy: Multi-backend approach with result validation and confidence scoring.
"""
from __future__ import annotations
import asyncio
import hashlib
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from src.core.contracts import ensure, require
from src.core.either import Either
from src.core.logging import get_logger
from src.core.visual import (
ImageData,
OCRResult,
OCRText,
PrivacyError,
ProcessingError,
ScreenRegion,
VisualError,
normalize_confidence,
validate_image_data,
)
logger = get_logger(__name__)
@dataclass(frozen=True)
class OCRLanguageConfig:
"""OCR language configuration with validation."""
language_code: str
language_name: str
supported_scripts: list[str]
confidence_adjustment: float = 0.0 # Adjustment for language-specific confidence
preprocessing_options: dict[str, Any] = None
def __post_init__(self):
if self.preprocessing_options is None:
object.__setattr__(self, "preprocessing_options", {})
if not re.match(r"^[a-z]{2,3}(-[A-Z]{2})?$", self.language_code):
raise ValueError(f"Invalid language code format: {self.language_code}")
if not (-0.2 <= self.confidence_adjustment <= 0.2):
raise ValueError("Confidence adjustment must be between -0.2 and 0.2")
@dataclass(frozen=True)
class OCRProcessingOptions:
"""Advanced OCR processing configuration."""
dpi: int = 300 # Target DPI for processing
contrast_enhancement: bool = True
noise_reduction: bool = True
skew_correction: bool = True
language_detection: bool = True
preserve_formatting: bool = True
extract_tables: bool = False
extract_line_boxes: bool = True
extract_word_boxes: bool = True
extract_character_boxes: bool = False
confidence_threshold: float = 0.5
def __post_init__(self):
"""Validate OCR processing options."""
if not (72 <= self.dpi <= 600):
raise ValueError("DPI must be between 72 and 600")
if not (0.0 <= self.confidence_threshold <= 1.0):
raise ValueError("Confidence threshold must be between 0.0 and 1.0")
class OCRPrivacyFilter:
"""Privacy-aware OCR result filtering with sensitive content detection."""
# Comprehensive sensitive content patterns
SENSITIVE_PATTERNS = {
"credit_card": [
r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", # Credit card numbers
r"\b(?:4\d{3}|5[1-5]\d{2}|6011)[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", # Specific card types
],
"ssn": [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN format
r"\b\d{3}\s\d{2}\s\d{4}\b", # SSN with spaces
],
"phone": [
r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", # Phone numbers
r"\(\d{3}\)\s?\d{3}[-.]?\d{4}\b", # Phone with parentheses
],
"email": [
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # Email addresses
],
"sensitive_terms": [
r"\b(?:password|pwd|pass|pin|ssn|social|security)\b", # Sensitive field labels
r"\b(?:private|confidential|secret|classified)\b", # Privacy indicators
],
"financial": [
r"\$\s?\d{1,3}(?:,\d{3})*(?:\.\d{2})?", # Currency amounts
r"\b(?:account|acct)[\s#:]*\d+\b", # Account numbers
r"\b(?:routing|rt)[\s#:]*\d{9}\b", # Routing numbers
],
"medical": [
r"\b(?:patient|medical|health)[\s#:]*(?:id|number|record)\b", # Medical IDs
r"\b\d{10,}\b", # Long number sequences (potential medical IDs)
],
"government": [
r"\b(?:license|dl)[\s#:]*[A-Z0-9]{8,}\b", # Driver's license
r"\b(?:passport|pp)[\s#:]*[A-Z0-9]{6,}\b", # Passport numbers
],
}
REDACTION_LABEL = "[REDACTED]"
@classmethod
def filter_sensitive_content(
cls,
text: str,
privacy_mode: bool = True,
custom_patterns: dict[str, list[str]] | None = None,
) -> tuple[str, set[str]]:
"""Filter sensitive content from OCR text with detailed detection logging.
Returns:
Tuple of (filtered_text, detected_categories)
"""
if not privacy_mode:
return text, set()
filtered_text = text
detected_categories = set()
# Combine default and custom patterns
all_patterns = cls.SENSITIVE_PATTERNS.copy()
if custom_patterns:
all_patterns.update(custom_patterns)
for category, patterns in all_patterns.items():
for pattern in patterns:
matches = re.finditer(
pattern,
filtered_text,
re.IGNORECASE | re.MULTILINE,
)
for _match in matches:
detected_categories.add(category)
# Replace with redaction label
filtered_text = re.sub(
pattern,
cls.REDACTION_LABEL,
filtered_text,
flags=re.IGNORECASE,
)
return filtered_text, detected_categories
@classmethod
def validate_content_safety(cls, text: str) -> Either[PrivacyError, str]:
"""Validate that text content is safe for processing."""
# Check for suspicious content indicators
suspicious_indicators = [
r"(?i)(?:login|signin|password|authentication)",
r"(?i)(?:banking|financial|payment|billing)",
r"(?i)(?:medical|health|patient|doctor)",
r"(?i)(?:government|federal|state|official)",
]
risk_score = 0
detected_risks = []
for indicator in suspicious_indicators:
if re.search(indicator, text):
risk_score += 1
detected_risks.append(indicator)
# High risk content requires explicit privacy mode
if risk_score >= 3:
return Either.left(
PrivacyError(
"High-risk content detected - explicit privacy mode required",
{"risk_score": risk_score, "detected_risks": detected_risks},
),
)
return Either.right(text)
class OCRResultCache:
"""Intelligent caching system for OCR results."""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
self.cache: dict[str, tuple[OCRResult, datetime]] = {}
self.max_size = max_size
self.ttl = timedelta(seconds=ttl_seconds)
def _generate_cache_key(
self,
image_data: bytes,
region: ScreenRegion | None,
options: OCRProcessingOptions,
) -> str:
"""Generate cache key for OCR operation."""
# Create hash of image data and parameters
hasher = hashlib.sha256()
hasher.update(image_data)
if region:
hasher.update(
f"{region.x},{region.y},{region.width},{region.height}".encode(),
)
hasher.update(f"{options.dpi},{options.language_detection}".encode())
return hasher.hexdigest()[:16]
def get(
self,
image_data: bytes,
region: ScreenRegion | None,
options: OCRProcessingOptions,
) -> OCRResult | None:
"""Get cached OCR result if available and not expired."""
cache_key = self._generate_cache_key(image_data, region, options)
if cache_key in self.cache:
result, timestamp = self.cache[cache_key]
if datetime.now() - timestamp <= self.ttl:
logger.debug(f"OCR cache hit for key: {cache_key}")
return result
# Remove expired entry
del self.cache[cache_key]
return None
def put(
self,
image_data: bytes,
region: ScreenRegion | None,
options: OCRProcessingOptions,
result: OCRResult,
) -> None:
"""Store OCR result in cache."""
if len(self.cache) >= self.max_size:
# Remove oldest entry
oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
del self.cache[oldest_key]
cache_key = self._generate_cache_key(image_data, region, options)
self.cache[cache_key] = (result, datetime.now())
logger.debug(f"Cached OCR result for key: {cache_key}")
def clear(self) -> None:
"""Clear all cached results."""
self.cache.clear()
logger.info("OCR cache cleared")
class OCREngine:
"""Advanced OCR engine with multiple backend support and intelligent processing.
Provides high-accuracy text extraction with comprehensive language support,
privacy protection, and performance optimization through caching and batch processing.
"""
# Supported language configurations
SUPPORTED_LANGUAGES = {
"en": OCRLanguageConfig("en", "English", ["Latin"], 0.0),
"es": OCRLanguageConfig("es", "Spanish", ["Latin"], 0.0),
"fr": OCRLanguageConfig("fr", "French", ["Latin"], 0.0),
"de": OCRLanguageConfig("de", "German", ["Latin"], 0.0),
"it": OCRLanguageConfig("it", "Italian", ["Latin"], 0.0),
"pt": OCRLanguageConfig("pt", "Portuguese", ["Latin"], 0.0),
"ru": OCRLanguageConfig("ru", "Russian", ["Cyrillic"], -0.05),
"ja": OCRLanguageConfig(
"ja",
"Japanese",
["Hiragana", "Katakana", "Kanji"],
-0.1,
),
"ko": OCRLanguageConfig("ko", "Korean", ["Hangul"], -0.1),
"zh": OCRLanguageConfig("zh", "Chinese", ["Han"], -0.1),
"ar": OCRLanguageConfig("ar", "Arabic", ["Arabic"], -0.15),
"hi": OCRLanguageConfig("hi", "Hindi", ["Devanagari"], -0.1),
}
def __init__(self, cache_enabled: bool = True, cache_size: int = 1000):
self.cache = OCRResultCache(max_size=cache_size) if cache_enabled else None
self.privacy_filter = OCRPrivacyFilter()
logger.info(
f"OCR Engine initialized with cache={'enabled' if cache_enabled else 'disabled'}",
)
@require(lambda image_data: len(image_data) > 0)
@ensure(
lambda result: result.is_right() or isinstance(result.get_left(), VisualError),
)
async def extract_text(
self,
image_data: ImageData,
region: ScreenRegion | None = None,
language: str = "en",
options: OCRProcessingOptions | None = None,
privacy_mode: bool = True,
) -> Either[VisualError, OCRResult]:
"""Extract text from image data using advanced OCR techniques.
Args:
image_data: Image data to process
region: Specific region to extract text from
language: Target language for OCR
options: Processing options and parameters
privacy_mode: Enable privacy content filtering
Returns:
Either OCR result or processing error
"""
try:
logger.info(
f"Starting OCR extraction for {len(image_data)} bytes, language: {language}",
)
# Validate inputs
image_validation = validate_image_data(bytes(image_data))
if image_validation.is_left():
return Either.left(image_validation.get_left())
if language not in self.SUPPORTED_LANGUAGES:
return Either.left(
ProcessingError(
f"Unsupported language: {language}. Supported: {list(self.SUPPORTED_LANGUAGES.keys())}",
),
)
# Use default options if not provided
if options is None:
options = OCRProcessingOptions()
# Check cache first
if self.cache:
cached_result = self.cache.get(bytes(image_data), region, options)
if cached_result:
logger.debug("Using cached OCR result")
return Either.right(cached_result)
# Perform OCR processing
ocr_result = await self._perform_ocr_extraction(
bytes(image_data),
region,
language,
options,
)
if ocr_result.is_left():
return ocr_result
result = ocr_result.get_right()
# Apply privacy filtering
if privacy_mode:
filtered_result = self._apply_privacy_filtering(result)
if filtered_result.is_left():
return filtered_result
result = filtered_result.get_right()
# Cache the result
if self.cache:
self.cache.put(bytes(image_data), region, options, result)
logger.info(
f"OCR extraction completed: {len(result.text)} characters, confidence: {result.confidence}",
)
return Either.right(result)
except Exception as e:
logger.error(f"OCR extraction failed: {e!s}")
return Either.left(ProcessingError(f"OCR extraction failed: {e!s}"))
async def _perform_ocr_extraction(
self,
_image_data: bytes,
region: ScreenRegion | None,
language: str,
options: OCRProcessingOptions,
) -> Either[VisualError, OCRResult]:
"""Perform the actual OCR processing (placeholder for real implementation)."""
try:
# This is a simulation of OCR processing
# In a real implementation, this would use libraries like:
# - Tesseract (pytesseract)
# - Apple Vision Framework (via PyObjC)
# - Google Cloud Vision API
# - AWS Textract
# Simulate processing delay
await asyncio.sleep(0.1)
# Simulate extracted text based on image characteristics
if region:
simulated_text = f"Sample extracted text from region {region.x},{region.y} {region.width}x{region.height}"
text_region = region
else:
simulated_text = "Sample extracted text from full image"
text_region = ScreenRegion(0, 0, 800, 600) # Default region
# Simulate confidence based on language
lang_config = self.SUPPORTED_LANGUAGES[language]
base_confidence = 0.85
adjusted_confidence = base_confidence + lang_config.confidence_adjustment
confidence = normalize_confidence(adjusted_confidence)
# Create word boxes (simulation)
words = simulated_text.split()
word_boxes = []
x_offset = text_region.x
y_offset = text_region.y
word_width = text_region.width // max(len(words), 1)
word_height = 20
for i, word in enumerate(words):
word_region = ScreenRegion(
x=x_offset + i * word_width,
y=y_offset,
width=word_width,
height=word_height,
)
word_boxes.append((word, word_region))
# Create line boxes (simulation)
line_boxes = [
ScreenRegion(
x=text_region.x,
y=text_region.y,
width=text_region.width,
height=word_height,
),
]
result = OCRResult(
text=OCRText(simulated_text),
confidence=confidence,
coordinates=text_region,
language=language,
line_boxes=line_boxes,
word_boxes=word_boxes,
text_orientation=0.0,
metadata={
"processing_time_ms": 100,
"engine": "simulation",
"dpi": options.dpi,
"preprocessing": {
"contrast_enhancement": options.contrast_enhancement,
"noise_reduction": options.noise_reduction,
"skew_correction": options.skew_correction,
},
},
)
return Either.right(result)
except Exception as e:
return Either.left(ProcessingError(f"OCR processing failed: {e!s}"))
def _apply_privacy_filtering(
self,
result: OCRResult,
) -> Either[VisualError, OCRResult]:
"""Apply privacy filtering to OCR result."""
try:
# Validate content safety
safety_check = self.privacy_filter.validate_content_safety(str(result.text))
if safety_check.is_left():
return Either.left(safety_check.get_left())
# Filter sensitive content
filtered_text, detected_categories = (
self.privacy_filter.filter_sensitive_content(
str(result.text),
privacy_mode=True,
)
)
# Update metadata with privacy information
privacy_metadata = result.metadata.copy()
privacy_metadata.update(
{
"privacy_filtered": True,
"detected_sensitive_categories": list(detected_categories),
"original_length": len(str(result.text)),
"filtered_length": len(filtered_text),
},
)
# Create new result with filtered text
filtered_result = OCRResult(
text=OCRText(filtered_text),
confidence=result.confidence,
coordinates=result.coordinates,
language=result.language,
line_boxes=result.line_boxes,
word_boxes=result.word_boxes,
character_boxes=result.character_boxes,
text_orientation=result.text_orientation,
reading_order=result.reading_order,
metadata=privacy_metadata,
)
if detected_categories:
logger.warning(
f"Sensitive content detected and filtered: {detected_categories}",
)
return Either.right(filtered_result)
except Exception as e:
return Either.left(ProcessingError(f"Privacy filtering failed: {e!s}"))
@require(lambda text_inputs: len(text_inputs) > 0)
async def batch_extract_text(
self,
text_inputs: list[tuple[ImageData, ScreenRegion | None]],
language: str = "en",
options: OCRProcessingOptions | None = None,
privacy_mode: bool = True,
) -> list[Either[VisualError, OCRResult]]:
"""Perform batch OCR extraction for multiple images.
Args:
text_inputs: List of (image_data, region) tuples
language: Target language for all extractions
options: Processing options
privacy_mode: Enable privacy filtering
Returns:
List of OCR results or errors
"""
logger.info(f"Starting batch OCR extraction for {len(text_inputs)} inputs")
# Process all inputs concurrently
tasks = []
for image_data, region in text_inputs:
task = self.extract_text(
image_data,
region,
language,
options,
privacy_mode,
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any exceptions
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
error_result = Either.left(
ProcessingError(f"Batch item {i} failed: {result!s}"),
)
processed_results.append(error_result)
else:
processed_results.append(result)
success_count = sum(1 for r in processed_results if r.is_right())
logger.info(
f"Batch OCR completed: {success_count}/{len(text_inputs)} successful",
)
return processed_results
def get_supported_languages(self) -> dict[str, OCRLanguageConfig]:
"""Get all supported languages and their configurations."""
return self.SUPPORTED_LANGUAGES.copy()
def get_cache_stats(self) -> dict[str, Any]:
"""Get OCR cache statistics."""
if not self.cache:
return {"cache_enabled": False}
return {
"cache_enabled": True,
"cache_size": len(self.cache.cache),
"max_size": self.cache.max_size,
"ttl_seconds": self.cache.ttl.total_seconds(),
}
def clear_cache(self) -> None:
"""Clear the OCR result cache."""
if self.cache:
self.cache.clear()
logger.info("OCR cache cleared")
# Convenience functions for common OCR operations
async def extract_text_from_region(
image_data: ImageData,
region: ScreenRegion,
language: str = "en",
privacy_mode: bool = True,
) -> Either[VisualError, OCRResult]:
"""Extract text from a specific screen region."""
engine = OCREngine()
return await engine.extract_text(
image_data,
region,
language,
privacy_mode=privacy_mode,
)
async def extract_text_with_high_accuracy(
image_data: ImageData,
language: str = "en",
privacy_mode: bool = True,
) -> Either[VisualError, OCRResult]:
"""Extract text with high accuracy settings."""
options = OCRProcessingOptions(
dpi=600,
contrast_enhancement=True,
noise_reduction=True,
skew_correction=True,
extract_word_boxes=True,
extract_line_boxes=True,
confidence_threshold=0.8,
)
engine = OCREngine()
return await engine.extract_text(image_data, None, language, options, privacy_mode)
def is_text_content_safe(text: str) -> bool:
"""Check if text content is safe for processing without privacy filtering."""
safety_check = OCRPrivacyFilter.validate_content_safety(text)
return safety_check.is_right()