models.pyโข15.3 kB
#!/usr/bin/env python3
"""
Database Models
Defines database table structures and ORM models.
"""
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum
from dataclasses import dataclass, field
from sqlalchemy import (
Column,
String,
Text,
Integer,
DateTime,
Boolean,
JSON,
ForeignKey,
Index,
UniqueConstraint,
create_engine,
MetaData,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.dialects.sqlite import BLOB
import structlog
logger = structlog.get_logger(__name__)
# SQLAlchemy base
Base = declarative_base()
metadata = MetaData()
class DocumentStatus(Enum):
"""Document processing status."""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
ARCHIVED = "archived"
class AdapterType(Enum):
"""Document adapter types."""
GITBOOK = "gitbook"
NOTION = "notion"
CONFLUENCE = "confluence"
WEBSITE = "website"
GENERIC = "generic"
class UserRole(Enum):
"""User roles."""
ADMIN = "admin"
USER = "user"
READONLY = "readonly"
class User(Base):
"""User model."""
__tablename__ = "users"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
username = Column(String(100), unique=True, nullable=False, index=True)
email = Column(String(255), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=True) # Nullable for OAuth users
full_name = Column(String(200), nullable=True)
role = Column(String(20), nullable=False, default=UserRole.USER.value)
is_active = Column(Boolean, default=True, nullable=False)
is_verified = Column(Boolean, default=False, nullable=False)
# OAuth fields
oauth_provider = Column(String(50), nullable=True)
oauth_id = Column(String(100), nullable=True)
# Metadata
extra_metadata = Column(JSON, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
last_login_at = Column(DateTime, nullable=True)
# Relationships
api_keys = relationship("APIKey", back_populates="user", cascade="all, delete-orphan")
doc_sources = relationship("DocumentSource", back_populates="created_by")
# Indexes
__table_args__ = (
Index('idx_user_oauth', 'oauth_provider', 'oauth_id'),
Index('idx_user_role', 'role'),
Index('idx_user_active', 'is_active'),
)
def __repr__(self):
return f"<User(id='{self.id}', username='{self.username}', role='{self.role}')>"
class APIKey(Base):
"""API Key model."""
__tablename__ = "api_keys"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
name = Column(String(100), nullable=False)
key_hash = Column(String(255), nullable=False, unique=True)
key_prefix = Column(String(10), nullable=False) # First few chars for identification
# Permissions and scope
permissions = Column(JSON, nullable=True) # List of permissions
scope = Column(String(500), nullable=True) # Comma-separated scopes
# Status and expiration
is_active = Column(Boolean, default=True, nullable=False)
expires_at = Column(DateTime, nullable=True)
last_used_at = Column(DateTime, nullable=True)
# Metadata
extra_metadata = Column(JSON, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
# Relationships
user = relationship("User", back_populates="api_keys")
# Indexes
__table_args__ = (
Index('idx_apikey_user', 'user_id'),
Index('idx_apikey_active', 'is_active'),
Index('idx_apikey_expires', 'expires_at'),
)
def __repr__(self):
return f"<APIKey(id='{self.id}', name='{self.name}', user_id='{self.user_id}')>"
class DocumentSource(Base):
"""Document source configuration model."""
__tablename__ = "document_sources"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String(100), nullable=False)
adapter_type = Column(String(20), nullable=False)
# Connection configuration
base_url = Column(String(500), nullable=False)
auth_config = Column(JSON, nullable=True) # Authentication configuration
adapter_config = Column(JSON, nullable=True) # Adapter-specific configuration
# Processing settings
sync_enabled = Column(Boolean, default=True, nullable=False)
sync_interval = Column(Integer, default=3600, nullable=False) # Seconds
last_sync_at = Column(DateTime, nullable=True)
next_sync_at = Column(DateTime, nullable=True)
# Status
is_active = Column(Boolean, default=True, nullable=False)
enabled = Column(Boolean, default=True, nullable=False) # For filtering active sources
status = Column(String(20), default=DocumentStatus.PENDING.value, nullable=False)
error_message = Column(Text, nullable=True)
# Metadata
extra_metadata = Column(JSON, nullable=True)
# Ownership
created_by_id = Column(String(36), ForeignKey("users.id"), nullable=False)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
# Relationships
created_by = relationship("User", back_populates="doc_sources")
documents = relationship("Document", back_populates="source", cascade="all, delete-orphan")
# Indexes
__table_args__ = (
Index('idx_docsource_adapter', 'adapter_type'),
Index('idx_docsource_active', 'is_active'),
Index('idx_docsource_sync', 'sync_enabled', 'next_sync_at'),
Index('idx_docsource_status', 'status'),
UniqueConstraint('name', 'created_by_id', name='uq_docsource_name_user'),
)
def __repr__(self):
return f"<DocumentSource(id='{self.id}', name='{self.name}', adapter='{self.adapter_type}')>"
class Document(Base):
"""Document model."""
__tablename__ = "documents"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
source_id = Column(String(36), ForeignKey("document_sources.id"), nullable=False)
# Document identification
external_id = Column(String(255), nullable=False) # ID from source system
title = Column(String(500), nullable=False)
slug = Column(String(255), nullable=True)
path = Column(String(1000), nullable=True) # Document path/hierarchy
# Content
content = Column(Text, nullable=True)
content_type = Column(String(50), default="markdown", nullable=False)
content_hash = Column(String(64), nullable=True) # SHA-256 hash for change detection
# Processed content
processed_content = Column(Text, nullable=True) # HTML or processed format
searchable_text = Column(Text, nullable=True) # Plain text for search
# Document metadata
author = Column(String(200), nullable=True)
description = Column(Text, nullable=True)
tags = Column(JSON, nullable=True) # List of tags
category = Column(String(100), nullable=True)
# URLs and references
source_url = Column(String(1000), nullable=True)
canonical_url = Column(String(1000), nullable=True)
# Status and processing
status = Column(String(20), default=DocumentStatus.PENDING.value, nullable=False)
processing_error = Column(Text, nullable=True)
# Versioning
version = Column(String(50), nullable=True)
parent_id = Column(String(36), ForeignKey("documents.id"), nullable=True)
# Statistics
word_count = Column(Integer, default=0, nullable=False)
character_count = Column(Integer, default=0, nullable=False)
# Metadata
extra_metadata = Column(JSON, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
published_at = Column(DateTime, nullable=True)
source_updated_at = Column(DateTime, nullable=True) # Last update in source system
# Relationships
source = relationship("DocumentSource", back_populates="documents")
parent = relationship("Document", remote_side=[id], backref="children")
# Indexes
__table_args__ = (
Index('idx_doc_source', 'source_id'),
Index('idx_doc_external', 'external_id'),
Index('idx_doc_status', 'status'),
Index('idx_doc_title', 'title'),
Index('idx_doc_path', 'path'),
Index('idx_doc_updated', 'updated_at'),
Index('idx_doc_published', 'published_at'),
Index('idx_doc_search', 'searchable_text'), # For FTS if supported
UniqueConstraint('source_id', 'external_id', name='uq_doc_source_external'),
)
def __repr__(self):
return f"<Document(id='{self.id}', title='{self.title}', source_id='{self.source_id}')>"
class SearchIndex(Base):
"""Full-text search index model."""
__tablename__ = "search_index"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
document_id = Column(String(36), ForeignKey("documents.id"), nullable=False)
# Search content
title = Column(String(500), nullable=False)
content = Column(Text, nullable=False) # Searchable text content
keywords = Column(Text, nullable=True) # Extracted keywords
# Search metadata
language = Column(String(10), default="en", nullable=False)
boost_factor = Column(Integer, default=1, nullable=False) # Search ranking boost
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
# Indexes
__table_args__ = (
Index('idx_search_document', 'document_id'),
Index('idx_search_title', 'title'),
Index('idx_search_content', 'content'), # For FTS if supported
Index('idx_search_language', 'language'),
)
def __repr__(self):
return f"<SearchIndex(id='{self.id}', document_id='{self.document_id}')>"
class SyncLog(Base):
"""Synchronization log model."""
__tablename__ = "sync_logs"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
source_id = Column(String(36), ForeignKey("document_sources.id"), nullable=False)
# Sync details
sync_type = Column(String(20), nullable=False) # full, incremental, manual
status = Column(String(20), nullable=False) # running, completed, failed
# Statistics
documents_processed = Column(Integer, default=0, nullable=False)
documents_created = Column(Integer, default=0, nullable=False)
documents_updated = Column(Integer, default=0, nullable=False)
documents_deleted = Column(Integer, default=0, nullable=False)
documents_failed = Column(Integer, default=0, nullable=False)
# Timing
started_at = Column(DateTime, default=datetime.utcnow, nullable=False)
completed_at = Column(DateTime, nullable=True)
duration_seconds = Column(Integer, nullable=True)
# Error handling
error_message = Column(Text, nullable=True)
error_details = Column(JSON, nullable=True)
# Metadata
extra_metadata = Column(JSON, nullable=True)
# Indexes
__table_args__ = (
Index('idx_synclog_source', 'source_id'),
Index('idx_synclog_status', 'status'),
Index('idx_synclog_started', 'started_at'),
Index('idx_synclog_type', 'sync_type'),
)
def __repr__(self):
return f"<SyncLog(id='{self.id}', source_id='{self.source_id}', status='{self.status}')>"
class Configuration(Base):
"""Application configuration model."""
__tablename__ = "configurations"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
key = Column(String(100), unique=True, nullable=False, index=True)
value = Column(JSON, nullable=True)
description = Column(Text, nullable=True)
# Metadata
is_sensitive = Column(Boolean, default=False, nullable=False) # For passwords, keys, etc.
is_system = Column(Boolean, default=False, nullable=False) # System vs user config
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
def __repr__(self):
return f"<Configuration(key='{self.key}', is_system={self.is_system})>"
# Database utility functions
def create_database_engine(database_url: str, **kwargs):
"""Create database engine.
Args:
database_url: Database connection URL
**kwargs: Additional engine options
Returns:
SQLAlchemy engine
"""
default_kwargs = {
'echo': False,
'pool_pre_ping': True,
'pool_recycle': 3600,
}
default_kwargs.update(kwargs)
engine = create_engine(database_url, **default_kwargs)
logger.info("Database engine created", database_url=database_url.split('@')[-1]) # Hide credentials
return engine
def create_tables(engine):
"""Create all database tables.
Args:
engine: SQLAlchemy engine
"""
Base.metadata.create_all(engine)
logger.info("Database tables created")
def create_session_factory(engine):
"""Create session factory.
Args:
engine: SQLAlchemy engine
Returns:
Session factory
"""
return sessionmaker(bind=engine)
# Data classes for API responses
@dataclass
class DocumentInfo:
"""Document information for API responses."""
id: str
title: str
path: Optional[str] = None
description: Optional[str] = None
author: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
source_url: Optional[str] = None
word_count: int = 0
tags: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DocumentSourceInfo:
"""Document source information for API responses."""
id: str
name: str
adapter_type: str
base_url: str
is_active: bool
status: str
last_sync_at: Optional[datetime] = None
document_count: int = 0
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SearchResult:
"""Search result for API responses."""
document_id: str
title: str
snippet: str
score: float
source_name: str
source_url: Optional[str] = None
path: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)