models.py•4.76 kB
"""
Database models for the TimeLooker Lambda function.
"""
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
import os
Base = declarative_base()
class Task(Base):
"""
Represents a monitoring task configuration.
"""
__tablename__ = 'tasks'
id = Column(Integer, primary_key=True, autoincrement=True)
task_description = Column(Text, nullable=False)
frequency_minutes = Column(Integer, nullable=False)
runtime_minutes = Column(Integer, nullable=False)
recipient_email = Column(String(255), nullable=False)
sender_email = Column(String(255), default="fortnightlydevs@gmail.com")
# Task status
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# Relationships
search_results = relationship("SearchResult", back_populates="task", cascade="all, delete-orphan")
executions = relationship("Execution", back_populates="task", cascade="all, delete-orphan")
def __repr__(self):
return f"<Task(id={self.id}, description='{self.task_description[:50]}...')>"
class SearchResult(Base):
"""
Stores search results for comparison between iterations.
"""
__tablename__ = 'search_results'
id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
# Result data
name = Column(Text, nullable=False)
description = Column(Text)
url = Column(Text)
source = Column(String(255))
location = Column(String(255))
additional_info = Column(Text)
# Metadata
content_hash = Column(String(32), nullable=False) # MD5 hash for quick comparison
raw_data = Column(JSON) # Store the complete original data
found_at = Column(DateTime, default=func.now())
# Relationships
task = relationship("Task", back_populates="search_results")
def __repr__(self):
return f"<SearchResult(id={self.id}, name='{self.name[:30]}...', task_id={self.task_id})>"
class Execution(Base):
"""
Tracks execution history and status of task runs.
"""
__tablename__ = 'executions'
id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
# Execution details
started_at = Column(DateTime, default=func.now())
completed_at = Column(DateTime)
status = Column(String(50), default='running') # running, completed, failed, timeout
# Results
items_found = Column(Integer, default=0)
new_items_count = Column(Integer, default=0)
notification_sent = Column(Boolean, default=False)
# Error handling
error_message = Column(Text)
# Relationships
task = relationship("Task", back_populates="executions")
def __repr__(self):
return f"<Execution(id={self.id}, task_id={self.task_id}, status='{self.status}')>"
class DatabaseManager:
"""
Manages database connections and sessions.
"""
def __init__(self, database_url=None):
if database_url is None:
# Default to local SQLite for development
database_url = os.getenv('DATABASE_URL', 'sqlite:///timelooker.db')
self.engine = create_engine(database_url, echo=False)
self.SessionLocal = sessionmaker(bind=self.engine)
def create_tables(self):
"""Create all tables in the database."""
Base.metadata.create_all(bind=self.engine)
def get_session(self):
"""Get a new database session."""
return self.SessionLocal()
def drop_tables(self):
"""Drop all tables (for testing/development)."""
Base.metadata.drop_all(bind=self.engine)
# Global database manager instance
db_manager = DatabaseManager()
def get_db_session():
"""
Get a database session for use in Lambda functions.
This function can be easily mocked for testing.
"""
return db_manager.get_session()
class DatabaseSession:
"""
Context manager for database sessions to ensure proper cleanup.
"""
def __init__(self):
self.session = None
def __enter__(self):
self.session = get_db_session()
return self.session
def __exit__(self, exc_type, exc_val, exc_tb):
if self.session:
if exc_type:
# Exception occurred, rollback
self.session.rollback()
self.session.close()