E-commerce Local MCP Server

sync_tracker.py (8.44 kB)
""" Sync timestamp tracking system. Manages last sync times for incremental data synchronization. """ import logging from typing import Optional, Dict, Any from datetime import datetime, timezone, timedelta logger = logging.getLogger(__name__) class SyncTracker: """ Tracks last sync timestamps for each table to enable incremental sync. Stores sync metadata in MongoDB for persistence across restarts. """ def __init__(self): self.db = None self.collection_name = "sync_metadata" self._cache: Dict[str, datetime] = {} async def initialize(self, database): """Initialize the sync tracker with MongoDB database.""" self.db = database # Ensure collection exists and create index collection = self.db[self.collection_name] await collection.create_index("table_name", unique=True) # Load existing sync times into cache await self._load_cache() logger.info("Sync tracker initialized") async def _load_cache(self): """Load all sync times into memory cache for faster access.""" if self.db is None: return collection = self.db[self.collection_name] async for doc in collection.find({}): table_name = doc["table_name"] last_sync = doc["last_sync_time"] self._cache[table_name] = last_sync logger.info(f"Loaded {len(self._cache)} sync timestamps into cache") async def get_last_sync_time(self, table_name: str) -> Optional[datetime]: """ Get the last sync timestamp for a table. Args: table_name: Name of the table Returns: Last sync datetime or None if never synced """ # Check cache first if table_name in self._cache: return self._cache[table_name] # If not in cache, check database if self.db is None: return None collection = self.db[self.collection_name] doc = await collection.find_one({"table_name": table_name}) if doc: last_sync = doc["last_sync_time"] self._cache[table_name] = last_sync return last_sync return None async def update_last_sync_time( self, table_name: str, sync_time: Optional[datetime] = None ): """ Update the last sync timestamp for a table. Args: table_name: Name of the table sync_time: Sync timestamp (defaults to current UTC time) """ if sync_time is None: sync_time = datetime.now(timezone.utc) if self.db is None: logger.warning("Database not initialized, cannot update sync time") return collection = self.db[self.collection_name] # Update in database await collection.replace_one( {"table_name": table_name}, { "table_name": table_name, "last_sync_time": sync_time, "updated_at": datetime.now(timezone.utc) }, upsert=True ) # Update cache self._cache[table_name] = sync_time logger.debug(f"Updated sync time for {table_name}: {sync_time}") async def get_all_sync_times(self) -> Dict[str, datetime]: """Get all sync timestamps for all tables.""" if self.db is None: return self._cache.copy() # Ensure cache is up to date await self._load_cache() return self._cache.copy() async def reset_sync_time(self, table_name: str): """ Reset sync time for a table (force full sync on next run). 
Args: table_name: Name of the table """ if self.db is None: logger.warning("Database not initialized, cannot reset sync time") return collection = self.db[self.collection_name] # Remove from database await collection.delete_one({"table_name": table_name}) # Remove from cache self._cache.pop(table_name, None) logger.info(f"Reset sync time for {table_name}") async def reset_all_sync_times(self): """Reset all sync times (force full sync for all tables).""" if self.db is None: logger.warning("Database not initialized, cannot reset sync times") return collection = self.db[self.collection_name] # Clear database await collection.delete_many({}) # Clear cache self._cache.clear() logger.info("Reset all sync times") async def get_sync_statistics(self) -> Dict[str, Any]: """Get sync statistics and metadata.""" if self.db is None: return { "total_tables_tracked": len(self._cache), "tables": {} } collection = self.db[self.collection_name] # Get all sync records with additional metadata stats = { "total_tables_tracked": 0, "tables": {}, "oldest_sync": None, "newest_sync": None } oldest_sync = None newest_sync = None async for doc in collection.find({}): table_name = doc["table_name"] last_sync = doc["last_sync_time"] updated_at = doc.get("updated_at") # Ensure timezone awareness for datetime comparison if last_sync and last_sync.tzinfo is None: # Make naive datetime timezone-aware (assuming UTC) last_sync = last_sync.replace(tzinfo=timezone.utc) stats["tables"][table_name] = { "last_sync_time": last_sync.isoformat() if last_sync else None, "updated_at": updated_at.isoformat() if updated_at else None, "days_since_sync": (datetime.now(timezone.utc) - last_sync).days if last_sync else None } # Track oldest and newest sync times if last_sync: if not oldest_sync or last_sync < oldest_sync: oldest_sync = last_sync if not newest_sync or last_sync > newest_sync: newest_sync = last_sync stats["total_tables_tracked"] = len(stats["tables"]) stats["oldest_sync"] = oldest_sync.isoformat() if oldest_sync else None stats["newest_sync"] = newest_sync.isoformat() if newest_sync else None return stats async def cleanup_old_metadata(self, days_old: int = 30): """ Clean up metadata for tables that haven't been synced in a while. Args: days_old: Remove metadata for tables not synced in this many days """ if self.db is None: logger.warning("Database not initialized, cannot cleanup metadata") return cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old) collection = self.db[self.collection_name] # Find old records old_records = await collection.find( {"last_sync_time": {"$lt": cutoff_date}} ).to_list(None) if not old_records: logger.info("No old sync metadata to cleanup") return # Remove old records result = await collection.delete_many( {"last_sync_time": {"$lt": cutoff_date}} ) # Update cache for record in old_records: table_name = record["table_name"] self._cache.pop(table_name, None) logger.info(f"Cleaned up {result.deleted_count} old sync metadata records") def get_cached_sync_times(self) -> Dict[str, datetime]: """Get sync times from cache (no database access).""" return self._cache.copy() async def force_cache_refresh(self): """Force refresh of the cache from database.""" self._cache.clear() await self._load_cache() logger.info("Sync tracker cache refreshed from database")
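The tracker expects an asynchronous MongoDB database handle: the awaitable create_index, the async for over find(), and the to_list(None) call match the Motor driver's API, so the sketch below assumes Motor. The connection string, database name, and the "products" table name are illustrative assumptions, not values taken from this repository.

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient

from sync_tracker import SyncTracker


async def main():
    # Assumed connection details; the actual server may configure these differently.
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["ecommerce"]

    tracker = SyncTracker()
    await tracker.initialize(db)

    # Decide between a full and an incremental sync for the "products" table.
    last_sync = await tracker.get_last_sync_time("products")
    if last_sync is None:
        print("No previous sync recorded; run a full sync")
    else:
        print(f"Incremental sync of changes since {last_sync.isoformat()}")

    # Record that the sync completed (defaults to the current UTC time).
    await tracker.update_last_sync_time("products")


asyncio.run(main())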

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AnisurRahman06046/mcptestwithmodel'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.