# persistence.py
"""
Persistence layer for storing and retrieving data.
"""
import json
import logging
from datetime import datetime
from config import JOB_EXECUTION_LOG_PATH, PERSISTENT_INFO_PATH, SCHEDULED_JOBS_PATH
def load_persistent_info() -> dict:
    """Load persistent info from disk.

    Returns:
        The JSON object stored at ``PERSISTENT_INFO_PATH`` as a dict, or an
        empty dict when the file is missing, unreadable, not valid UTF-8,
        not valid JSON, or holds something other than a JSON object.
    """
    try:
        with open(PERSISTENT_INFO_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        logging.debug("Could not load persistent info: %s", exc)
        return {}
    if not isinstance(data, dict):
        # Valid JSON of the wrong shape (list, null, number, ...) is treated
        # like a corrupt file so callers always receive a dict.
        logging.debug(
            "Could not load persistent info: expected a JSON object, got %s",
            type(data).__name__,
        )
        return {}
    return data
def save_persistent_info(data: dict) -> None:
    """Save persistent info to disk, overwriting the existing file.

    Args:
        data: JSON-serializable mapping to store at ``PERSISTENT_INFO_PATH``.

    Failures (I/O errors or data that cannot be serialized) are logged and
    not raised. A serialization failure leaves the existing file untouched.
    """
    try:
        # Serialize before opening: mode "w" truncates the file immediately,
        # so a TypeError/ValueError raised while encoding would otherwise
        # leave an empty or half-written file behind.
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        with open(PERSISTENT_INFO_PATH, "w", encoding="utf-8") as f:
            f.write(payload)
        logging.info("Persistent info saved.")
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error saving persistent info: %s", exc)
def load_scheduled_jobs() -> dict:
    """Load scheduled jobs from disk.

    Returns:
        The JSON object stored at ``SCHEDULED_JOBS_PATH`` as a dict, or an
        empty dict when the file is missing, unreadable, not valid UTF-8,
        not valid JSON, or holds something other than a JSON object.
    """
    try:
        with open(SCHEDULED_JOBS_PATH, "r", encoding="utf-8") as f:
            jobs = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        logging.debug("Could not load scheduled jobs: %s", exc)
        return {}
    if not isinstance(jobs, dict):
        # Valid JSON of the wrong shape is treated like a corrupt file so
        # callers can always index the result by job id.
        logging.debug(
            "Could not load scheduled jobs: expected a JSON object, got %s",
            type(jobs).__name__,
        )
        return {}
    return jobs
def save_scheduled_jobs(jobs: dict) -> None:
    """Save scheduled jobs to disk, overwriting the existing file.

    Args:
        jobs: Mapping to store at ``SCHEDULED_JOBS_PATH``. Values that JSON
            cannot encode natively are written as ``str(value)``.

    Failures (I/O errors or data that cannot be serialized) are logged and
    not raised. A serialization failure leaves the existing file untouched.
    """
    try:
        # Serialize before opening: mode "w" truncates the file immediately,
        # so an encoding error (e.g. an unsupported key type or a circular
        # reference) would otherwise wipe the previously saved jobs.
        payload = json.dumps(jobs, indent=2, ensure_ascii=False, default=str)
        with open(SCHEDULED_JOBS_PATH, "w", encoding="utf-8") as f:
            f.write(payload)
        logging.info("Scheduled jobs saved.")
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error saving scheduled jobs: %s", exc)
def load_job_execution_log() -> list:
    """Load the job execution log from disk.

    Returns:
        The JSON array stored at ``JOB_EXECUTION_LOG_PATH`` as a list, or an
        empty list when the file is missing, unreadable, not valid UTF-8,
        not valid JSON, or holds something other than a JSON array.
    """
    try:
        with open(JOB_EXECUTION_LOG_PATH, "r", encoding="utf-8") as f:
            entries = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        logging.debug("Could not load job execution log: %s", exc)
        return []
    if not isinstance(entries, list):
        # Valid JSON of the wrong shape is treated like a corrupt file so
        # callers can always append to the result.
        logging.debug(
            "Could not load job execution log: expected a JSON array, got %s",
            type(entries).__name__,
        )
        return []
    return entries
def save_job_execution_log(log_entries: list) -> None:
    """Save the job execution log to disk, overwriting the existing file.

    Args:
        log_entries: Entries to store at ``JOB_EXECUTION_LOG_PATH``. Values
            that JSON cannot encode natively are written as ``str(value)``.

    Failures (I/O errors or data that cannot be serialized) are logged and
    not raised. A serialization failure leaves the existing file untouched.
    """
    try:
        # Serialize before opening: mode "w" truncates the file immediately,
        # so an encoding error would otherwise wipe the existing log.
        payload = json.dumps(log_entries, indent=2, ensure_ascii=False, default=str)
        with open(JOB_EXECUTION_LOG_PATH, "w", encoding="utf-8") as f:
            f.write(payload)
        logging.info("Job execution log saved.")
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error saving job execution log: %s", exc)
def log_job_execution(
    job_id: str, function_name: str, success: bool, result: str = "", error: str = ""
) -> None:
    """Append one job execution record to the persistent log.

    Args:
        job_id: Identifier of the job that ran.
        function_name: Name of the function the job executed.
        success: Whether the execution succeeded.
        result: Result text to record (empty by default).
        error: Error text to record (empty by default).

    The record is timestamped with the local ``datetime.now()`` in ISO
    format. Only the most recent 1000 entries are kept. OSError, TypeError
    and ValueError are logged and not raised.
    """
    try:
        log_entries = load_job_execution_log()
        if not isinstance(log_entries, list):
            # A log file holding valid JSON that is not an array would make
            # .append() raise AttributeError, which is not handled below and
            # would propagate into the caller.
            logging.warning("Job execution log is not a list; starting a new log.")
            log_entries = []
        log_entries.append(
            {
                "timestamp": datetime.now().isoformat(),
                "job_id": job_id,
                "function_name": function_name,
                "success": success,
                "result": result,
                "error": error,
            }
        )
        # Keep only last 1000 entries to prevent file from growing too large
        save_job_execution_log(log_entries[-1000:])
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error logging job execution: %s", exc)
def save_job_to_persistence(
    job_id: str, function_name: str, run_time: datetime, args: list, kwargs: dict
):
    """Store a job with a single run time in the scheduled-jobs file.

    The entry is keyed by ``job_id``. ``run_time`` is written in ISO format
    and a ``created_at`` timestamp (local ``datetime.now()``) is added.
    OSError, TypeError and ValueError are logged and not raised.
    """
    try:
        stored_jobs = load_scheduled_jobs()
        record = {
            "function_name": function_name,
            "run_time": run_time.isoformat(),
            "args": args,
            "kwargs": kwargs,
            "created_at": datetime.now().isoformat(),
        }
        stored_jobs[job_id] = record
        save_scheduled_jobs(stored_jobs)
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error saving job to persistence: %s", exc)
def remove_job_from_persistence(job_id: str):
    """Delete the entry for ``job_id`` from the scheduled-jobs file.

    Nothing is written when no entry with that id is stored. OSError,
    TypeError and ValueError are logged and not raised.
    """
    try:
        stored_jobs = load_scheduled_jobs()
        if job_id not in stored_jobs:
            return
        del stored_jobs[job_id]
        save_scheduled_jobs(stored_jobs)
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error removing job from persistence: %s", exc)
def save_recurring_job_to_persistence(
    job_id: str,
    function_name: str,
    schedule_expression: str,
    args: list,
    kwargs: dict,
    start_date: str,
    end_date: str,
):
    """Store a recurring job in the scheduled-jobs file.

    The entry is keyed by ``job_id`` and tagged with ``"type": "recurring"``.
    The schedule expression and start/end dates are stored as given, and a
    ``created_at`` timestamp (local ``datetime.now()``) is added. OSError,
    TypeError and ValueError are logged and not raised.
    """
    try:
        stored_jobs = load_scheduled_jobs()
        record = {
            "type": "recurring",
            "function_name": function_name,
            "schedule_expression": schedule_expression,
            "args": args,
            "kwargs": kwargs,
            "start_date": start_date,
            "end_date": end_date,
            "created_at": datetime.now().isoformat(),
        }
        stored_jobs[job_id] = record
        save_scheduled_jobs(stored_jobs)
    except (OSError, TypeError, ValueError) as exc:
        logging.error("Error saving recurring job to persistence: %s", exc)