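"""File-based element cloner.

Saves element extraction results as JSON files in an output directory and returns
file paths plus lightweight summaries instead of the full extracted payloads.
"""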
import json
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
try:
    from .debug_logger import debug_logger
except ImportError:
    from debug_logger import debug_logger
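# Make the project root importable so the sibling cloner modules resolve whether this
# file is imported as part of the package or run as a standalone script.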
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
from comprehensive_element_cloner import ComprehensiveElementCloner
from element_cloner import element_cloner
class FileBasedElementCloner:
    """Element cloner that saves data to files and returns file paths."""
    def __init__(self, output_dir: str = "element_clones"):
        """
        Initialize with output directory for clone files.
        Args:
            output_dir (str): Directory to save clone files.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.comprehensive_cloner = ComprehensiveElementCloner()
    
    def _safe_process_framework_handlers(self, framework_handlers):
        """Safely process framework handlers that might be dict or list."""
        if isinstance(framework_handlers, dict):
            return {k: len(v) if isinstance(v, list) else str(v) for k, v in framework_handlers.items()}
        elif isinstance(framework_handlers, list):
            return {"handlers": len(framework_handlers)}
        else:
            return {"value": str(framework_handlers)}
    def _generate_filename(self, prefix: str, extension: str = "json") -> str:
        """
        Generate unique filename with timestamp.
        Args:
            prefix (str): Prefix for the filename.
            extension (str): File extension.
        Returns:
            str: Generated filename.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        return f"{prefix}_{timestamp}_{unique_id}.{extension}"
    async def extract_element_styles_to_file(
        self,
        tab,
        selector: str,
        include_computed: bool = True,
        include_css_rules: bool = True,
        include_pseudo: bool = True,
        include_inheritance: bool = False
    ) -> Dict[str, Any]:
        """
        Extract element styles and save to file, returning file path.
        Args:
            tab: Browser tab instance
            selector (str): CSS selector for the element
            include_computed (bool): Include computed styles
            include_css_rules (bool): Include matching CSS rules
            include_pseudo (bool): Include pseudo-element styles
            include_inheritance (bool): Include style inheritance chain
        Returns:
            Dict[str, Any]: File path and summary of extracted styles
        """
        try:
            debug_logger.log_info("file_element_cloner", "extract_styles_to_file",
                                f"Starting style extraction for selector: {selector}")
            
            # Extract styles using element_cloner
            style_data = await element_cloner.extract_element_styles(
                tab,
                selector=selector,
                include_computed=include_computed,
                include_css_rules=include_css_rules,
                include_pseudo=include_pseudo,
                include_inheritance=include_inheritance
            )
            
            # Generate filename and save
            filename = self._generate_filename("styles")
            file_path = self._save_to_file(style_data, filename)
            
            # Create summary
            summary = {
                "file_path": str(file_path),
                "extraction_type": "styles",
                "selector": selector,
                "url": getattr(tab, 'url', 'unknown'),
                "components": {
                    "computed_styles_count": len(style_data.get('computed_styles', {})),
                    "css_rules_count": len(style_data.get('css_rules', [])),
                    "pseudo_elements_count": len(style_data.get('pseudo_elements', {})),
                    "custom_properties_count": len(style_data.get('custom_properties', {}))
                }
            }
            
            debug_logger.log_info("file_element_cloner", "extract_styles_to_file",
                                f"Styles saved to {file_path}")
            return summary
            
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_styles_to_file", e)
            return {"error": str(e)}
    def _save_to_file(self, data: Dict[str, Any], filename: str) -> str:
        """
        Save data to file and return absolute path.
        Args:
            data (Dict[str, Any]): Data to save.
            filename (str): Name of the file.
        Returns:
            str: Absolute path to the saved file.
        """
        file_path = self.output_dir / filename
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        return str(file_path.absolute())
    async def extract_complete_element_to_file(
        self,
        tab,
        selector: str,
        include_children: bool = True
    ) -> Dict[str, Any]:
        """
        Extract complete element using working comprehensive cloner and save to file.
        Args:
            tab: Browser tab object.
            selector (str): CSS selector for the element.
            include_children (bool): Whether to include children.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            complete_data = await self.comprehensive_cloner.extract_complete_element(
                tab, selector, include_children
            )
            complete_data['_metadata'] = {
                'extraction_type': 'complete_comprehensive',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'include_children': include_children
            }
            filename = self._generate_filename("complete_comprehensive")
            file_path = self._save_to_file(complete_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_complete_to_file",
                                  f"Saved complete element data to {file_path}")
            summary = {
                "file_path": file_path,
                "extraction_type": "complete_comprehensive",
                "selector": selector,
                "url": complete_data.get('url', 'unknown'),
                "summary": {
                    "tag_name": complete_data.get('html', {}).get('tagName', 'unknown'),
                    "computed_styles_count": len(complete_data.get('styles', {})),
                    "attributes_count": len(complete_data.get('html', {}).get('attributes', [])),
                    "event_listeners_count": len(complete_data.get('eventListeners', [])),
                    "children_count": len(complete_data.get('children', [])) if include_children else 0,
                    "has_pseudo_elements": bool(complete_data.get('pseudoElements')),
                    "css_rules_count": len(complete_data.get('cssRules', [])),
                    "animations_count": len(complete_data.get('animations', [])),
                    "file_size_kb": round(len(json.dumps(complete_data)) / 1024, 2)
                }
            }
            return summary
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_complete_to_file", e)
            return {"error": str(e)}
    async def extract_element_structure_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        include_children: bool = False,
        include_attributes: bool = True,
        include_data_attributes: bool = True,
        max_depth: int = 3
    ) -> Dict[str, Any]:
        """
        Extract structure and save to file, return file path.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            include_children (bool): Whether to include children.
            include_attributes (bool): Whether to include attributes.
            include_data_attributes (bool): Whether to include data attributes.
            max_depth (int): Maximum depth for extraction.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            structure_data = await element_cloner.extract_element_structure(
                tab, element, selector, include_children,
                include_attributes, include_data_attributes, max_depth
            )
            structure_data['_metadata'] = {
                'extraction_type': 'structure',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'options': {
                    'include_children': include_children,
                    'include_attributes': include_attributes,
                    'include_data_attributes': include_data_attributes,
                    'max_depth': max_depth
                }
            }
            filename = self._generate_filename("structure")
            file_path = self._save_to_file(structure_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_structure_to_file",
                                  f"Saved structure data to {file_path}")
            return {
                "file_path": file_path,
                "extraction_type": "structure",
                "selector": selector,
                "summary": {
                    "tag_name": structure_data.get('tag_name'),
                    "attributes_count": len(structure_data.get('attributes', {})),
                    "data_attributes_count": len(structure_data.get('data_attributes', {})),
                    "children_count": len(structure_data.get('children', [])),
                    "dom_path": structure_data.get('dom_path')
                }
            }
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_structure_to_file", e)
            return {"error": str(e)}
    async def extract_element_events_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        include_inline: bool = True,
        include_listeners: bool = True,
        include_framework: bool = True,
        analyze_handlers: bool = True
    ) -> Dict[str, Any]:
        """
        Extract events and save to file, return file path.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            include_inline (bool): Include inline event handlers.
            include_listeners (bool): Include event listeners.
            include_framework (bool): Include framework event handlers.
            analyze_handlers (bool): Analyze event handlers.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            event_data = await element_cloner.extract_element_events(
                tab, element, selector, include_inline,
                include_listeners, include_framework, analyze_handlers
            )
            event_data['_metadata'] = {
                'extraction_type': 'events',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'options': {
                    'include_inline': include_inline,
                    'include_listeners': include_listeners,
                    'include_framework': include_framework,
                    'analyze_handlers': analyze_handlers
                }
            }
            filename = self._generate_filename("events")
            file_path = self._save_to_file(event_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_events_to_file",
                                  f"Saved events data to {file_path}")
            return {
                "file_path": file_path,
                "extraction_type": "events",
                "selector": selector,
                "summary": {
                    "inline_handlers_count": len(event_data.get('inline_handlers', [])),
                    "event_listeners_count": len(event_data.get('event_listeners', [])),
                    "detected_frameworks": event_data.get('detected_frameworks', []),
                    "framework_handlers": self._safe_process_framework_handlers(event_data.get('framework_handlers', {}))
                }
            }
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_events_to_file", e)
            return {"error": str(e)}
    async def extract_element_animations_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        include_css_animations: bool = True,
        include_transitions: bool = True,
        include_transforms: bool = True,
        analyze_keyframes: bool = True
    ) -> Dict[str, Any]:
        """
        Extract animations and save to file, return file path.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            include_css_animations (bool): Include CSS animations.
            include_transitions (bool): Include transitions.
            include_transforms (bool): Include transforms.
            analyze_keyframes (bool): Analyze keyframes.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            animation_data = await element_cloner.extract_element_animations(
                tab, element, selector, include_css_animations,
                include_transitions, include_transforms, analyze_keyframes
            )
            animation_data['_metadata'] = {
                'extraction_type': 'animations',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'options': {
                    'include_css_animations': include_css_animations,
                    'include_transitions': include_transitions,
                    'include_transforms': include_transforms,
                    'analyze_keyframes': analyze_keyframes
                }
            }
            filename = self._generate_filename("animations")
            file_path = self._save_to_file(animation_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_animations_to_file",
                                  f"Saved animations data to {file_path}")
            return {
                "file_path": file_path,
                "extraction_type": "animations",
                "selector": selector,
                "summary": {
                    "has_animations": animation_data.get('animations', {}).get('animation_name', 'none') != 'none',
                    "has_transitions": animation_data.get('transitions', {}).get('transition_property', 'none') != 'none',
                    "has_transforms": animation_data.get('transforms', {}).get('transform', 'none') != 'none',
                    "keyframes_count": len(animation_data.get('keyframes', []))
                }
            }
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_animations_to_file", e)
            return {"error": str(e)}
    async def extract_element_assets_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        include_images: bool = True,
        include_backgrounds: bool = True,
        include_fonts: bool = True,
        fetch_external: bool = False
    ) -> Dict[str, Any]:
        """
        Extract assets and save to file, return file path.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            include_images (bool): Include images.
            include_backgrounds (bool): Include background images.
            include_fonts (bool): Include fonts.
            fetch_external (bool): Fetch external assets.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            asset_data = await element_cloner.extract_element_assets(
                tab, element, selector, include_images,
                include_backgrounds, include_fonts, fetch_external
            )
            asset_data['_metadata'] = {
                'extraction_type': 'assets',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'options': {
                    'include_images': include_images,
                    'include_backgrounds': include_backgrounds,
                    'include_fonts': include_fonts,
                    'fetch_external': fetch_external
                }
            }
            filename = self._generate_filename("assets")
            file_path = self._save_to_file(asset_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_assets_to_file",
                                  f"Saved assets data to {file_path}")
            return {
                "file_path": file_path,
                "extraction_type": "assets",
                "selector": selector,
                "summary": {
                    "images_count": len(asset_data.get('images', [])),
                    "background_images_count": len(asset_data.get('background_images', [])),
                    "font_family": asset_data.get('fonts', {}).get('family'),
                    "custom_fonts_count": len(asset_data.get('fonts', {}).get('custom_fonts', [])),
                    "icons_count": len(asset_data.get('icons', [])),
                    "videos_count": len(asset_data.get('videos', [])),
                    "audio_count": len(asset_data.get('audio', []))
                }
            }
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_assets_to_file", e)
            return {"error": str(e)}
    async def extract_related_files_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        analyze_css: bool = True,
        analyze_js: bool = True,
        follow_imports: bool = False,
        max_depth: int = 2
    ) -> Dict[str, Any]:
        """
        Extract related files and save to file, return file path.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            analyze_css (bool): Analyze CSS files.
            analyze_js (bool): Analyze JS files.
            follow_imports (bool): Follow imports.
            max_depth (int): Maximum depth for import following.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            file_data = await element_cloner.extract_related_files(
                tab, element, selector, analyze_css, analyze_js, follow_imports, max_depth
            )
            file_data['_metadata'] = {
                'extraction_type': 'related_files',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'options': {
                    'analyze_css': analyze_css,
                    'analyze_js': analyze_js,
                    'follow_imports': follow_imports,
                    'max_depth': max_depth
                }
            }
            filename = self._generate_filename("related_files")
            file_path = self._save_to_file(file_data, filename)
            debug_logger.log_info("file_element_cloner", "extract_related_files_to_file",
                                  f"Saved related files data to {file_path}")
            return {
                "file_path": file_path,
                "extraction_type": "related_files",
                "selector": selector,
                "summary": {
                    "stylesheets_count": len(file_data.get('stylesheets', [])),
                    "scripts_count": len(file_data.get('scripts', [])),
                    "imports_count": len(file_data.get('imports', [])),
                    "modules_count": len(file_data.get('modules', []))
                }
            }
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "extract_related_files_to_file", e)
            return {"error": str(e)}
    async def clone_element_complete_to_file(
        self,
        tab,
        element=None,
        selector: Optional[str] = None,
        extraction_options: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Master function that extracts all element data and saves to file.
        Returns file path instead of full data.
        Args:
            tab: Browser tab object.
            element: DOM element object.
            selector (str): CSS selector for the element.
            extraction_options (Dict[str, Any]): Extraction options.
        Returns:
            Dict[str, Any]: Summary of extraction and file path.
        """
        try:
            complete_data = await element_cloner.clone_element_complete(
                tab, element, selector, extraction_options
            )
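            # Propagate extraction errors directly without writing a clone file.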
            if 'error' in complete_data:
                return complete_data
            complete_data['_metadata'] = {
                'extraction_type': 'complete_clone',
                'selector': selector,
                'timestamp': datetime.now().isoformat(),
                'extraction_options': extraction_options
            }
            filename = self._generate_filename("complete_clone")
            file_path = self._save_to_file(complete_data, filename)
            summary = {
                "file_path": file_path,
                "extraction_type": "complete_clone",
                "selector": selector,
                "url": complete_data.get('url'),
                "components": {}
            }
            if 'styles' in complete_data:
                styles = complete_data['styles']
                summary['components']['styles'] = {
                    'computed_styles_count': len(styles.get('computed_styles', {})),
                    'css_rules_count': len(styles.get('css_rules', [])),
                    'pseudo_elements_count': len(styles.get('pseudo_elements', {}))
                }
            if 'structure' in complete_data:
                structure = complete_data['structure']
                summary['components']['structure'] = {
                    'tag_name': structure.get('tag_name'),
                    'attributes_count': len(structure.get('attributes', {})),
                    'children_count': len(structure.get('children', []))
                }
            if 'events' in complete_data:
                events = complete_data['events']
                summary['components']['events'] = {
                    'inline_handlers_count': len(events.get('inline_handlers', [])),
                    'detected_frameworks': events.get('detected_frameworks', [])
                }
            if 'animations' in complete_data:
                animations = complete_data['animations']
                summary['components']['animations'] = {
                    'has_animations': animations.get('animations', {}).get('animation_name', 'none') != 'none',
                    'keyframes_count': len(animations.get('keyframes', []))
                }
            if 'assets' in complete_data:
                assets = complete_data['assets']
                summary['components']['assets'] = {
                    'images_count': len(assets.get('images', [])),
                    'background_images_count': len(assets.get('background_images', []))
                }
            if 'related_files' in complete_data:
                files = complete_data['related_files']
                summary['components']['related_files'] = {
                    'stylesheets_count': len(files.get('stylesheets', [])),
                    'scripts_count': len(files.get('scripts', []))
                }
            debug_logger.log_info("file_element_cloner", "clone_complete_to_file",
                                  f"Saved complete clone data to {file_path}")
            return summary
        except Exception as e:
            debug_logger.log_error("file_element_cloner", "clone_complete_to_file", e)
            return {"error": str(e)}
    def list_clone_files(self) -> List[Dict[str, Any]]:
        """
        List all clone files in the output directory.
        Returns:
            List[Dict[str, Any]]: List of file info dictionaries.
        """
        files = []
        for file_path in self.output_dir.glob("*.json"):
            try:
                file_info = {
                    "file_path": str(file_path.absolute()),
                    "filename": file_path.name,
                    "size": file_path.stat().st_size,
                    "created": datetime.fromtimestamp(file_path.stat().st_ctime).isoformat(),
                    "modified": datetime.fromtimestamp(file_path.stat().st_mtime).isoformat()
                }
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                        if '_metadata' in data:
                            file_info['metadata'] = data['_metadata']
                except (OSError, json.JSONDecodeError):
                    pass
                files.append(file_info)
            except Exception as e:
                debug_logger.log_warning("file_element_cloner", "list_files", f"Error reading {file_path}: {e}")
        files.sort(key=lambda x: x['created'], reverse=True)
        return files
    def cleanup_old_files(self, max_age_hours: int = 24) -> int:
        """
        Clean up clone files older than specified hours.
        Args:
            max_age_hours (int): Maximum age of files in hours.
        Returns:
            int: Number of deleted files.
        """
        import time
        cutoff_time = time.time() - (max_age_hours * 3600)
        deleted_count = 0
        for file_path in self.output_dir.glob("*.json"):
            try:
                if file_path.stat().st_ctime < cutoff_time:
                    file_path.unlink()
                    deleted_count += 1
                    debug_logger.log_info("file_element_cloner", "cleanup", f"Deleted old file: {file_path.name}")
            except Exception as e:
                debug_logger.log_warning("file_element_cloner", "cleanup", f"Error deleting {file_path}: {e}")
        return deleted_count
file_based_element_cloner = FileBasedElementCloner()
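

# Minimal usage sketch (assumes this module is executed directly; the browser `tab`
# required by the extract_* coroutines must come from the surrounding automation
# framework, so only the file-management helpers are exercised here).
if __name__ == "__main__":
    cloner = FileBasedElementCloner(output_dir="element_clones")
    for info in cloner.list_clone_files():
        print(f"{info['filename']}: {info['size']} bytes, created {info['created']}")
    deleted = cloner.cleanup_old_files(max_age_hours=24)
    print(f"Deleted {deleted} stale clone file(s)")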