Skip to main content
Glama

URL Reputation and Validity Checker

by prismon
cache.py4.29 kB
"""Caching utilities for URL reputation checker.""" import hashlib import json from datetime import datetime from typing import Any, Dict, Optional from redis.asyncio import Redis class CacheManager: """Manage caching for URL validation results.""" def __init__(self, redis_url: str = "redis://localhost:6379"): self.redis_url = redis_url self.redis: Optional[Redis] = None self.ttl_valid = 86400 # 24 hours for valid URLs self.ttl_invalid = 3600 # 1 hour for invalid URLs self.ttl_history = 604800 # 7 days for domain history async def connect(self): """Connect to Redis.""" try: self.redis = Redis.from_url(self.redis_url) await self.redis.ping() except Exception: # If Redis is not available, caching will be disabled self.redis = None async def _ensure_connected(self): """Ensure we're connected to Redis.""" if self.redis is None: await self.connect() async def disconnect(self): """Disconnect from Redis.""" if self.redis: await self.redis.close() def _get_cache_key(self, prefix: str, identifier: str) -> str: """Generate cache key.""" # Use hash for long URLs if len(identifier) > 200: identifier = hashlib.md5(identifier.encode()).hexdigest() return f"url_reputation:{prefix}:{identifier}" async def get_validation_result(self, url: str) -> Optional[Dict[str, Any]]: """Get cached validation result.""" if not self.redis: return None try: key = self._get_cache_key("validation", url) data = await self.redis.get(key) if data: return json.loads(data) except Exception: pass return None async def set_validation_result( self, url: str, result: Dict[str, Any], is_valid: bool ): """Cache validation result.""" if not self.redis: return try: key = self._get_cache_key("validation", url) ttl = self.ttl_valid if is_valid else self.ttl_invalid # Add timestamp result["cached_at"] = datetime.utcnow().isoformat() await self.redis.setex(key, ttl, json.dumps(result)) except Exception: pass async def get_domain_history(self, domain: str) -> Optional[Dict[str, Any]]: """Get cached domain history.""" if not self.redis: return None try: key = self._get_cache_key("history", domain) data = await self.redis.get(key) if data: return json.loads(data) except Exception: pass return None async def set_domain_history(self, domain: str, history: Dict[str, Any]): """Cache domain history.""" if not self.redis: return try: key = self._get_cache_key("history", domain) # Add timestamp history["cached_at"] = datetime.utcnow().isoformat() await self.redis.setex(key, self.ttl_history, json.dumps(history)) except Exception: pass async def get_stats(self) -> Dict[str, int]: """Get cache statistics.""" await self._ensure_connected() if not self.redis: return {"enabled": False} try: validation_keys = await self.redis.keys("url_reputation:validation:*") history_keys = await self.redis.keys("url_reputation:history:*") return { "enabled": True, "validation_entries": len(validation_keys), "history_entries": len(history_keys), "total_entries": len(validation_keys) + len(history_keys), } except Exception: return {"enabled": False} async def clear_cache(self): """Clear all cache entries.""" await self._ensure_connected() if not self.redis: return try: keys = await self.redis.keys("url_reputation:*") if keys: await self.redis.delete(*keys) except Exception: pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/prismon/reputation-checker-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server