import json
import os
import logging
from typing import Any, Dict, Optional
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError, HTTPError
from .DatabaseService import DatabaseService
class DataManager:
"""
    Data manager for loading and saving data from different sources with a fallback strategy.
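    Typical usage (illustrative sketch; the collection and file names are hypothetical):

        manager = DataManager()
        settings = manager.load_data("configs", "settings.json")
        manager.save_data("configs", "settings.json", settings)
        manager.close()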
"""
def __init__(self):
"""
Initialize the data manager.
"""
self.data_cache = {}
self.logger = logging.getLogger(__name__)
        # Initialize the database service; connection setup is handled by DatabaseService itself
self.db_service = DatabaseService()
def load_data(self, data_collection: str, data_id: str) -> Dict[str, Any]:
"""
        Load data with a fallback strategy: HTTP/HTTPS → database → local files.
Args:
data_collection: Collection to get data from (ignored for URLs)
data_id: Path to the data file or URL
Returns:
Dictionary with loaded data
Raises:
FileNotFoundError: If data cannot be found in any source
json.JSONDecodeError: If data is not valid JSON
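        Example (illustrative sketch; the collection and file names are hypothetical):
            manager = DataManager()
            # A plain identifier is looked up in the database, then in ./configs/settings.json
            settings = manager.load_data("configs", "settings.json")
            # A URL identifier is fetched first, then the usual fallback chain applies
            remote = manager.load_data("configs", "https://example.com/settings.json")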
"""
# Check cache first
cache_key = f"{data_collection}:{data_id}"
if cache_key in self.data_cache:
self.logger.debug(f"Returning cached data for {cache_key}")
return self.data_cache[cache_key]
data = None
if self._is_url(data_id):
try:
data = self._load_from_url(data_id)
self.logger.info(f"Successfully loaded data from URL: {data_id}")
except Exception as e:
error_msg = f"Failed to load from URL {data_id}: {e}"
self.logger.warning(error_msg)
# Extract filename from URL for fallback
parsed = urlparse(data_id)
data_id = os.path.basename(parsed.path) or data_id
if data is None:
try:
data = self._load_from_database(data_collection, data_id)
if data:
self.logger.info(f"Successfully loaded data from database: {data_collection}/{data_id}")
except Exception as e:
error_msg = f"Failed to load from database {data_collection}/{data_id}: {e}"
self.logger.warning(error_msg)
if data is None:
try:
data = self._load_from_file(data_collection, data_id)
self.logger.info(f"Successfully loaded data from local file: {data_collection}/{data_id}")
except Exception as e:
error_msg = f"Failed to load from local file {data_collection}/{data_id}: {e}"
self.logger.error(error_msg)
if data is None:
raise FileNotFoundError(f"Could not load data '{data_id}' from any source.")
# Cache the successfully loaded data
self.data_cache[cache_key] = data
return data
def _is_url(self, data_id: str) -> bool:
"""
Check if data_id is a valid HTTP/HTTPS URL.
Args:
data_id: String to check
Returns:
True if data_id is a URL, False otherwise
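        Example:
            self._is_url("https://example.com/data.json")  # True
            self._is_url("configs/data.json")              # False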
"""
try:
parsed = urlparse(data_id)
            return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
except Exception:
return False
def _load_from_url(self, url: str) -> Dict[str, Any]:
"""
Load data from HTTP/HTTPS URL.
Args:
url: URL to load data from
Returns:
Dictionary with loaded data
Raises:
URLError: If URL cannot be accessed
HTTPError: If HTTP error occurs
json.JSONDecodeError: If response is not valid JSON
"""
try:
with urlopen(url, timeout=30) as response:
if response.getcode() != 200:
raise HTTPError(url, response.getcode(), f"HTTP {response.getcode()}", response.headers, None)
content = response.read().decode('utf-8')
data = json.loads(content)
return data
        except HTTPError:
            # Re-raise HTTP errors unchanged, as documented (HTTPError is a subclass of URLError)
            raise
        except URLError as e:
            raise URLError(f"Could not access URL {url}: {e}") from e
        except json.JSONDecodeError as e:
            raise json.JSONDecodeError(f"Invalid JSON from URL {url}: {e.msg}", e.doc, e.pos) from e
def _load_from_database(self, data_collection: str, data_id: str) -> Optional[Dict[str, Any]]:
"""
Load data from PostgreSQL database.
Args:
data_collection: Collection/table name
data_id: Data identifier
Returns:
Dictionary with loaded data or None if not found
"""
if not self.db_service or not self.db_service.connection_pool:
self.logger.debug(f"Database service not available, skipping {data_collection}/{data_id}")
return None
try:
data = self.db_service.load_data(data_collection, data_id)
if data:
self.logger.debug(f"Successfully loaded data from database: {data_collection}/{data_id}")
return data
except Exception as e:
self.logger.error(f"Database loading failed for {data_collection}/{data_id}: {e}")
return None
def _load_from_file(self, data_collection: str, data_id: str) -> Dict[str, Any]:
"""
Load data from local file.
Args:
data_collection: Directory name
data_id: File name
Returns:
Dictionary with loaded data
Raises:
FileNotFoundError: If file does not exist
json.JSONDecodeError: If file is not valid JSON
"""
full_path = os.path.join(data_collection, data_id)
if not os.path.exists(full_path):
raise FileNotFoundError(f"Data file not found: {full_path}")
try:
with open(full_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
        except json.JSONDecodeError as e:
            raise json.JSONDecodeError(f"Invalid JSON in data file {full_path}: {e.msg}", e.doc, e.pos) from e
def _save_to_file(self, data_collection: str, data_id: str, data: Dict[str, Any]):
"""
Save data to local file.
Args:
data_collection: Directory name
data_id: File name
data: Data to save
Raises:
OSError: If file cannot be written
"""
# Create directory if it doesn't exist
os.makedirs(data_collection, exist_ok=True)
full_path = os.path.join(data_collection, data_id)
with open(full_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def clear_cache(self):
"""
Clear the data cache.
"""
self.data_cache.clear()
self.logger.info("Data cache cleared")
def get_cache_info(self) -> Dict[str, Any]:
"""
Get information about the current cache state and database connection.
Returns:
Dictionary with cache statistics and database info
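        Example of the returned shape (illustrative values; "database_info" comes
        from DatabaseService.get_database_info()):
            {
                "cache_size": 1,
                "cached_items": ["configs:settings.json"],
                "database_info": {...}
            }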
"""
db_info = self.db_service.get_database_info() if self.db_service else {"connected": False}
return {
"cache_size": len(self.data_cache),
"cached_items": list(self.data_cache.keys()),
"database_info": db_info
}
def save_data(self, data_collection: str, data_id: str, data: Dict[str, Any], save_to_db: bool = True, save_to_file: bool = True) -> Dict[str, Any]:
"""
        Save data to the database and/or a local file.
Args:
data_collection: Collection to save data to
data_id: Data identifier
data: Data to save (must be a dictionary)
save_to_db: Whether to save to database (default: True)
            save_to_file: Whether to save to local file as backup (default: True)
Returns:
Dictionary with save operation results
Raises:
ValueError: If data is not a dictionary
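        Example (illustrative sketch; the collection, file name, and payload are hypothetical):
            result = manager.save_data("configs", "settings.json", {"theme": "dark"})
            if not result["success"]:
                print(result["errors"])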
"""
if not isinstance(data, dict):
raise ValueError("Data must be a dictionary")
results = {
"sucess": False,
"database_success": False,
"file_success": False,
"errors": []
}
# Save to database first
if save_to_db and self.db_service and self.db_service.connection_pool:
try:
# Determine table based on collection name
table = "schemas" if data_collection.endswith("schemas") else "data_storage"
success = self.db_service.save_data(data_collection, data_id, data, table)
results["database_success"] = success
if success:
# Update cache
cache_key = f"{data_collection}:{data_id}"
self.data_cache[cache_key] = data
self.logger.info(f"Successfully saved data to database: {data_collection}/{data_id}")
else:
results["errors"].append("Database save failed")
except Exception as e:
error_msg = f"Database save error: {e}"
results["errors"].append(error_msg)
self.logger.error(f"Failed to save data to database {data_collection}/{data_id}: {e}")
# Save to local file as backup
if save_to_file:
try:
self._save_to_file(data_collection, data_id, data)
results["file_success"] = True
self.logger.info(f"Successfully saved data to file: {data_collection}/{data_id}")
except Exception as e:
error_msg = f"File save error: {e}"
results["errors"].append(error_msg)
self.logger.error(f"Failed to save data to file {data_collection}/{data_id}: {e}")
results["success"] = results["database_success"] or results["file_success"]
return results
def delete_data(self, data_collection: str, data_id: str, delete_from_db: bool = True, delete_from_file: bool = True) -> Dict[str, Any]:
"""
Delete data from database and/or local file.
Args:
data_collection: Collection to delete data from
data_id: Data identifier
delete_from_db: Whether to delete from database (default: True)
            delete_from_file: Whether to delete from local file (default: True)
Returns:
Dictionary with delete operation results
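        Example (illustrative sketch; the collection and file names are hypothetical):
            result = manager.delete_data("configs", "settings.json")
            # result["success"] is True if the database row or the local file was removed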
"""
results = {
"success": False,
"database_success": False,
"file_success": False,
"errors": []
}
# Delete from database
if delete_from_db and self.db_service and self.db_service.connection_pool:
try:
# Determine table based on collection name
table = "schemas" if data_collection.endswith("schemas") else "data_storage"
success = self.db_service.delete_data(data_collection, data_id, table)
results["database_success"] = success
if success:
# Remove from cache
cache_key = f"{data_collection}:{data_id}"
self.data_cache.pop(cache_key, None)
self.logger.info(f"Successfully deleted data from database: {data_collection}/{data_id}")
else:
results["errors"].append("Database delete failed - data not found")
except Exception as e:
error_msg = f"Database delete error: {e}"
results["errors"].append(error_msg)
self.logger.error(f"Failed to delete data from database {data_collection}/{data_id}: {e}")
# Delete from local file
if delete_from_file:
try:
full_path = os.path.join(data_collection, data_id)
if os.path.exists(full_path):
os.remove(full_path)
results["file_success"] = True
# Remove from cache when file is deleted
cache_key = f"{data_collection}:{data_id}"
self.data_cache.pop(cache_key, None)
self.logger.info(f"Successfully deleted file: {data_collection}/{data_id}")
else:
results["errors"].append("File not found")
except Exception as e:
error_msg = f"File delete error: {e}"
results["errors"].append(error_msg)
self.logger.error(f"Failed to delete file {data_collection}/{data_id}: {e}")
results["success"] = results["database_success"] or results["file_success"]
return results
def schema_exists(self, data_collection: str, data_id: str) -> bool:
"""
Check if a schema exists without loading it.
Args:
data_collection: Collection to check (typically ".schemas")
data_id: Schema identifier
Returns:
True if schema exists, False otherwise
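        Example (illustrative sketch; the schema id is hypothetical):
            if manager.schema_exists(".schemas", "user.json"):
                schema = manager.load_data(".schemas", "user.json")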
"""
try:
# Check cache first
cache_key = f"{data_collection}:{data_id}"
if cache_key in self.data_cache:
return True
# Check database if available
if self.db_service and self.db_service.connection_pool:
try:
# Use exists method from DatabaseService if available
if hasattr(self.db_service, 'data_exists'):
table = "schemas" if data_collection.endswith("schemas") else "data_storage"
return self.db_service.data_exists(data_collection, data_id, table)
else:
# Fallback: try to load and check if we get data
data = self.db_service.load_data(data_collection, data_id)
return data is not None
except Exception as e:
self.logger.debug(f"Database exists check failed for {data_collection}/{data_id}: {e}")
# Check local file as fallback
full_path = os.path.join(data_collection, data_id)
return os.path.exists(full_path)
except Exception as e:
self.logger.debug(f"Error checking schema existence for {data_collection}/{data_id}: {e}")
return False
def list_schemas(self) -> Dict[str, Any]:
"""
        List all available schema IDs from the .schemas collection.
Returns:
Dictionary with schema list and metadata
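        Example of the returned shape (illustrative values):
            {
                "success": True,
                "source": "database",
                "schema_ids": ["order.json", "user.json"],
                "count": 2
            }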
"""
try:
# Try to get list from database first
if self.db_service and self.db_service.connection_pool:
try:
schema_ids = self.db_service.list_data_ids(".schemas", "schemas")
if schema_ids:
return {
"success": True,
"source": "database",
"schema_ids": sorted(schema_ids),
"count": len(schema_ids)
}
except Exception as e:
self.logger.warning(f"Could not list schemas from database: {e}")
# Fallback to local files
try:
schemas_dir = ".schemas"
if not os.path.exists(schemas_dir):
return {
"success": True,
"source": "file_system",
"schema_ids": [],
"count": 0,
"message": "No .schemas directory found"
}
schema_files = []
                for root, _dirs, files in os.walk(schemas_dir):
for file in files:
if file.endswith('.json'):
# Get relative path from .schemas directory
rel_path = os.path.relpath(os.path.join(root, file), schemas_dir)
# Convert to forward slashes for consistency
schema_id = rel_path.replace(os.sep, '/')
schema_files.append(schema_id)
return {
"success": True,
"source": "file_system",
"schema_ids": sorted(schema_files),
"count": len(schema_files)
}
except Exception as e:
self.logger.error(f"Error listing schemas from file system: {e}")
return {
"success": False,
"error": str(e),
"schema_ids": [],
"count": 0
}
except Exception as e:
self.logger.error(f"Error in list_schemas: {e}")
return {
"success": False,
"error": str(e),
"schema_ids": [],
"count": 0
}
def close(self):
"""
Close database connections and cleanup resources.
"""
if self.db_service:
self.db_service.close()
self.clear_cache()
self.logger.info("DataManager closed")