
Facebook Ads Library MCP Server

media_cache_service.py (23.8 kB)
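The module below implements the server's SQLite-backed cache for downloaded ad images and videos, including cached analysis results, quick-lookup columns, and batch operations.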
import sqlite3
import hashlib
import json
import time
from pathlib import Path
from typing import Dict, Any, Optional, List
import logging

logger = logging.getLogger(__name__)

# Cache configuration
CACHE_DIR = Path.home() / ".cache" / "facebook-ads-mcp"
CACHE_DB_PATH = CACHE_DIR / "media_cache.db"
CACHE_IMAGES_DIR = CACHE_DIR / "images"
CACHE_VIDEOS_DIR = CACHE_DIR / "videos"
MAX_CACHE_SIZE_GB = 10   # Maximum cache size in GB (increased for videos)
MAX_CACHE_AGE_DAYS = 30  # Auto-cleanup media older than this


class MediaCacheService:
    """Service for managing cached ad images, videos and analysis results."""

    def __init__(self):
        self._ensure_cache_directory()
        self._init_database()

    def _ensure_cache_directory(self):
        """Create cache directories if they don't exist."""
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        CACHE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        CACHE_VIDEOS_DIR.mkdir(parents=True, exist_ok=True)
        logger.info(f"Cache directory initialized at {CACHE_DIR}")

    def _init_database(self):
        """Initialize SQLite database with required schema."""
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS media_cache (
                    url_hash TEXT PRIMARY KEY,
                    original_url TEXT NOT NULL,
                    file_path TEXT NOT NULL,
                    downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    file_size INTEGER,
                    content_type TEXT,
                    media_type TEXT NOT NULL DEFAULT 'image',  -- 'image' or 'video'
                    -- Ad metadata
                    brand_name TEXT,
                    ad_id TEXT,
                    campaign_period TEXT,
                    -- Analysis results (cached)
                    analysis_results TEXT,  -- JSON string
                    analysis_cached_at TIMESTAMP,
                    -- Quick lookup fields
                    dominant_colors TEXT,
                    has_people BOOLEAN,
                    text_elements TEXT,
                    media_format TEXT,
                    -- Video-specific fields
                    duration_seconds REAL,
                    has_audio BOOLEAN
                )
            """)
            # Create indexes for fast lookups
            conn.execute("CREATE INDEX IF NOT EXISTS idx_brand_name ON media_cache(brand_name)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ad_id ON media_cache(ad_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_last_accessed ON media_cache(last_accessed)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_has_people ON media_cache(has_people)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dominant_colors ON media_cache(dominant_colors)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_media_type ON media_cache(media_type)")
            conn.commit()
            logger.info("Database schema initialized")

    def _generate_url_hash(self, url: str) -> str:
        """Generate a consistent hash for the URL."""
        return hashlib.md5(url.encode()).hexdigest()

    def _get_file_path(self, url_hash: str, content_type: str, media_type: str = 'image') -> Path:
        """Generate file path for cached media."""
        # Determine file extension from content type
        image_ext_map = {
            'image/jpeg': '.jpg',
            'image/jpg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'image/webp': '.webp'
        }
        video_ext_map = {
            'video/mp4': '.mp4',
            'video/quicktime': '.mov',
            'video/webm': '.webm',
            'video/x-msvideo': '.avi',
            'video/3gpp': '.3gp'
        }
        if media_type == 'video':
            ext = video_ext_map.get(content_type.lower(), '.mp4')
            return CACHE_VIDEOS_DIR / f"{url_hash}{ext}"
        else:
            ext = image_ext_map.get(content_type.lower(), '.jpg')
            return CACHE_IMAGES_DIR / f"{url_hash}{ext}"

    def get_cached_media(self, url: str, media_type: str = None) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached media data and metadata.

        Args:
            url: Original media URL
            media_type: Optional filter by media type ('image' or 'video')

        Returns:
            Dictionary with cached data or None if not found
        """
        url_hash = self._generate_url_hash(url)
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            query = "SELECT * FROM media_cache WHERE url_hash = ?"
            params = [url_hash]
            if media_type:
                query += " AND media_type = ?"
                params.append(media_type)
            cursor = conn.execute(query, params)
            row = cursor.fetchone()
            if not row:
                return None
            # Check if file still exists
            file_path = Path(row['file_path'])
            if not file_path.exists():
                # File was deleted, remove from database
                conn.execute("DELETE FROM media_cache WHERE url_hash = ?", (url_hash,))
                conn.commit()
                logger.warning(f"Cached file missing, removed from database: {file_path}")
                return None
            # Update last accessed time
            conn.execute("""
                UPDATE media_cache SET last_accessed = CURRENT_TIMESTAMP
                WHERE url_hash = ?
            """, (url_hash,))
            conn.commit()
            # Convert row to dictionary
            result = dict(row)
            # Parse JSON analysis results if available
            if result['analysis_results']:
                try:
                    result['analysis_results'] = json.loads(result['analysis_results'])
                except json.JSONDecodeError:
                    result['analysis_results'] = None
            logger.info(f"Cache hit for URL: {url}")
            return result

    # Backward compatibility method
    def get_cached_image(self, url: str) -> Optional[Dict[str, Any]]:
        """Retrieve cached image data (backward compatibility)."""
        return self.get_cached_media(url, media_type='image')

    def cache_media(self, url: str, media_data: bytes, content_type: str,
                    media_type: str = 'image', brand_name: str = None,
                    ad_id: str = None, analysis_results: Dict[str, Any] = None,
                    duration_seconds: float = None, has_audio: bool = None) -> str:
        """
        Cache media data and metadata.

        Args:
            url: Original media URL
            media_data: Raw media bytes
            content_type: MIME type of the media
            media_type: Type of media ('image' or 'video')
            brand_name: Optional brand name for metadata
            ad_id: Optional ad ID for metadata
            analysis_results: Optional analysis results to cache
            duration_seconds: Optional video duration
            has_audio: Optional audio presence flag

        Returns:
            File path where media was cached
        """
        url_hash = self._generate_url_hash(url)
        file_path = self._get_file_path(url_hash, content_type, media_type)
        # Save media file
        file_path.write_bytes(media_data)
        # Prepare analysis results for storage
        analysis_json = None
        analysis_cached_at = None
        if analysis_results:
            analysis_json = json.dumps(analysis_results)
            analysis_cached_at = time.time()  # Unix timestamp; note update_analysis_results() uses SQLite CURRENT_TIMESTAMP instead
        # Save metadata to database
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO media_cache (
                    url_hash, original_url, file_path, file_size, content_type,
                    media_type, brand_name, ad_id, analysis_results,
                    analysis_cached_at, duration_seconds, has_audio
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", ( url_hash, url, str(file_path), len(media_data), content_type, media_type, brand_name, ad_id, analysis_json, analysis_cached_at, duration_seconds, has_audio )) conn.commit() logger.info(f"Cached {media_type}: {url} -> {file_path}") return str(file_path) # Backward compatibility method def cache_image(self, url: str, image_data: bytes, content_type: str, brand_name: str = None, ad_id: str = None, analysis_results: Dict[str, Any] = None) -> str: """Cache image data (backward compatibility).""" return self.cache_media(url, image_data, content_type, 'image', brand_name, ad_id, analysis_results) def update_analysis_results(self, url: str, analysis_results: Dict[str, Any]): """ Update cached analysis results for media. Args: url: Original media URL analysis_results: Analysis results to cache """ url_hash = self._generate_url_hash(url) analysis_json = json.dumps(analysis_results) # Extract quick lookup fields from analysis dominant_colors = self._extract_dominant_colors(analysis_results) has_people = self._extract_has_people(analysis_results) text_elements = self._extract_text_elements(analysis_results) with sqlite3.connect(CACHE_DB_PATH) as conn: conn.execute(""" UPDATE media_cache SET analysis_results = ?, analysis_cached_at = CURRENT_TIMESTAMP, dominant_colors = ?, has_people = ?, text_elements = ? WHERE url_hash = ? """, (analysis_json, dominant_colors, has_people, text_elements, url_hash)) conn.commit() logger.info(f"Updated analysis results for: {url}") def _extract_dominant_colors(self, analysis: Dict[str, Any]) -> str: """Extract dominant colors from analysis for quick lookup.""" try: colors = analysis.get('colors', {}).get('dominant_colors', []) return ','.join(colors) if colors else None except: return None def _extract_has_people(self, analysis: Dict[str, Any]) -> bool: """Extract whether image has people for quick lookup.""" try: people_desc = analysis.get('people_description', '') return bool(people_desc and people_desc.strip()) except: return False def _extract_text_elements(self, analysis: Dict[str, Any]) -> str: """Extract text elements for quick lookup.""" try: text_elements = analysis.get('text_elements', {}) all_text = [] for category, texts in text_elements.items(): if isinstance(texts, list): all_text.extend(texts) elif isinstance(texts, str): all_text.append(texts) return ' | '.join(all_text) if all_text else None except: return None def cleanup_old_cache(self, max_age_days: int = MAX_CACHE_AGE_DAYS): """ Remove old cached media files and database entries. 

        Args:
            max_age_days: Maximum age in days before cleanup
        """
        cutoff_time = time.time() - (max_age_days * 24 * 60 * 60)
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            # Get files to delete
            cursor = conn.execute("""
                SELECT file_path, media_type FROM media_cache
                WHERE downloaded_at < datetime(?, 'unixepoch')
            """, (cutoff_time,))
            files_to_delete = cursor.fetchall()
            # Delete files
            deleted_images = 0
            deleted_videos = 0
            for file_path, media_type in files_to_delete:
                try:
                    Path(file_path).unlink(missing_ok=True)
                    if media_type == 'video':
                        deleted_videos += 1
                    else:
                        deleted_images += 1
                except Exception as e:
                    logger.warning(f"Failed to delete cached file {file_path}: {e}")
            # Remove database entries
            conn.execute("""
                DELETE FROM media_cache
                WHERE downloaded_at < datetime(?, 'unixepoch')
            """, (cutoff_time,))
            conn.commit()
        logger.info(f"Cleanup completed: removed {deleted_images} images and {deleted_videos} videos")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics for both images and videos."""
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            # Overall stats
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as total_files,
                    SUM(file_size) as total_size_bytes,
                    COUNT(CASE WHEN analysis_results IS NOT NULL THEN 1 END) as analyzed_files,
                    COUNT(DISTINCT brand_name) as unique_brands
                FROM media_cache
            """)
            # Fetch each aggregate row exactly once; a second fetchone() call would
            # consume the single result row and return None
            row = cursor.fetchone()
            stats = dict(row) if row else {}
            # Image-specific stats
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as total_images,
                    SUM(file_size) as images_size_bytes,
                    COUNT(CASE WHEN analysis_results IS NOT NULL THEN 1 END) as analyzed_images
                FROM media_cache WHERE media_type = 'image'
            """)
            row = cursor.fetchone()
            image_stats = dict(row) if row else {}
            # Video-specific stats
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as total_videos,
                    SUM(file_size) as videos_size_bytes,
                    COUNT(CASE WHEN analysis_results IS NOT NULL THEN 1 END) as analyzed_videos,
                    AVG(duration_seconds) as avg_duration_seconds
                FROM media_cache WHERE media_type = 'video'
            """)
            row = cursor.fetchone()
            video_stats = dict(row) if row else {}
            # Combine stats
            combined_stats = {**stats, **image_stats, **video_stats}
            # Convert to more readable format; SUM() yields NULL on an empty table, so guard with `or 0`
            combined_stats['total_size_mb'] = round((combined_stats.get('total_size_bytes') or 0) / (1024 * 1024), 2)
            combined_stats['total_size_gb'] = round(combined_stats['total_size_mb'] / 1024, 2)
            combined_stats['images_size_mb'] = round((combined_stats.get('images_size_bytes') or 0) / (1024 * 1024), 2)
            combined_stats['videos_size_mb'] = round((combined_stats.get('videos_size_bytes') or 0) / (1024 * 1024), 2)
            return combined_stats

    def search_cached_media(self, brand_name: str = None, has_people: bool = None,
                            color_contains: str = None, media_type: str = None) -> List[Dict[str, Any]]:
        """
        Search cached media by criteria.

        Args:
            brand_name: Filter by brand name
            has_people: Filter by presence of people
            color_contains: Filter by color (partial match)
            media_type: Filter by media type ('image' or 'video')

        Returns:
            List of matching cached media records
        """
        query = "SELECT * FROM media_cache WHERE 1=1"
        params = []
        if brand_name:
            query += " AND brand_name = ?"
            params.append(brand_name)
        if has_people is not None:
            query += " AND has_people = ?"
            params.append(has_people)
        if color_contains:
            query += " AND dominant_colors LIKE ?"
            params.append(f"%{color_contains}%")
        if media_type:
            query += " AND media_type = ?"
            params.append(media_type)
        query += " ORDER BY last_accessed DESC"
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(query, params)
            results = []
            for row in cursor.fetchall():
                result = dict(row)
                if result['analysis_results']:
                    try:
                        result['analysis_results'] = json.loads(result['analysis_results'])
                    except json.JSONDecodeError:
                        result['analysis_results'] = None
                results.append(result)
            return results

    # Backward compatibility method
    def search_cached_images(self, brand_name: str = None, has_people: bool = None,
                             color_contains: str = None) -> List[Dict[str, Any]]:
        """Search cached images by criteria (backward compatibility)."""
        return self.search_cached_media(brand_name, has_people, color_contains, 'image')

    def get_cached_media_batch(self, urls: List[str], media_type: str = None) -> Dict[str, Optional[Dict[str, Any]]]:
        """
        Retrieve cached media data for multiple URLs in a single database query.

        Args:
            urls: List of media URLs to look up
            media_type: Optional filter by media type ('image' or 'video')

        Returns:
            Dictionary mapping URLs to their cached data (None if not found)
        """
        if not urls:
            return {}
        # Generate URL hashes for batch lookup
        url_hash_map = {self._generate_url_hash(url): url for url in urls}
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            # Build query with placeholders for all hashes
            placeholders = ','.join(['?' for _ in url_hash_map.keys()])
            query = f"SELECT * FROM media_cache WHERE url_hash IN ({placeholders})"
            params = list(url_hash_map.keys())
            if media_type:
                query += " AND media_type = ?"
                params.append(media_type)
            cursor = conn.execute(query, params)
            # Initialize results with None for all URLs
            results = {url: None for url in urls}
            for row in cursor.fetchall():
                original_url = url_hash_map[row['url_hash']]
                # Check if file still exists
                file_path = Path(row['file_path'])
                if not file_path.exists():
                    # File was deleted, remove from database
                    conn.execute("DELETE FROM media_cache WHERE url_hash = ?", (row['url_hash'],))
                    logger.warning(f"Cached file missing, removed from database: {file_path}")
                    continue
                # Update last accessed time
                conn.execute("""
                    UPDATE media_cache SET last_accessed = CURRENT_TIMESTAMP
                    WHERE url_hash = ?
                """, (row['url_hash'],))
                # Convert row to dictionary
                result = dict(row)
                # Parse JSON analysis results if available
                if result['analysis_results']:
                    try:
                        result['analysis_results'] = json.loads(result['analysis_results'])
                    except json.JSONDecodeError:
                        result['analysis_results'] = None
                results[original_url] = result
            conn.commit()
        logger.info(f"Batch cache lookup: {sum(1 for v in results.values() if v is not None)}/{len(urls)} cache hits")
        return results

    def cache_media_batch(self, media_data_list: List[Dict[str, Any]]) -> List[str]:
        """
        Cache multiple media files and metadata in batch operations.

        Args:
            media_data_list: List of dictionaries containing:
                - url: Media URL
                - media_data: Raw media bytes
                - content_type: MIME type
                - media_type: 'image' or 'video'
                - brand_name: Optional brand name
                - ad_id: Optional ad ID
                - analysis_results: Optional analysis results
                - duration_seconds: Optional video duration
                - has_audio: Optional audio presence flag

        Returns:
            List of file paths where media was cached
        """
        if not media_data_list:
            return []
        file_paths = []
        db_entries = []
        # Process each media item and save files
        for media_info in media_data_list:
            url = media_info['url']
            media_data = media_info['media_data']
            content_type = media_info['content_type']
            media_type = media_info.get('media_type', 'image')
            url_hash = self._generate_url_hash(url)
            file_path = self._get_file_path(url_hash, content_type, media_type)
            # Save media file
            file_path.write_bytes(media_data)
            file_paths.append(str(file_path))
            # Prepare analysis results for storage
            analysis_json = None
            analysis_cached_at = None
            analysis_results = media_info.get('analysis_results')
            if analysis_results:
                analysis_json = json.dumps(analysis_results)
                analysis_cached_at = time.time()
            # Prepare database entry
            db_entries.append((
                url_hash, url, str(file_path), len(media_data), content_type,
                media_type, media_info.get('brand_name'), media_info.get('ad_id'),
                analysis_json, analysis_cached_at,
                media_info.get('duration_seconds'), media_info.get('has_audio')
            ))
        # Batch insert into database
        with sqlite3.connect(CACHE_DB_PATH) as conn:
            conn.executemany("""
                INSERT OR REPLACE INTO media_cache (
                    url_hash, original_url, file_path, file_size, content_type,
                    media_type, brand_name, ad_id, analysis_results,
                    analysis_cached_at, duration_seconds, has_audio
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, db_entries)
            conn.commit()
        logger.info(f"Batch cached {len(media_data_list)} media files")
        return file_paths


# Global instance (maintain backward compatibility)
media_cache = MediaCacheService()
image_cache = media_cache  # Backward compatibility alias
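Because the module instantiates a global `media_cache` on import, a consumer only needs to import that instance. The sketch below is a minimal illustration of the round trip, not part of the file: the URL, image bytes, brand name, ad ID, and analysis dictionary are hypothetical placeholders.

# Minimal usage sketch (hypothetical values; not part of media_cache_service.py)
from media_cache_service import media_cache

# Hypothetical ad creative -- in practice the bytes come from an HTTP download
url = "https://example.com/ad_creative.jpg"
image_bytes = b"\xff\xd8\xff\xe0"  # placeholder JPEG bytes

# Cache the raw media together with its metadata
path = media_cache.cache_media(
    url, image_bytes, content_type="image/jpeg",
    media_type="image", brand_name="ExampleBrand", ad_id="123456",
)

# Later lookups hit SQLite first; a miss returns None
cached = media_cache.get_cached_media(url)
if cached is None:
    pass  # re-download and call cache_media() again

# Attach analysis results after the fact; quick-lookup columns
# (dominant_colors, has_people, text_elements) are populated automatically
media_cache.update_analysis_results(url, {
    "colors": {"dominant_colors": ["#ff0000", "#ffffff"]},
    "people_description": "two people smiling",
    "text_elements": {"headline": ["Summer Sale"]},
})

# Inspect and prune the cache
print(media_cache.get_cache_stats())
media_cache.cleanup_old_cache(max_age_days=30)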
