
AnyDocs MCP Server

by funky1688
engine.py • 22.2 kB
#!/usr/bin/env python3
"""Content Transformation Engine

Handles content transformation, processing, and formatting for various
document types.
"""

import base64
import html
import io
import json
import re
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

import requests
import structlog
from bs4 import BeautifulSoup
from markdown import Markdown
from PIL import Image

logger = structlog.get_logger(__name__)


class ContentType:
    """Content type constants."""

    MARKDOWN = "markdown"
    HTML = "html"
    PLAIN_TEXT = "text"
    JSON = "json"
    XML = "xml"
    CODE = "code"


class ImageProcessor:
    """Image processing utilities."""

    def __init__(self, max_size: Tuple[int, int] = (1200, 800), quality: int = 85):
        """Initialize image processor.

        Args:
            max_size: Maximum image dimensions (width, height)
            quality: JPEG quality (1-100)
        """
        self.max_size = max_size
        self.quality = quality

    def process_image_url(self, url: str, base_url: Optional[str] = None) -> str:
        """Process image URL to absolute URL.

        Args:
            url: Image URL (relative or absolute)
            base_url: Base URL for resolving relative URLs

        Returns:
            Absolute image URL
        """
        if not url:
            return url

        # If already an absolute URL, return as-is
        if urlparse(url).netloc:
            return url

        # If base_url provided, resolve the relative URL against it
        if base_url:
            return urljoin(base_url, url)

        return url

    def download_and_process_image(self, url: str) -> Optional[str]:
        """Download and process image, return base64 data URL.

        Args:
            url: Image URL to download

        Returns:
            Base64 data URL or None if processing fails
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            # Open image with PIL (the raw bytes must be wrapped in a
            # file-like object; Image.open does not accept bytes directly)
            image = Image.open(io.BytesIO(response.content))

            # Resize if necessary
            if image.size[0] > self.max_size[0] or image.size[1] > self.max_size[1]:
                image.thumbnail(self.max_size, Image.Resampling.LANCZOS)

            # Convert to RGB if necessary
            if image.mode in ('RGBA', 'P'):
                image = image.convert('RGB')

            # Save to bytes
            buffer = io.BytesIO()
            image.save(buffer, format='JPEG', quality=self.quality, optimize=True)

            # Create data URL
            image_data = base64.b64encode(buffer.getvalue()).decode()
            return f"data:image/jpeg;base64,{image_data}"

        except Exception as e:
            logger.warning("Failed to process image", url=url, error=str(e))
            return None


class CodeBlockProcessor:
    """Code block processing utilities."""

    def __init__(self):
        """Initialize code block processor."""
        self.language_aliases = {
            'js': 'javascript',
            'ts': 'typescript',
            'py': 'python',
            'rb': 'ruby',
            'sh': 'bash',
            'yml': 'yaml',
        }

    def normalize_language(self, language: str) -> str:
        """Normalize programming language name.

        Args:
            language: Language identifier

        Returns:
            Normalized language name
        """
        if not language:
            return 'text'

        language = language.lower().strip()
        return self.language_aliases.get(language, language)

    def extract_code_blocks(self, content: str) -> List[Dict[str, str]]:
        """Extract code blocks from markdown content.

        Args:
            content: Markdown content

        Returns:
            List of code blocks with language and content
        """
        code_blocks = []

        # Pattern for fenced code blocks
        pattern = r'```(\w+)?\n([\s\S]*?)```'

        for match in re.finditer(pattern, content):
            language = match.group(1) or 'text'
            code_content = match.group(2).strip()

            code_blocks.append({
                'language': self.normalize_language(language),
                'content': code_content,
                'start_pos': match.start(),
                'end_pos': match.end(),
            })

        return code_blocks

    def highlight_code_block(self, code: str, language: str) -> str:
        """Apply syntax highlighting to code block.

        Args:
            code: Code content
            language: Programming language

        Returns:
            HTML with syntax highlighting
        """
        try:
            from pygments import highlight
            from pygments.formatters import HtmlFormatter
            from pygments.lexers import get_lexer_by_name

            lexer = get_lexer_by_name(language, stripall=True)
            formatter = HtmlFormatter(cssclass='highlight')
            return highlight(code, lexer, formatter)

        except Exception as e:
            logger.warning("Failed to highlight code", language=language, error=str(e))
            return f'<pre><code class="language-{language}">{html.escape(code)}</code></pre>'
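
# Usage sketch (illustrative; not part of the original module): given markdown
# with one fenced block, extract_code_blocks returns a dict per block, with the
# alias 'py' normalized to 'python'.
#
#   processor = CodeBlockProcessor()
#   processor.extract_code_blocks("```py\nprint('hi')\n```")
#   # -> [{'language': 'python', 'content': "print('hi')",
#   #      'start_pos': 0, 'end_pos': 21}]
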
class MarkdownProcessor:
    """Markdown processing utilities."""

    def __init__(self, base_url: Optional[str] = None):
        """Initialize markdown processor.

        Args:
            base_url: Base URL for resolving relative links
        """
        self.base_url = base_url
        self.image_processor = ImageProcessor()
        self.code_processor = CodeBlockProcessor()

        # Configure markdown with extensions
        self.markdown = Markdown(
            extensions=[
                'codehilite',
                'tables',
                'toc',
                'fenced_code',
                'attr_list',
                'def_list',
                'footnotes',
                'md_in_html',
            ],
            extension_configs={
                'codehilite': {
                    'css_class': 'highlight',
                    'use_pygments': True,
                },
                'toc': {
                    'permalink': True,
                    'permalink_class': 'toc-link',
                },
            },
        )

    def process_links(self, content: str) -> str:
        """Process and resolve links in content.

        Args:
            content: Content with links

        Returns:
            Content with processed links
        """
        if not self.base_url:
            return content

        # Pattern for markdown links
        link_pattern = r'\[([^\]]+)\]\(([^\)]+)\)'

        def replace_link(match):
            text = match.group(1)
            url = match.group(2)

            # Skip if already absolute URL or anchor
            if urlparse(url).netloc or url.startswith('#'):
                return match.group(0)

            # Resolve relative URL
            absolute_url = urljoin(self.base_url, url)
            return f'[{text}]({absolute_url})'

        return re.sub(link_pattern, replace_link, content)

    def process_images(self, content: str) -> str:
        """Process images in markdown content.

        Args:
            content: Markdown content

        Returns:
            Content with processed images
        """
        # Pattern for markdown images
        image_pattern = r'!\[([^\]]*)\]\(([^\)]+)\)'

        def replace_image(match):
            alt_text = match.group(1)
            url = match.group(2)

            # Process image URL
            processed_url = self.image_processor.process_image_url(url, self.base_url)
            return f'![{alt_text}]({processed_url})'

        return re.sub(image_pattern, replace_image, content)

    def to_html(self, content: str) -> str:
        """Convert markdown to HTML.

        Args:
            content: Markdown content

        Returns:
            HTML content
        """
        # Process links and images
        processed_content = self.process_links(content)
        processed_content = self.process_images(processed_content)

        # Convert to HTML
        html_content = self.markdown.convert(processed_content)

        # Reset markdown instance for next use
        self.markdown.reset()

        return html_content

    def extract_metadata(self, content: str) -> Dict[str, Any]:
        """Extract metadata from markdown content.

        Args:
            content: Markdown content

        Returns:
            Extracted metadata
        """
        metadata = {}

        # Extract front matter (YAML)
        front_matter_pattern = r'^---\n([\s\S]*?)\n---\n'
        match = re.match(front_matter_pattern, content)
        if match:
            try:
                import yaml

                yaml_content = match.group(1)
                metadata.update(yaml.safe_load(yaml_content))
            except Exception as e:
                logger.warning("Failed to parse front matter", error=str(e))

        # Extract headings
        headings = []
        heading_pattern = r'^(#{1,6})\s+(.+)$'
        for match in re.finditer(heading_pattern, content, re.MULTILINE):
            level = len(match.group(1))
            text = match.group(2).strip()
            headings.append({'level': level, 'text': text})

        metadata['headings'] = headings

        # Extract code blocks
        code_blocks = self.code_processor.extract_code_blocks(content)
        metadata['code_blocks'] = code_blocks

        return metadata
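
# Usage sketch (illustrative only; base_url is a placeholder): to_html resolves
# relative links against base_url before rendering, so relative markdown links
# come out absolute in the HTML.
#
#   md = MarkdownProcessor(base_url="https://example.com/docs/")
#   md.to_html("[guide](intro.md)")
#   # -> roughly '<p><a href="https://example.com/docs/intro.md">guide</a></p>'
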
class HTMLProcessor:
    """HTML processing utilities."""

    def __init__(self, base_url: Optional[str] = None):
        """Initialize HTML processor.

        Args:
            base_url: Base URL for resolving relative links
        """
        self.base_url = base_url
        self.image_processor = ImageProcessor()

    def clean_html(self, content: str) -> str:
        """Clean and sanitize HTML content.

        Args:
            content: HTML content

        Returns:
            Cleaned HTML content
        """
        soup = BeautifulSoup(content, 'html.parser')

        # Remove script and style tags
        for tag in soup(['script', 'style']):
            tag.decompose()

        # Remove dangerous attributes
        dangerous_attrs = ['onclick', 'onload', 'onerror', 'onmouseover']
        for tag in soup.find_all():
            for attr in dangerous_attrs:
                if attr in tag.attrs:
                    del tag.attrs[attr]

        return str(soup)

    def process_links(self, content: str) -> str:
        """Process links in HTML content.

        Args:
            content: HTML content

        Returns:
            Content with processed links
        """
        if not self.base_url:
            return content

        soup = BeautifulSoup(content, 'html.parser')

        for link in soup.find_all('a', href=True):
            href = link['href']

            # Skip if already absolute URL or anchor
            if urlparse(href).netloc or href.startswith('#'):
                continue

            # Resolve relative URL
            link['href'] = urljoin(self.base_url, href)

        return str(soup)

    def process_images(self, content: str) -> str:
        """Process images in HTML content.

        Args:
            content: HTML content

        Returns:
            Content with processed images
        """
        soup = BeautifulSoup(content, 'html.parser')

        for img in soup.find_all('img', src=True):
            src = img['src']

            # Process image URL
            processed_src = self.image_processor.process_image_url(src, self.base_url)
            img['src'] = processed_src

        return str(soup)

    def to_markdown(self, content: str) -> str:
        """Convert HTML to markdown.

        Args:
            content: HTML content

        Returns:
            Markdown content
        """
        try:
            import html2text

            h = html2text.HTML2Text()
            h.ignore_links = False
            h.ignore_images = False
            h.body_width = 0  # Don't wrap lines
            return h.handle(content)
        except ImportError:
            logger.warning("html2text not available, using basic conversion")
            # Basic HTML to text conversion
            soup = BeautifulSoup(content, 'html.parser')
            return soup.get_text()

    def extract_text(self, content: str) -> str:
        """Extract plain text from HTML.

        Args:
            content: HTML content

        Returns:
            Plain text content
        """
        soup = BeautifulSoup(content, 'html.parser')
        return soup.get_text(separator=' ', strip=True)
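
# Usage sketch (illustrative only): clean_html drops script/style tags and
# event-handler attributes while leaving the rest of the markup intact.
#
#   hp = HTMLProcessor()
#   hp.clean_html('<p onclick="x()">hi</p><script>x()</script>')
#   # -> '<p>hi</p>'
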
class ContentEngine:
    """Main content transformation engine."""

    def __init__(self, base_url: Optional[str] = None):
        """Initialize content engine.

        Args:
            base_url: Base URL for resolving relative links
        """
        self.base_url = base_url
        self.markdown_processor = MarkdownProcessor(base_url)
        self.html_processor = HTMLProcessor(base_url)
        self.image_processor = ImageProcessor()
        self.code_processor = CodeBlockProcessor()

        logger.info("Content engine initialized", base_url=base_url)

    def detect_content_type(self, content: str) -> str:
        """Detect content type from content.

        Args:
            content: Content to analyze

        Returns:
            Detected content type
        """
        content = content.strip()

        # Check for XML first, so an XML declaration is not misread as HTML
        if content.startswith('<?xml'):
            return ContentType.XML

        # Check for HTML
        if content.startswith('<') and content.endswith('>'):
            return ContentType.HTML

        # Check for JSON
        if (content.startswith('{') and content.endswith('}')) or \
           (content.startswith('[') and content.endswith(']')):
            try:
                json.loads(content)
                return ContentType.JSON
            except ValueError:
                pass

        # Check for markdown indicators
        markdown_indicators = ['#', '##', '```', '**', '*', '[', '](', '![', '|']
        if any(indicator in content for indicator in markdown_indicators):
            return ContentType.MARKDOWN

        return ContentType.PLAIN_TEXT

    def transform_content(
        self,
        content: str,
        source_type: Optional[str] = None,
        target_type: str = ContentType.HTML,
        **options,
    ) -> Dict[str, Any]:
        """Transform content from one type to another.

        Args:
            content: Source content
            source_type: Source content type (auto-detected if None)
            target_type: Target content type
            **options: Additional transformation options

        Returns:
            Transformation result with content and metadata
        """
        if not content:
            return {
                'content': '',
                'metadata': {},
                'source_type': ContentType.PLAIN_TEXT,
                'target_type': target_type,
            }

        # Detect source type if not provided
        if source_type is None:
            source_type = self.detect_content_type(content)

        logger.debug(
            "Transforming content",
            source_type=source_type,
            target_type=target_type,
            content_length=len(content),
        )

        result = {
            'content': content,
            'metadata': {},
            'source_type': source_type,
            'target_type': target_type,
        }

        try:
            # Transform based on source and target types
            if source_type == ContentType.MARKDOWN:
                if target_type == ContentType.HTML:
                    result['content'] = self.markdown_processor.to_html(content)
                    result['metadata'] = self.markdown_processor.extract_metadata(content)
                elif target_type == ContentType.PLAIN_TEXT:
                    html_content = self.markdown_processor.to_html(content)
                    result['content'] = self.html_processor.extract_text(html_content)
                    result['metadata'] = self.markdown_processor.extract_metadata(content)

            elif source_type == ContentType.HTML:
                if target_type == ContentType.MARKDOWN:
                    result['content'] = self.html_processor.to_markdown(content)
                elif target_type == ContentType.PLAIN_TEXT:
                    result['content'] = self.html_processor.extract_text(content)
                elif target_type == ContentType.HTML:
                    result['content'] = self.html_processor.clean_html(content)
                    result['content'] = self.html_processor.process_links(result['content'])
                    result['content'] = self.html_processor.process_images(result['content'])

            elif source_type == ContentType.PLAIN_TEXT:
                if target_type == ContentType.HTML:
                    # Convert plain text to HTML with basic formatting
                    lines = content.split('\n')
                    html_lines = [
                        f'<p>{html.escape(line)}</p>' if line.strip() else '<br>'
                        for line in lines
                    ]
                    result['content'] = '\n'.join(html_lines)
                elif target_type == ContentType.MARKDOWN:
                    result['content'] = content  # Plain text is valid markdown

            # Add common metadata
            result['metadata'].update({
                'word_count': len(content.split()),
                'character_count': len(content),
                'line_count': len(content.split('\n')),
            })

            logger.debug(
                "Content transformation completed",
                source_type=source_type,
                target_type=target_type,
                output_length=len(result['content']),
            )

        except Exception as e:
            logger.error(
                "Content transformation failed",
                source_type=source_type,
                target_type=target_type,
                error=str(e),
            )
            # Return original content on error
            result['content'] = content
            result['error'] = str(e)

        return result
    def process_document(
        self,
        content: str,
        title: Optional[str] = None,
        source_url: Optional[str] = None,
        **metadata,
    ) -> Dict[str, Any]:
        """Process a complete document.

        Args:
            content: Document content
            title: Document title
            source_url: Source URL
            **metadata: Additional metadata

        Returns:
            Processed document with content and metadata
        """
        # Transform content to HTML
        result = self.transform_content(content, target_type=ContentType.HTML)

        # Build document metadata
        doc_metadata = {
            'title': title,
            'source_url': source_url,
            'processed_at': None,  # Will be set by caller
            **metadata,
            **result['metadata'],
        }

        return {
            'content': result['content'],
            'metadata': doc_metadata,
            'source_type': result['source_type'],
            'target_type': result['target_type'],
        }

    def extract_searchable_text(self, content: str, content_type: Optional[str] = None) -> str:
        """Extract searchable text from content.

        Args:
            content: Content to extract text from
            content_type: Content type (auto-detected if None)

        Returns:
            Plain text suitable for search indexing
        """
        if content_type is None:
            content_type = self.detect_content_type(content)

        if content_type == ContentType.HTML:
            return self.html_processor.extract_text(content)
        elif content_type == ContentType.MARKDOWN:
            html_content = self.markdown_processor.to_html(content)
            return self.html_processor.extract_text(html_content)
        else:
            return content

    def get_content_summary(self, content: str, max_length: int = 200) -> str:
        """Get a summary of content.

        Args:
            content: Content to summarize
            max_length: Maximum summary length

        Returns:
            Content summary
        """
        text = self.extract_searchable_text(content)

        if len(text) <= max_length:
            return text

        # Find a good break point near the max length
        truncated = text[:max_length]
        last_space = truncated.rfind(' ')

        if last_space > max_length * 0.8:  # If we found a space reasonably close
            return truncated[:last_space] + '...'
        else:
            return truncated + '...'
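
A minimal usage sketch (not part of engine.py; the flat import path and the example inputs are assumptions, so adjust them to the actual package layout):

from engine import ContentEngine, ContentType  # assumed import path

engine = ContentEngine(base_url="https://example.com/docs/")

# Markdown in, HTML out; headings and code blocks land in metadata
result = engine.transform_content("# Title\n\nSome **bold** text.",
                                  target_type=ContentType.HTML)
print(result['content'])               # rendered HTML
print(result['metadata']['headings'])  # [{'level': 1, 'text': 'Title'}]

# Short plain-text summary, e.g. for search indexing or previews
print(engine.get_content_summary("# Title\n\nSome **bold** text."))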

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'
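
The same lookup can be scripted. A minimal Python sketch using the requests library against the endpoint shown above:

import requests

response = requests.get("https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP")
response.raise_for_status()
print(response.json())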

If you have feedback or need assistance with the MCP directory API, please join our Discord server.