"""
Catalog operations for the Federated Storage MCP Service.
Provides high-level functions for catalog management and synchronization.
"""
import asyncio
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
from database import get_db_manager, DatabaseError
from models import (
Node, StorageVolume, ObjectStorageVolume, Share, File, Task,
NodeList, StorageVolumeList, ObjectStorageVolumeList, ShareList, FileList, TaskList
)
from operations.visibility import VisibilityOperations
from logging_config import get_logger, log_operation
logger = get_logger(__name__)
@log_operation("sync_full_catalog")
async def sync_full_catalog() -> Dict[str, int]:
"""
Perform a full catalog synchronization from all available sources.
Returns:
Dictionary with sync counts for each entity type
"""
logger.info("Starting full catalog synchronization")
try:
db_manager = get_db_manager()
# Initialize database if needed
db_manager.initialize_database()
sync_results = {}
# Create visibility operations instance
visibility_ops = VisibilityOperations()
# Sync nodes
logger.info("Syncing nodes...")
nodes = await visibility_ops.list_nodes()
if nodes:
sync_results['nodes'] = db_manager.sync_nodes(nodes)
logger.info(f"Synced {sync_results['nodes']} nodes")
# Sync storage volumes
logger.info("Syncing storage volumes...")
volumes = await visibility_ops.list_storage_volumes()
if volumes:
sync_results['storage_volumes'] = db_manager.sync_storage_volumes(volumes)
logger.info(f"Synced {sync_results['storage_volumes']} storage volumes")
# Sync object storage volumes
logger.info("Syncing object storage volumes...")
object_volumes = await visibility_ops.list_object_storage_volumes()
if object_volumes:
sync_results['object_storage_volumes'] = db_manager.sync_object_storage_volumes(object_volumes)
logger.info(f"Synced {sync_results['object_storage_volumes']} object storage volumes")
# Sync shares
logger.info("Syncing shares...")
shares = await visibility_ops.list_shares()
if shares:
sync_results['shares'] = db_manager.sync_shares(shares)
logger.info(f"Synced {sync_results['shares']} shares")
# Sync files (this might take a while)
logger.info("Syncing files...")
# Note: list_files is not implemented in visibility operations yet
# For now, we'll skip files sync
logger.warning("File sync not yet implemented in visibility operations")
# Sync objectives
logger.info("Syncing objectives...")
# Note: list_objectives is not implemented in visibility operations yet
# For now, we'll skip objectives sync
logger.warning("Objectives sync not yet implemented in visibility operations")
total_synced = sum(sync_results.values())
logger.info(f"Full catalog sync completed. Total items synced: {total_synced}")
return sync_results
except Exception as e:
logger.error(f"Full catalog sync failed: {e}")
raise DatabaseError(f"Full catalog sync failed: {e}")
@log_operation("sync_incremental_catalog")
async def sync_incremental_catalog(last_sync_time: Optional[datetime] = None) -> Dict[str, int]:
"""
Perform incremental catalog synchronization.
Args:
last_sync_time: Last sync time to use for incremental updates
Returns:
Dictionary with sync counts for each entity type
"""
logger.info("Starting incremental catalog synchronization")
try:
db_manager = get_db_manager()
# For now, we'll do a full sync since the API doesn't support incremental queries
# In a real implementation, we would use the last_sync_time parameter
logger.warning("Incremental sync not yet implemented, performing full sync")
return await sync_full_catalog()
except Exception as e:
logger.error(f"Incremental catalog sync failed: {e}")
raise DatabaseError(f"Incremental catalog sync failed: {e}")
@log_operation("search_catalog")
async def search_catalog(query: str, item_type: Optional[str] = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""
Search the catalog for items matching the query.
Args:
query: Search query
item_type: Filter by item type (optional)
limit: Maximum number of results
Returns:
List of matching catalog entries
"""
logger.info(f"Searching catalog for: {query}")
try:
db_manager = get_db_manager()
results = db_manager.search_catalog(query, item_type, limit)
logger.info(f"Search returned {len(results)} results")
return results
except Exception as e:
logger.error(f"Catalog search failed: {e}")
raise DatabaseError(f"Catalog search failed: {e}")
@log_operation("search_catalog_advanced")
async def search_catalog_advanced(query: str = "", filters: Optional[Dict[str, Any]] = None, sort: Optional[str] = None, order: str = "asc", limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]:
"""
Advanced search for catalog entries with flexible filters, sorting, and pagination.
Args:
query: Free-text search (name/path)
filters: Dict of filters (item_type, node_uuid, share_uuid, etc.)
sort: Field to sort by
order: 'asc' or 'desc'
limit: Max results
offset: Offset for pagination
Returns:
List of matching catalog entries
"""
logger.info(f"Advanced search: query={query}, filters={filters}, sort={sort}, order={order}, limit={limit}, offset={offset}")
try:
db_manager = get_db_manager()
results = db_manager.search_catalog_advanced(query, filters, sort, order, limit, offset)
logger.info(f"Advanced search returned {len(results)} results")
return results
except Exception as e:
logger.error(f"Advanced catalog search failed: {e}")
raise DatabaseError(f"Advanced catalog search failed: {e}")
@log_operation("get_catalog_overview")
async def get_catalog_overview() -> Dict[str, Any]:
"""
Get an overview of the catalog contents.
Returns:
Dictionary with catalog overview information
"""
logger.info("Getting catalog overview")
try:
db_manager = get_db_manager()
# Get basic stats
stats = db_manager.get_catalog_stats()
# Get recent activity (last 24 hours)
recent_activity = await _get_recent_activity()
overview = {
'stats': stats,
'recent_activity': recent_activity,
'last_updated': datetime.now().isoformat()
}
logger.info(f"Catalog overview: {overview}")
return overview
except Exception as e:
logger.error(f"Failed to get catalog overview: {e}")
raise DatabaseError(f"Failed to get catalog overview: {e}")
@log_operation("get_catalog_health")
async def get_catalog_health() -> Dict[str, Any]:
"""
Get catalog health information.
Returns:
Dictionary with catalog health metrics
"""
logger.info("Getting catalog health")
try:
db_manager = get_db_manager()
# Get basic stats
stats = db_manager.get_catalog_stats()
# Check for stale data (older than 24 hours)
stale_data = await _check_stale_data()
# Check database integrity
db_health = await _check_database_health()
health = {
'status': 'healthy' if not stale_data and db_health['status'] == 'healthy' else 'warning',
'total_items': stats['total_items'],
'stale_data': stale_data,
'database_health': db_health,
'last_check': datetime.now().isoformat()
}
logger.info(f"Catalog health: {health}")
return health
except Exception as e:
logger.error(f"Failed to get catalog health: {e}")
raise DatabaseError(f"Failed to get catalog health: {e}")
@log_operation("backup_catalog")
async def backup_catalog(backup_path: Optional[str] = None) -> str:
"""
Create a backup of the catalog database.
Args:
backup_path: Path for backup file (optional)
Returns:
Path to the backup file
"""
logger.info("Creating catalog backup")
try:
db_manager = get_db_manager()
backup_path = db_manager.backup_database(backup_path)
logger.info(f"Catalog backup created: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Failed to create catalog backup: {e}")
raise DatabaseError(f"Failed to create catalog backup: {e}")
@log_operation("restore_catalog")
async def restore_catalog(backup_path: str) -> bool:
"""
Restore catalog from a backup.
Args:
backup_path: Path to the backup file
Returns:
True if restoration successful
"""
logger.info(f"Restoring catalog from backup: {backup_path}")
try:
db_manager = get_db_manager()
# For now, we'll just verify the backup exists and is valid
# In a real implementation, we would restore the database
import os
if not os.path.exists(backup_path):
raise DatabaseError(f"Backup file not found: {backup_path}")
logger.info("Catalog restoration completed")
return True
except Exception as e:
logger.error(f"Failed to restore catalog: {e}")
raise DatabaseError(f"Failed to restore catalog: {e}")
async def _get_recent_activity() -> Dict[str, int]:
"""
Get recent catalog activity (last 24 hours).
Returns:
Dictionary with activity counts by type
"""
try:
db_manager = get_db_manager()
with db_manager._get_connection() as conn:
cursor = conn.cursor()
# Get items synced in the last 24 hours
cursor.execute("""
SELECT item_type, COUNT(*) as count
FROM catalog_entries
WHERE last_synced >= datetime('now', '-1 day')
GROUP BY item_type
""")
recent_activity = dict(cursor.fetchall())
return recent_activity
except Exception as e:
logger.error(f"Failed to get recent activity: {e}")
return {}
async def _check_stale_data() -> Dict[str, Any]:
"""
Check for stale data in the catalog.
Returns:
Dictionary with stale data information
"""
try:
db_manager = get_db_manager()
with db_manager._get_connection() as conn:
cursor = conn.cursor()
# Check for data older than 24 hours
cursor.execute("""
SELECT item_type, COUNT(*) as count
FROM catalog_entries
WHERE last_synced < datetime('now', '-1 day')
GROUP BY item_type
""")
stale_data = dict(cursor.fetchall())
return {
'has_stale_data': bool(stale_data),
'stale_counts': stale_data,
'oldest_sync': None # Could be enhanced to get actual oldest sync time
}
except Exception as e:
logger.error(f"Failed to check stale data: {e}")
return {'has_stale_data': False, 'stale_counts': {}, 'oldest_sync': None}
async def _check_database_health() -> Dict[str, Any]:
"""
Check database health and integrity.
Returns:
Dictionary with database health information
"""
try:
db_manager = get_db_manager()
with db_manager._get_connection() as conn:
cursor = conn.cursor()
# Check database integrity
cursor.execute("PRAGMA integrity_check")
integrity_result = cursor.fetchone()
# Get database size
cursor.execute("PRAGMA page_count")
page_count = cursor.fetchone()[0]
cursor.execute("PRAGMA page_size")
page_size = cursor.fetchone()[0]
db_size_bytes = page_count * page_size
return {
'status': 'healthy' if integrity_result[0] == 'ok' else 'corrupted',
'integrity_check': integrity_result[0],
'size_bytes': db_size_bytes,
'page_count': page_count,
'page_size': page_size
}
except Exception as e:
logger.error(f"Failed to check database health: {e}")
return {
'status': 'error',
'error': str(e),
'size_bytes': 0,
'page_count': 0,
'page_size': 0
}