"""
Session Manager for LMS MCP Server
Handles session persistence, encryption, and management
"""
import json
import os
import logging
from datetime import datetime, timedelta
from typing import Dict, Optional, Any
from cryptography.fernet import Fernet
import base64
# Module-level logger, named after this module, used for all session manager messages.
logger = logging.getLogger(__name__)
class SessionManager:
    """Manage a single encrypted session persisted on disk.

    Session data is serialized to JSON, encrypted with the Fernet key stored
    in ``key_file`` and written to ``session_file``. Public methods report
    failure through their return value (``False`` / ``None``) and log the
    cause instead of raising.
    """

    # Lifetime of a saved session, in hours. Shared by the expiry check in
    # load_session() and the figures reported by get_session_info() so the
    # two cannot drift apart.
    SESSION_EXPIRY_HOURS = 24

    def __init__(
        self, session_file: str = "session.enc", key_file: str = "session.key"
    ):
        """
        Remember the storage paths and set up the cipher.

        Args:
            session_file: Path of the encrypted session file
            key_file: Path of the file holding the Fernet key; it is created
                with a fresh key when it does not exist yet
        """
        self.session_file = session_file
        self.key_file = key_file
        self.cipher_suite = None
        self._initialize_encryption()

    @staticmethod
    def _write_private(path: str, data: bytes) -> None:
        """Write ``data`` to ``path``, creating the file owner-read/write only."""
        # The key and the session payload are secrets, so request mode 0o600
        # instead of the umask-dependent default of open(). The mode only
        # applies when the file is created; an existing file keeps its mode.
        # O_BINARY exists only on Windows, where it disables newline
        # translation on the raw descriptor.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(data)

    def _initialize_encryption(self):
        """
        Initialize encryption key and cipher suite

        Loads the key from ``key_file``, or generates and stores a new one
        when the file does not exist. On any failure the error is logged and
        ``cipher_suite`` is left as None, which makes save/load refuse to run.
        """
        try:
            try:
                # Try to load existing key
                with open(self.key_file, "rb") as f:
                    key = f.read()
            except FileNotFoundError:
                # No key yet: generate one and persist it for later runs
                key = Fernet.generate_key()
                self._write_private(self.key_file, key)
                logger.info("Generated new encryption key")
            self.cipher_suite = Fernet(key)
        except Exception as e:
            logger.error("Encryption initialization error: %s", e)
            self.cipher_suite = None

    def save_session(self, session_data: Dict[str, Any]) -> bool:
        """
        Save session data encrypted to file

        A ``saved_at`` timestamp (local time, ISO format) is stored alongside
        the data. The caller's dictionary is not modified.

        Args:
            session_data: Dictionary containing session information
        Returns:
            True if successful, False otherwise
        """
        try:
            if self.cipher_suite is None:
                logger.error("Encryption not available")
                return False
            # Work on a shallow copy so the timestamp does not leak into the
            # dictionary owned by the caller
            payload = dict(session_data)
            payload["saved_at"] = datetime.now().isoformat()
            encrypted_data = self.cipher_suite.encrypt(json.dumps(payload).encode())
            self._write_private(self.session_file, encrypted_data)
            logger.info("Session saved successfully")
            return True
        except Exception as e:
            logger.error("Session save error: %s", e)
            return False

    def load_session(self) -> Optional[Dict[str, Any]]:
        """
        Load and decrypt session data from file

        An expired session is deleted from disk before None is returned.

        Returns:
            Session data dictionary or None if failed/expired
        """
        try:
            if self.cipher_suite is None:
                logger.error("Encryption not available")
                return None
            try:
                with open(self.session_file, "rb") as f:
                    encrypted_data = f.read()
            except FileNotFoundError:
                logger.info("No saved session found")
                return None
            decrypted_data = self.cipher_suite.decrypt(encrypted_data)
            session_data = json.loads(decrypted_data.decode())
            if self._is_session_expired(
                session_data, hours=self.SESSION_EXPIRY_HOURS
            ):
                logger.info("Session expired")
                self.clear_session()
                return None
            logger.info("Session loaded successfully")
            return session_data
        except Exception as e:
            logger.error("Session load error: %s", e)
            return None

    def _is_session_expired(
        self, session_data: Dict[str, Any], hours: int = SESSION_EXPIRY_HOURS
    ) -> bool:
        """
        Check if session is expired

        Args:
            session_data: Session data dictionary
            hours: Expiry time in hours
        Returns:
            True if expired, False otherwise. A missing or unparsable
            ``saved_at`` value counts as expired.
        """
        try:
            saved_at = session_data.get("saved_at")
            if not saved_at:
                return True
            expiry_time = datetime.fromisoformat(saved_at) + timedelta(hours=hours)
            return datetime.now() > expiry_time
        except Exception:
            # Deliberately broad: anything that prevents a trustworthy age
            # computation is treated as "expired"
            return True

    def clear_session(self) -> bool:
        """
        Clear saved session

        Returns:
            True if the session file is gone afterwards (including when it
            never existed), False if it could not be removed
        """
        try:
            os.remove(self.session_file)
        except FileNotFoundError:
            # Nothing to remove; the session is already cleared
            pass
        except Exception as e:
            logger.error("Session clear error: %s", e)
            return False
        logger.info("Session cleared")
        return True

    def update_session(self, updates: Dict[str, Any]) -> bool:
        """
        Update existing session with new data

        When no valid session can be loaded, a new session holding only
        ``updates`` is written.

        Args:
            updates: Dictionary of updates to apply
        Returns:
            True if successful, False otherwise
        """
        try:
            session_data = self.load_session()
            if session_data is None:
                session_data = {}
            session_data.update(updates)
            return self.save_session(session_data)
        except Exception as e:
            logger.error("Session update error: %s", e)
            return False

    def get_session_info(self) -> Dict[str, Any]:
        """
        Get information about current session

        Returns:
            Dictionary with session information. With an active session it
            holds ``exists`` (True), ``saved_at``, ``age_hours``,
            ``expires_in_hours`` and ``data_keys``; otherwise ``exists`` is
            False and either ``message`` or ``error`` explains why.
        """
        try:
            session_data = self.load_session()
            if session_data is None:
                return {"exists": False, "message": "No active session"}
            saved_at = session_data.get("saved_at")
            if saved_at:
                saved_time = datetime.fromisoformat(saved_at)
                age_hours = (datetime.now() - saved_time).total_seconds() / 3600
            else:
                age_hours = 0
            return {
                "exists": True,
                "saved_at": saved_at,
                "age_hours": round(age_hours, 2),
                "expires_in_hours": round(self.SESSION_EXPIRY_HOURS - age_hours, 2),
                "data_keys": list(session_data.keys()),
            }
        except Exception as e:
            logger.error("Session info error: %s", e)
            return {"exists": False, "error": str(e)}

    def backup_session(self, backup_path: str) -> bool:
        """
        Create a backup of current session

        The encrypted file is copied as-is; it is not decrypted or validated.

        Args:
            backup_path: Path for backup file
        Returns:
            True if successful, False otherwise
        """
        try:
            if not os.path.exists(self.session_file):
                logger.warning("No session file to back up")
                return False
            import shutil

            shutil.copy2(self.session_file, backup_path)
            logger.info("Session backed up to %s", backup_path)
            return True
        except Exception as e:
            logger.error("Session backup error: %s", e)
            return False

    def restore_session(self, backup_path: str) -> bool:
        """
        Restore session from backup

        The backup is copied over the session file without being decrypted
        or validated; a later load_session() decides whether it is usable.

        Args:
            backup_path: Path to backup file
        Returns:
            True if successful, False otherwise
        """
        try:
            if not os.path.exists(backup_path):
                logger.error("Backup file not found")
                return False
            import shutil

            shutil.copy2(backup_path, self.session_file)
            logger.info("Session restored from %s", backup_path)
            return True
        except Exception as e:
            logger.error("Session restore error: %s", e)
            return False