Skip to main content
Glama

AnyDocs MCP Server

by funky1688
models.py • 15.3 kB
#!/usr/bin/env python3
"""
Database Models

Defines database table structures and ORM models.
"""

import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from enum import Enum
from dataclasses import dataclass, field

from sqlalchemy import (
    Column,
    String,
    Text,
    Integer,
    DateTime,
    Boolean,
    JSON,
    ForeignKey,
    Index,
    UniqueConstraint,
    create_engine,
    MetaData,
)

# ``declarative_base`` lives in ``sqlalchemy.orm`` since SQLAlchemy 1.4; the
# ``sqlalchemy.ext.declarative`` location is deprecated. Fall back to the old
# location so the module still imports on older SQLAlchemy versions.
try:
    from sqlalchemy.orm import declarative_base
except ImportError:  # pragma: no cover - SQLAlchemy < 1.4
    from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.dialects.sqlite import BLOB
import structlog

logger = structlog.get_logger(__name__)

# SQLAlchemy base
Base = declarative_base()
metadata = MetaData()


def _new_uuid() -> str:
    """Return a fresh random UUID4 as a string, used as primary-key default."""
    return str(uuid.uuid4())


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as ``datetime.utcnow()`` (no ``tzinfo``)
    without calling that function, which is deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class DocumentStatus(Enum):
    """Document processing status."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    ARCHIVED = "archived"


class AdapterType(Enum):
    """Document adapter types."""

    GITBOOK = "gitbook"
    NOTION = "notion"
    CONFLUENCE = "confluence"
    WEBSITE = "website"
    GENERIC = "generic"


class UserRole(Enum):
    """User roles."""

    ADMIN = "admin"
    USER = "user"
    READONLY = "readonly"


class User(Base):
    """User model (table ``users``)."""

    __tablename__ = "users"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    username = Column(String(100), unique=True, nullable=False, index=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    password_hash = Column(String(255), nullable=True)  # Nullable for OAuth users
    full_name = Column(String(200), nullable=True)
    # Stored as a plain string; defaults to the value of UserRole.USER.
    role = Column(String(20), nullable=False, default=UserRole.USER.value)
    is_active = Column(Boolean, default=True, nullable=False)
    is_verified = Column(Boolean, default=False, nullable=False)

    # OAuth fields
    oauth_provider = Column(String(50), nullable=True)
    oauth_id = Column(String(100), nullable=True)

    # Metadata
    extra_metadata = Column(JSON, nullable=True)

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)
    last_login_at = Column(DateTime, nullable=True)

    # Relationships
    api_keys = relationship("APIKey", back_populates="user", cascade="all, delete-orphan")
    doc_sources = relationship("DocumentSource", back_populates="created_by")

    # Indexes
    __table_args__ = (
        Index('idx_user_oauth', 'oauth_provider', 'oauth_id'),
        Index('idx_user_role', 'role'),
        Index('idx_user_active', 'is_active'),
    )

    def __repr__(self):
        return f"<User(id='{self.id}', username='{self.username}', role='{self.role}')>"


class APIKey(Base):
    """API Key model (table ``api_keys``)."""

    __tablename__ = "api_keys"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
    name = Column(String(100), nullable=False)
    key_hash = Column(String(255), nullable=False, unique=True)
    key_prefix = Column(String(10), nullable=False)  # First few chars for identification

    # Permissions and scope
    permissions = Column(JSON, nullable=True)  # List of permissions
    scope = Column(String(500), nullable=True)  # Comma-separated scopes

    # Status and expiration
    is_active = Column(Boolean, default=True, nullable=False)
    expires_at = Column(DateTime, nullable=True)
    last_used_at = Column(DateTime, nullable=True)

    # Metadata
    extra_metadata = Column(JSON, nullable=True)

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    # Relationships
    user = relationship("User", back_populates="api_keys")

    # Indexes
    __table_args__ = (
        Index('idx_apikey_user', 'user_id'),
        Index('idx_apikey_active', 'is_active'),
        Index('idx_apikey_expires', 'expires_at'),
    )

    def __repr__(self):
        return f"<APIKey(id='{self.id}', name='{self.name}', user_id='{self.user_id}')>"


class DocumentSource(Base):
    """Document source configuration model (table ``document_sources``)."""

    __tablename__ = "document_sources"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    name = Column(String(100), nullable=False)
    adapter_type = Column(String(20), nullable=False)

    # Connection configuration
    base_url = Column(String(500), nullable=False)
    auth_config = Column(JSON, nullable=True)  # Authentication configuration
    adapter_config = Column(JSON, nullable=True)  # Adapter-specific configuration

    # Processing settings
    sync_enabled = Column(Boolean, default=True, nullable=False)
    sync_interval = Column(Integer, default=3600, nullable=False)  # Seconds
    last_sync_at = Column(DateTime, nullable=True)
    next_sync_at = Column(DateTime, nullable=True)

    # Status
    is_active = Column(Boolean, default=True, nullable=False)
    enabled = Column(Boolean, default=True, nullable=False)  # For filtering active sources
    status = Column(String(20), default=DocumentStatus.PENDING.value, nullable=False)
    error_message = Column(Text, nullable=True)

    # Metadata
    extra_metadata = Column(JSON, nullable=True)

    # Ownership
    created_by_id = Column(String(36), ForeignKey("users.id"), nullable=False)

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    # Relationships
    created_by = relationship("User", back_populates="doc_sources")
    documents = relationship("Document", back_populates="source", cascade="all, delete-orphan")

    # Indexes; a source name must be unique per creating user.
    __table_args__ = (
        Index('idx_docsource_adapter', 'adapter_type'),
        Index('idx_docsource_active', 'is_active'),
        Index('idx_docsource_sync', 'sync_enabled', 'next_sync_at'),
        Index('idx_docsource_status', 'status'),
        UniqueConstraint('name', 'created_by_id', name='uq_docsource_name_user'),
    )

    def __repr__(self):
        return f"<DocumentSource(id='{self.id}', name='{self.name}', adapter='{self.adapter_type}')>"


class Document(Base):
    """Document model (table ``documents``)."""

    __tablename__ = "documents"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    source_id = Column(String(36), ForeignKey("document_sources.id"), nullable=False)

    # Document identification
    external_id = Column(String(255), nullable=False)  # ID from source system
    title = Column(String(500), nullable=False)
    slug = Column(String(255), nullable=True)
    path = Column(String(1000), nullable=True)  # Document path/hierarchy

    # Content
    content = Column(Text, nullable=True)
    content_type = Column(String(50), default="markdown", nullable=False)
    content_hash = Column(String(64), nullable=True)  # SHA-256 hash for change detection

    # Processed content
    processed_content = Column(Text, nullable=True)  # HTML or processed format
    searchable_text = Column(Text, nullable=True)  # Plain text for search

    # Document metadata
    author = Column(String(200), nullable=True)
    description = Column(Text, nullable=True)
    tags = Column(JSON, nullable=True)  # List of tags
    category = Column(String(100), nullable=True)

    # URLs and references
    source_url = Column(String(1000), nullable=True)
    canonical_url = Column(String(1000), nullable=True)

    # Status and processing
    status = Column(String(20), default=DocumentStatus.PENDING.value, nullable=False)
    processing_error = Column(Text, nullable=True)

    # Versioning
    version = Column(String(50), nullable=True)
    parent_id = Column(String(36), ForeignKey("documents.id"), nullable=True)

    # Statistics
    word_count = Column(Integer, default=0, nullable=False)
    character_count = Column(Integer, default=0, nullable=False)

    # Metadata
    extra_metadata = Column(JSON, nullable=True)

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)
    published_at = Column(DateTime, nullable=True)
    source_updated_at = Column(DateTime, nullable=True)  # Last update in source system

    # Relationships
    source = relationship("DocumentSource", back_populates="documents")
    # Self-referential link via parent_id; the backref exposes ``children``.
    parent = relationship("Document", remote_side=[id], backref="children")

    # Indexes; (source_id, external_id) identifies a document uniquely.
    # NOTE(review): idx_doc_search is a plain Index on a Text column, not a
    # full-text index - confirm the target backend accepts/benefits from it.
    __table_args__ = (
        Index('idx_doc_source', 'source_id'),
        Index('idx_doc_external', 'external_id'),
        Index('idx_doc_status', 'status'),
        Index('idx_doc_title', 'title'),
        Index('idx_doc_path', 'path'),
        Index('idx_doc_updated', 'updated_at'),
        Index('idx_doc_published', 'published_at'),
        Index('idx_doc_search', 'searchable_text'),  # For FTS if supported
        UniqueConstraint('source_id', 'external_id', name='uq_doc_source_external'),
    )

    def __repr__(self):
        return f"<Document(id='{self.id}', title='{self.title}', source_id='{self.source_id}')>"


class SearchIndex(Base):
    """Full-text search index model (table ``search_index``)."""

    __tablename__ = "search_index"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    document_id = Column(String(36), ForeignKey("documents.id"), nullable=False)

    # Search content
    title = Column(String(500), nullable=False)
    content = Column(Text, nullable=False)  # Searchable text content
    keywords = Column(Text, nullable=True)  # Extracted keywords

    # Search metadata
    language = Column(String(10), default="en", nullable=False)
    boost_factor = Column(Integer, default=1, nullable=False)  # Search ranking boost

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    # Indexes
    # NOTE(review): idx_search_content is a plain Index on a Text column, not
    # a full-text index - confirm the target backend accepts/benefits from it.
    __table_args__ = (
        Index('idx_search_document', 'document_id'),
        Index('idx_search_title', 'title'),
        Index('idx_search_content', 'content'),  # For FTS if supported
        Index('idx_search_language', 'language'),
    )

    def __repr__(self):
        return f"<SearchIndex(id='{self.id}', document_id='{self.document_id}')>"


class SyncLog(Base):
    """Synchronization log model (table ``sync_logs``)."""

    __tablename__ = "sync_logs"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    source_id = Column(String(36), ForeignKey("document_sources.id"), nullable=False)

    # Sync details
    sync_type = Column(String(20), nullable=False)  # full, incremental, manual
    status = Column(String(20), nullable=False)  # running, completed, failed

    # Statistics
    documents_processed = Column(Integer, default=0, nullable=False)
    documents_created = Column(Integer, default=0, nullable=False)
    documents_updated = Column(Integer, default=0, nullable=False)
    documents_deleted = Column(Integer, default=0, nullable=False)
    documents_failed = Column(Integer, default=0, nullable=False)

    # Timing (naive UTC)
    started_at = Column(DateTime, default=_utcnow, nullable=False)
    completed_at = Column(DateTime, nullable=True)
    duration_seconds = Column(Integer, nullable=True)

    # Error handling
    error_message = Column(Text, nullable=True)
    error_details = Column(JSON, nullable=True)

    # Metadata
    extra_metadata = Column(JSON, nullable=True)

    # Indexes
    __table_args__ = (
        Index('idx_synclog_source', 'source_id'),
        Index('idx_synclog_status', 'status'),
        Index('idx_synclog_started', 'started_at'),
        Index('idx_synclog_type', 'sync_type'),
    )

    def __repr__(self):
        return f"<SyncLog(id='{self.id}', source_id='{self.source_id}', status='{self.status}')>"


class Configuration(Base):
    """Application configuration model (table ``configurations``)."""

    __tablename__ = "configurations"

    id = Column(String(36), primary_key=True, default=_new_uuid)
    key = Column(String(100), unique=True, nullable=False, index=True)
    value = Column(JSON, nullable=True)
    description = Column(Text, nullable=True)

    # Metadata
    is_sensitive = Column(Boolean, default=False, nullable=False)  # For passwords, keys, etc.
    is_system = Column(Boolean, default=False, nullable=False)  # System vs user config

    # Timestamps (naive UTC)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    def __repr__(self):
        return f"<Configuration(key='{self.key}', is_system={self.is_system})>"


# Database utility functions

def create_database_engine(database_url: str, **kwargs):
    """Create database engine.

    The defaults ``echo=False``, ``pool_pre_ping=True`` and
    ``pool_recycle=3600`` are applied first; any keyword argument passed by
    the caller overrides them.

    Args:
        database_url: Database connection URL
        **kwargs: Additional engine options

    Returns:
        SQLAlchemy engine
    """
    default_kwargs = {
        'echo': False,
        'pool_pre_ping': True,
        'pool_recycle': 3600,
    }
    default_kwargs.update(kwargs)

    engine = create_engine(database_url, **default_kwargs)
    # Log only the part after the last '@' so credentials are not written out.
    logger.info("Database engine created", database_url=database_url.split('@')[-1])  # Hide credentials

    return engine


def create_tables(engine):
    """Create all database tables.

    Args:
        engine: SQLAlchemy engine
    """
    Base.metadata.create_all(engine)
    logger.info("Database tables created")


def create_session_factory(engine):
    """Create session factory.

    Args:
        engine: SQLAlchemy engine

    Returns:
        Session factory
    """
    return sessionmaker(bind=engine)


# Data classes for API responses

@dataclass
class DocumentInfo:
    """Document information for API responses."""

    id: str
    title: str
    path: Optional[str] = None
    description: Optional[str] = None
    author: Optional[str] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    source_url: Optional[str] = None
    word_count: int = 0
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DocumentSourceInfo:
    """Document source information for API responses."""

    id: str
    name: str
    adapter_type: str
    base_url: str
    is_active: bool
    status: str
    last_sync_at: Optional[datetime] = None
    document_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SearchResult:
    """Search result for API responses."""

    document_id: str
    title: str
    snippet: str
    score: float
    source_name: str
    source_url: Optional[str] = None
    path: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.