# src/fctr_okta_mcp/resources/csv_output.py
"""
CSV Output - Local File Storage
Saves query results as CSV files to the local filesystem.
Uses UUID filenames for security (not guessable).
Includes cleanup functionality to remove old files.
"""
import csv
import os
import uuid
import asyncio
import time
from pathlib import Path
from typing import Optional, Tuple
from fctr_okta_mcp.utils.logger import get_logger
logger = get_logger(__name__)
# Default output directory
DEFAULT_OUTPUT_DIR = os.path.join(os.getcwd(), "okta_results")
# File expiry time in seconds (5 minutes)
FILE_EXPIRY_SECONDS = 300
# Track if cleanup task is running
_cleanup_task: Optional[asyncio.Task] = None
def save_results_csv(data: list, output_dir: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
"""
Convert results to CSV and save to local filesystem.
Args:
data: List of dictionaries to save
output_dir: Directory to save CSV files (defaults to ./okta_results)
    Returns:
        Tuple of (absolute_path, filename) - filename is UUID-based for URL
        construction. Returns (None, None) when data is empty.
    """
if not data:
return None, None
# Determine output directory
if output_dir is None:
output_dir = DEFAULT_OUTPUT_DIR
# Create output directory if it doesn't exist
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Generate UUID filename (not guessable, secure for HTTP serving)
file_uuid = uuid.uuid4().hex
filename = f"{file_uuid}.csv"
filepath = os.path.join(output_dir, filename)
# Flatten nested dicts for CSV
flat_data = [_flatten_dict(row) for row in data]
# Write CSV file
if flat_data:
# Get all unique keys across all rows
all_keys = set()
for row in flat_data:
all_keys.update(row.keys())
fieldnames = sorted(all_keys)
with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
writer.writerows(flat_data)
logger.debug(f"Saved CSV results to: {filepath}")
return os.path.abspath(filepath), filename
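# Usage sketch for save_results_csv (illustrative; the sample row below is a
# hypothetical Okta user record, not real query output):
#
#   rows = [{"id": "00u1", "profile": {"email": "ada@example.com"}}]
#   filepath, filename = save_results_csv(rows)
#   # filepath -> absolute path under ./okta_results
#   # filename -> "<uuid4-hex>.csv", safe to expose when building a download URL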
def cleanup_old_files(output_dir: Optional[str] = None, max_age_seconds: int = FILE_EXPIRY_SECONDS) -> int:
"""
Remove CSV files older than max_age_seconds.
Args:
output_dir: Directory to clean (defaults to ./okta_results)
max_age_seconds: Maximum age of files in seconds (default: 300 = 5 minutes)
Returns:
Number of files deleted
"""
if output_dir is None:
output_dir = DEFAULT_OUTPUT_DIR
if not os.path.exists(output_dir):
return 0
deleted_count = 0
current_time = time.time()
for filename in os.listdir(output_dir):
if not filename.endswith('.csv'):
continue
filepath = os.path.join(output_dir, filename)
try:
file_age = current_time - os.path.getmtime(filepath)
if file_age > max_age_seconds:
os.remove(filepath)
deleted_count += 1
logger.debug(f"Cleaned up expired CSV: {filename}")
except OSError as e:
logger.warning(f"Failed to cleanup {filename}: {e}")
if deleted_count > 0:
logger.info(f"Cleaned up {deleted_count} expired CSV file(s)")
return deleted_count
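# Manual cleanup sketch (values are illustrative): purge files older than an
# hour from a non-default directory, e.g. during shutdown.
#
#   removed = cleanup_old_files(output_dir="/tmp/okta_results", max_age_seconds=3600)
#   logger.info(f"Removed {removed} stale CSV file(s)")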
async def start_cleanup_task(interval_seconds: int = 60) -> None:
"""
Start background task to periodically clean up old CSV files.
Args:
interval_seconds: How often to run cleanup (default: 60 seconds)
"""
global _cleanup_task
if _cleanup_task is not None and not _cleanup_task.done():
logger.debug("Cleanup task already running")
return
async def cleanup_loop():
logger.info(f"CSV cleanup task started (files expire after {FILE_EXPIRY_SECONDS}s)")
while True:
try:
await asyncio.sleep(interval_seconds)
cleanup_old_files()
except asyncio.CancelledError:
logger.debug("CSV cleanup task cancelled")
break
except Exception as e:
logger.warning(f"Error in cleanup task: {e}")
_cleanup_task = asyncio.create_task(cleanup_loop())
def stop_cleanup_task() -> None:
"""Stop the background cleanup task."""
global _cleanup_task
if _cleanup_task is not None and not _cleanup_task.done():
_cleanup_task.cancel()
_cleanup_task = None
logger.debug("CSV cleanup task stopped")
def _flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
"""Flatten nested dict for CSV output."""
items = []
for k, v in d.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key, sep).items())
        elif isinstance(v, list):
            # Lists (including lists of dicts) are stringified rather than expanded
            items.append((new_key, str(v)))
else:
items.append((new_key, v))
return dict(items)
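# Flattening sketch showing how nested dicts and lists come out (values are
# illustrative):
#
#   _flatten_dict({"profile": {"email": "a@b.co"}, "groups": ["eng", "ops"]})
#   # -> {"profile_email": "a@b.co", "groups": "['eng', 'ops']"}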