#!/usr/bin/env python3
"""
Content Transformation Engine
Handles content transformation, processing, and formatting for various document types.
"""
import base64
import html
import io
import json
import re
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

import requests
import structlog
from bs4 import BeautifulSoup
from markdown import Markdown
from PIL import Image
logger = structlog.get_logger(__name__)
class ContentType:
"""Content type constants."""
MARKDOWN = "markdown"
HTML = "html"
PLAIN_TEXT = "text"
JSON = "json"
XML = "xml"
CODE = "code"
class ImageProcessor:
"""Image processing utilities."""
def __init__(self, max_size: Tuple[int, int] = (1200, 800), quality: int = 85):
"""Initialize image processor.
Args:
max_size: Maximum image dimensions (width, height)
quality: JPEG quality (1-100)
"""
self.max_size = max_size
self.quality = quality
def process_image_url(self, url: str, base_url: Optional[str] = None) -> str:
"""Process image URL to absolute URL.
Args:
url: Image URL (relative or absolute)
base_url: Base URL for resolving relative URLs
Returns:
Absolute image URL
"""
if not url:
return url
# If already absolute URL, return as-is
if urlparse(url).netloc:
return url
# If base_url provided, resolve relative URL
if base_url:
return urljoin(base_url, url)
return url
def download_and_process_image(self, url: str) -> Optional[str]:
"""Download and process image, return base64 data URL.
Args:
url: Image URL to download
Returns:
Base64 data URL or None if processing fails
"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
            # Open image with PIL (Image.open needs a file-like object, not raw bytes)
            image = Image.open(io.BytesIO(response.content))
# Resize if necessary
if image.size[0] > self.max_size[0] or image.size[1] > self.max_size[1]:
image.thumbnail(self.max_size, Image.Resampling.LANCZOS)
            # JPEG cannot store alpha or palette data, so convert those modes to RGB
            if image.mode in ('RGBA', 'LA', 'P'):
                image = image.convert('RGB')
            # Save to an in-memory buffer
            buffer = io.BytesIO()
image.save(buffer, format='JPEG', quality=self.quality, optimize=True)
# Create data URL
image_data = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/jpeg;base64,{image_data}"
except Exception as e:
logger.warning("Failed to process image", url=url, error=str(e))
return None
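# Usage sketch for ImageProcessor (illustrative only; the URL below is hypothetical):
#
#     processor = ImageProcessor(max_size=(800, 600), quality=80)
#     absolute = processor.process_image_url("img/logo.png", "https://example.com/docs/")
#     # -> "https://example.com/docs/img/logo.png"
#     data_url = processor.download_and_process_image(absolute)
#     if data_url:  # None when the download or decode fails
#         print(data_url[:30])  # "data:image/jpeg;base64,..."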
class CodeBlockProcessor:
"""Code block processing utilities."""
def __init__(self):
"""Initialize code block processor."""
self.language_aliases = {
'js': 'javascript',
'ts': 'typescript',
'py': 'python',
'rb': 'ruby',
'sh': 'bash',
'yml': 'yaml',
}
def normalize_language(self, language: str) -> str:
"""Normalize programming language name.
Args:
language: Language identifier
Returns:
Normalized language name
"""
if not language:
return 'text'
language = language.lower().strip()
return self.language_aliases.get(language, language)
def extract_code_blocks(self, content: str) -> List[Dict[str, str]]:
"""Extract code blocks from markdown content.
Args:
content: Markdown content
Returns:
List of code blocks with language and content
"""
code_blocks = []
        # Pattern for fenced code blocks; the language tag may contain word
        # characters plus '+', '#', and '-' (e.g. c++, c#, objective-c)
        pattern = r'```([\w+#-]+)?\n([\s\S]*?)```'
for match in re.finditer(pattern, content):
language = match.group(1) or 'text'
code_content = match.group(2).strip()
code_blocks.append({
'language': self.normalize_language(language),
'content': code_content,
'start_pos': match.start(),
'end_pos': match.end(),
})
return code_blocks
def highlight_code_block(self, code: str, language: str) -> str:
"""Apply syntax highlighting to code block.
Args:
code: Code content
language: Programming language
Returns:
HTML with syntax highlighting
"""
try:
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter
lexer = get_lexer_by_name(language, stripall=True)
formatter = HtmlFormatter(cssclass='highlight')
return highlight(code, lexer, formatter)
except Exception as e:
logger.warning("Failed to highlight code", language=language, error=str(e))
return f'<pre><code class="language-{language}">{html.escape(code)}</code></pre>'
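# Usage sketch for CodeBlockProcessor (illustrative):
#
#     processor = CodeBlockProcessor()
#     blocks = processor.extract_code_blocks("```py\nprint('hi')\n```")
#     # -> [{'language': 'python', 'content': "print('hi')", 'start_pos': 0, 'end_pos': ...}]
#     html_out = processor.highlight_code_block(blocks[0]['content'], blocks[0]['language'])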
class MarkdownProcessor:
"""Markdown processing utilities."""
def __init__(self, base_url: Optional[str] = None):
"""Initialize markdown processor.
Args:
base_url: Base URL for resolving relative links
"""
self.base_url = base_url
self.image_processor = ImageProcessor()
self.code_processor = CodeBlockProcessor()
# Configure markdown with extensions
self.markdown = Markdown(
extensions=[
'codehilite',
'tables',
'toc',
'fenced_code',
'attr_list',
'def_list',
'footnotes',
'md_in_html',
],
extension_configs={
'codehilite': {
'css_class': 'highlight',
'use_pygments': True,
},
'toc': {
'permalink': True,
'permalink_class': 'toc-link',
},
}
)
def process_links(self, content: str) -> str:
"""Process and resolve links in content.
Args:
content: Content with links
Returns:
Content with processed links
"""
if not self.base_url:
return content
# Pattern for markdown links
link_pattern = r'\[([^\]]+)\]\(([^\)]+)\)'
def replace_link(match):
text = match.group(1)
url = match.group(2)
# Skip if already absolute URL or anchor
if urlparse(url).netloc or url.startswith('#'):
return match.group(0)
# Resolve relative URL
absolute_url = urljoin(self.base_url, url)
return f'[{text}]({absolute_url})'
return re.sub(link_pattern, replace_link, content)
def process_images(self, content: str) -> str:
"""Process images in markdown content.
Args:
content: Markdown content
Returns:
Content with processed images
"""
# Pattern for markdown images
image_pattern = r'!\[([^\]]*)\]\(([^\)]+)\)'
def replace_image(match):
alt_text = match.group(1)
url = match.group(2)
# Process image URL
processed_url = self.image_processor.process_image_url(url, self.base_url)
            return f'![{alt_text}]({processed_url})'
return re.sub(image_pattern, replace_image, content)
def to_html(self, content: str) -> str:
"""Convert markdown to HTML.
Args:
content: Markdown content
Returns:
HTML content
"""
# Process links and images
processed_content = self.process_links(content)
processed_content = self.process_images(processed_content)
# Convert to HTML
html_content = self.markdown.convert(processed_content)
# Reset markdown instance for next use
self.markdown.reset()
return html_content
def extract_metadata(self, content: str) -> Dict[str, Any]:
"""Extract metadata from markdown content.
Args:
content: Markdown content
Returns:
Extracted metadata
"""
metadata = {}
# Extract front matter (YAML)
front_matter_pattern = r'^---\n([\s\S]*?)\n---\n'
match = re.match(front_matter_pattern, content)
if match:
try:
import yaml
yaml_content = match.group(1)
metadata.update(yaml.safe_load(yaml_content))
except Exception as e:
logger.warning("Failed to parse front matter", error=str(e))
# Extract headings
headings = []
heading_pattern = r'^(#{1,6})\s+(.+)$'
for match in re.finditer(heading_pattern, content, re.MULTILINE):
level = len(match.group(1))
text = match.group(2).strip()
headings.append({'level': level, 'text': text})
metadata['headings'] = headings
# Extract code blocks
code_blocks = self.code_processor.extract_code_blocks(content)
metadata['code_blocks'] = code_blocks
return metadata
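# Usage sketch for MarkdownProcessor (illustrative; the base URL is hypothetical):
#
#     md = MarkdownProcessor(base_url="https://example.com/wiki/")
#     html_out = md.to_html("# Title\n\nSee [docs](guide.md).")
#     # relative link resolved to https://example.com/wiki/guide.md
#     meta = md.extract_metadata("# Title\n\nbody")
#     # meta['headings'] -> [{'level': 1, 'text': 'Title'}]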
class HTMLProcessor:
"""HTML processing utilities."""
def __init__(self, base_url: Optional[str] = None):
"""Initialize HTML processor.
Args:
base_url: Base URL for resolving relative links
"""
self.base_url = base_url
self.image_processor = ImageProcessor()
def clean_html(self, content: str) -> str:
"""Clean and sanitize HTML content.
Args:
content: HTML content
Returns:
Cleaned HTML content
"""
soup = BeautifulSoup(content, 'html.parser')
# Remove script and style tags
for tag in soup(['script', 'style']):
tag.decompose()
        # Remove inline event-handler attributes (onclick, onload, onerror, ...)
        for tag in soup.find_all():
            for attr in list(tag.attrs):
                if attr.lower().startswith('on'):
                    del tag.attrs[attr]
return str(soup)
def process_links(self, content: str) -> str:
"""Process links in HTML content.
Args:
content: HTML content
Returns:
Content with processed links
"""
if not self.base_url:
return content
soup = BeautifulSoup(content, 'html.parser')
for link in soup.find_all('a', href=True):
href = link['href']
# Skip if already absolute URL or anchor
if urlparse(href).netloc or href.startswith('#'):
continue
# Resolve relative URL
link['href'] = urljoin(self.base_url, href)
return str(soup)
def process_images(self, content: str) -> str:
"""Process images in HTML content.
Args:
content: HTML content
Returns:
Content with processed images
"""
soup = BeautifulSoup(content, 'html.parser')
for img in soup.find_all('img', src=True):
src = img['src']
# Process image URL
processed_src = self.image_processor.process_image_url(src, self.base_url)
img['src'] = processed_src
return str(soup)
def to_markdown(self, content: str) -> str:
"""Convert HTML to markdown.
Args:
content: HTML content
Returns:
Markdown content
"""
try:
import html2text
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = False
h.body_width = 0 # Don't wrap lines
return h.handle(content)
except ImportError:
logger.warning("html2text not available, using basic conversion")
# Basic HTML to text conversion
soup = BeautifulSoup(content, 'html.parser')
return soup.get_text()
def extract_text(self, content: str) -> str:
"""Extract plain text from HTML.
Args:
content: HTML content
Returns:
Plain text content
"""
soup = BeautifulSoup(content, 'html.parser')
return soup.get_text(separator=' ', strip=True)
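# Usage sketch for HTMLProcessor (illustrative):
#
#     hp = HTMLProcessor(base_url="https://example.com/")
#     safe = hp.clean_html('<p onclick="x()">hi</p><script>x()</script>')
#     # -> '<p>hi</p>'
#     text = hp.extract_text("<p>Hello <b>world</b></p>")
#     # -> 'Hello world'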
class ContentEngine:
"""Main content transformation engine."""
def __init__(self, base_url: Optional[str] = None):
"""Initialize content engine.
Args:
base_url: Base URL for resolving relative links
"""
self.base_url = base_url
self.markdown_processor = MarkdownProcessor(base_url)
self.html_processor = HTMLProcessor(base_url)
self.image_processor = ImageProcessor()
self.code_processor = CodeBlockProcessor()
logger.info("Content engine initialized", base_url=base_url)
def detect_content_type(self, content: str) -> str:
"""Detect content type from content.
Args:
content: Content to analyze
Returns:
Detected content type
"""
content = content.strip()
        # Check for an XML declaration first: well-formed XML also starts with
        # '<' and ends with '>', so it would otherwise be detected as HTML
        if content.startswith('<?xml'):
            return ContentType.XML
        # Check for HTML
        if content.startswith('<') and content.endswith('>'):
            return ContentType.HTML
        # Check for JSON
        if (content.startswith('{') and content.endswith('}')) or \
           (content.startswith('[') and content.endswith(']')):
            try:
                json.loads(content)
                return ContentType.JSON
            except ValueError:
                pass
        # Check for markup fragments that open a tag but do not close the document
        if content.startswith('<') and '>' in content:
            return ContentType.XML
        # Check for markdown indicators
        markdown_indicators = ['# ', '```', '**', '](', '![', '|']
        if any(indicator in content for indicator in markdown_indicators):
            return ContentType.MARKDOWN
return ContentType.PLAIN_TEXT
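    # Detection sketch (illustrative; `engine` is a hypothetical ContentEngine instance):
    #
    #     engine.detect_content_type('{"a": 1}')       # -> "json"
    #     engine.detect_content_type("<p>hi</p>")      # -> "html"
    #     engine.detect_content_type("# Heading")      # -> "markdown"
    #     engine.detect_content_type("plain words")    # -> "text"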
def transform_content(
self,
content: str,
source_type: Optional[str] = None,
target_type: str = ContentType.HTML,
**options
) -> Dict[str, Any]:
"""Transform content from one type to another.
Args:
content: Source content
source_type: Source content type (auto-detected if None)
target_type: Target content type
**options: Additional transformation options
Returns:
Transformation result with content and metadata
"""
        if not content:
            return {
                'content': '',
                'metadata': {},
                'source_type': ContentType.PLAIN_TEXT,
                'target_type': target_type,
            }
# Detect source type if not provided
if source_type is None:
source_type = self.detect_content_type(content)
logger.debug(
"Transforming content",
source_type=source_type,
target_type=target_type,
content_length=len(content),
)
result = {
'content': content,
'metadata': {},
'source_type': source_type,
'target_type': target_type,
}
try:
# Transform based on source and target types
if source_type == ContentType.MARKDOWN:
if target_type == ContentType.HTML:
result['content'] = self.markdown_processor.to_html(content)
result['metadata'] = self.markdown_processor.extract_metadata(content)
elif target_type == ContentType.PLAIN_TEXT:
html_content = self.markdown_processor.to_html(content)
result['content'] = self.html_processor.extract_text(html_content)
result['metadata'] = self.markdown_processor.extract_metadata(content)
elif source_type == ContentType.HTML:
if target_type == ContentType.MARKDOWN:
result['content'] = self.html_processor.to_markdown(content)
elif target_type == ContentType.PLAIN_TEXT:
result['content'] = self.html_processor.extract_text(content)
elif target_type == ContentType.HTML:
result['content'] = self.html_processor.clean_html(content)
result['content'] = self.html_processor.process_links(result['content'])
result['content'] = self.html_processor.process_images(result['content'])
elif source_type == ContentType.PLAIN_TEXT:
if target_type == ContentType.HTML:
# Convert plain text to HTML with basic formatting
lines = content.split('\n')
html_lines = [f'<p>{html.escape(line)}</p>' if line.strip() else '<br>' for line in lines]
result['content'] = '\n'.join(html_lines)
elif target_type == ContentType.MARKDOWN:
# Convert plain text to markdown with basic formatting
result['content'] = content # Plain text is valid markdown
# Add common metadata
result['metadata'].update({
'word_count': len(content.split()),
'character_count': len(content),
'line_count': len(content.split('\n')),
})
logger.debug(
"Content transformation completed",
source_type=source_type,
target_type=target_type,
output_length=len(result['content']),
)
except Exception as e:
logger.error(
"Content transformation failed",
source_type=source_type,
target_type=target_type,
error=str(e),
)
# Return original content on error
result['content'] = content
result['error'] = str(e)
return result
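    # Transformation sketch (illustrative; `engine` is a hypothetical instance):
    #
    #     result = engine.transform_content("# Title\n\nSome *text*.", target_type=ContentType.HTML)
    #     result['content']                   # rendered HTML
    #     result['metadata']['word_count']    # 4
    #     result['source_type']               # "markdown"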
def process_document(
self,
content: str,
title: Optional[str] = None,
source_url: Optional[str] = None,
**metadata
) -> Dict[str, Any]:
"""Process a complete document.
Args:
content: Document content
title: Document title
source_url: Source URL
**metadata: Additional metadata
Returns:
Processed document with content and metadata
"""
# Transform content to HTML
result = self.transform_content(content, target_type=ContentType.HTML)
# Build document metadata
doc_metadata = {
'title': title,
'source_url': source_url,
'processed_at': None, # Will be set by caller
**metadata,
**result['metadata'],
}
return {
'content': result['content'],
'metadata': doc_metadata,
'source_type': result['source_type'],
'target_type': result['target_type'],
}
def extract_searchable_text(self, content: str, content_type: Optional[str] = None) -> str:
"""Extract searchable text from content.
Args:
content: Content to extract text from
content_type: Content type (auto-detected if None)
Returns:
Plain text suitable for search indexing
"""
if content_type is None:
content_type = self.detect_content_type(content)
if content_type == ContentType.HTML:
return self.html_processor.extract_text(content)
elif content_type == ContentType.MARKDOWN:
html_content = self.markdown_processor.to_html(content)
return self.html_processor.extract_text(html_content)
else:
return content
def get_content_summary(self, content: str, max_length: int = 200) -> str:
"""Get a summary of content.
Args:
content: Content to summarize
max_length: Maximum summary length
Returns:
Content summary
"""
text = self.extract_searchable_text(content)
if len(text) <= max_length:
return text
# Find a good break point near the max length
truncated = text[:max_length]
last_space = truncated.rfind(' ')
if last_space > max_length * 0.8: # If we found a space reasonably close
return truncated[:last_space] + '...'
else:
return truncated + '...'
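# End-to-end sketch (illustrative; the URL and title below are hypothetical):
#
#     engine = ContentEngine(base_url="https://example.com/")
#     doc = engine.process_document("# Notes\n\nBody text.", title="Notes",
#                                   source_url="https://example.com/notes.md")
#     searchable = engine.extract_searchable_text(doc['content'], doc['target_type'])
#     summary = engine.get_content_summary("# Notes\n\nBody text.", max_length=80)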