Skip to main content
Glama
Michaelzag
by Michaelzag
schemas.py15.3 kB
#!/usr/bin/env python3 """ Pydantic schemas for Migadu API requests and responses Based on official API documentation: https://migadu.com/api/ """ from typing import List, Optional, Union, Dict, Any, Type from datetime import date, datetime from pydantic import BaseModel, Field, EmailStr, field_validator, model_validator from enum import Enum import re class PasswordMethod(str, Enum): """Password method options for mailbox creation""" PASSWORD = "password" # nosec B105 - API constant, not hardcoded password INVITATION = "invitation" class SpamAction(str, Enum): """Spam action options""" FOLDER = "folder" REJECT = "reject" # Add other spam actions as documented class SpamAggressiveness(str, Enum): """Spam aggressiveness levels""" DEFAULT = "default" # Add other levels as documented # Email validation function def validate_email(email: str) -> str: """Simple email validation""" email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" if not re.match(email_pattern, email.strip()): raise ValueError(f"Invalid email format: {email}") return email.strip() # Utility functions for destinations handling def normalize_destinations(destinations: Union[List[str], str]) -> List[str]: """Convert destinations from CSV string or list to list of validated emails""" if isinstance(destinations, str): # Split CSV string and strip whitespace emails = [dest.strip() for dest in destinations.split(",") if dest.strip()] return [validate_email(email) for email in emails] return [validate_email(dest) for dest in destinations] def destinations_to_csv(destinations: List[str]) -> str: """Convert list of destinations to CSV string for API""" return ",".join(destinations) # Mailbox Schemas class MailboxCreateRequest(BaseModel): """Request schema for creating a mailbox""" target: str = Field(..., description="Email address or local part") name: str = Field(..., description="Display name for the mailbox") password: Optional[str] = Field( None, description="Password (required if password_method is 'password')" ) password_method: PasswordMethod = Field( PasswordMethod.PASSWORD, description="Password setup method" ) password_recovery_email: Optional[EmailStr] = Field( None, description="Recovery email for invitation method" ) is_internal: bool = Field(False, description="Restrict to internal-only messages") forwarding_to: Optional[EmailStr] = Field( None, description="External forwarding address" ) @model_validator(mode="after") def validate_password_method(self): if self.password_method == PasswordMethod.INVITATION: if not self.password_recovery_email: raise ValueError( "password_recovery_email is required when password_method is invitation" ) elif self.password_method == PasswordMethod.PASSWORD: if not self.password: raise ValueError( "password is required when password_method is password" ) return self class MailboxUpdateRequest(BaseModel): """Request schema for updating a mailbox""" target: str = Field(..., description="Email address or local part") name: Optional[str] = Field(None, description="Update display name") may_send: Optional[bool] = Field(None, description="Allow/deny sending emails") may_receive: Optional[bool] = Field(None, description="Allow/deny receiving emails") may_access_imap: Optional[bool] = Field(None, description="Allow/deny IMAP access") may_access_pop3: Optional[bool] = Field(None, description="Allow/deny POP3 access") may_access_managesieve: Optional[bool] = Field( None, description="Allow/deny ManageSieve access" ) spam_action: Optional[SpamAction] = Field(None, description="Spam handling action") spam_aggressiveness: Optional[SpamAggressiveness] = Field( None, description="Spam filtering sensitivity" ) sender_denylist: Optional[str] = Field(None, description="Sender denylist rules") sender_allowlist: Optional[str] = Field(None, description="Sender allowlist rules") recipient_denylist: Optional[str] = Field( None, description="Recipient denylist rules" ) class MailboxDeleteRequest(BaseModel): """Request schema for deleting a mailbox""" target: str = Field(..., description="Email address or local part") class MailboxPasswordResetRequest(BaseModel): """Request schema for resetting mailbox password""" target: str = Field(..., description="Email address or local part") new_password: str = Field(..., description="New password for authentication") class AutoresponderRequest(BaseModel): """Request schema for setting autoresponder""" target: str = Field(..., description="Email address or local part") active: bool = Field(..., description="Whether autoresponder is enabled") subject: Optional[str] = Field(None, description="Subject line for replies") body: Optional[str] = Field(None, description="Message content for replies") expires_on: Optional[date] = Field(None, description="Expiration date YYYY-MM-DD") @field_validator("expires_on") @classmethod def validate_expires_on(cls, v: Optional[date]) -> Optional[date]: if v and v <= date.today(): raise ValueError("expires_on must be a future date") return v # Alias Schemas class AliasCreateRequest(BaseModel): """Request schema for creating an alias""" target: str = Field(..., description="Local part of alias") destinations: Union[List[EmailStr], str] = Field( ..., description="List of email addresses or CSV string" ) domain: Optional[str] = Field(None, description="Domain name") is_internal: bool = Field(False, description="Internal-only flag") @field_validator("destinations", mode="before") @classmethod def normalize_destinations(cls, v: Union[List[str], str]) -> List[str]: return normalize_destinations(v) class AliasUpdateRequest(BaseModel): """Request schema for updating an alias""" target: str = Field(..., description="Local part of alias") destinations: Union[List[EmailStr], str] = Field( ..., description="New list of email addresses or CSV string" ) domain: Optional[str] = Field(None, description="Domain name") @field_validator("destinations", mode="before") @classmethod def normalize_destinations(cls, v: Union[List[str], str]) -> List[str]: return normalize_destinations(v) class AliasDeleteRequest(BaseModel): """Request schema for deleting an alias""" target: str = Field(..., description="Local part of alias") domain: Optional[str] = Field(None, description="Domain name") # Identity Schemas class IdentityCreateRequest(BaseModel): """Request schema for creating an identity""" target: str = Field(..., description="Local part of identity address") mailbox: str = Field(..., description="Username of mailbox that owns this identity") name: str = Field(..., description="Display name for identity") password: str = Field(..., description="Password for SMTP authentication") domain: Optional[str] = Field(None, description="Domain name") class IdentityUpdateRequest(BaseModel): """Request schema for updating an identity""" target: str = Field(..., description="Local part of identity address") mailbox: str = Field(..., description="Username of mailbox that owns this identity") domain: Optional[str] = Field(None, description="Domain name") name: Optional[str] = Field(None, description="Update display name") may_send: Optional[bool] = Field( None, description="Allow/deny sending from identity" ) may_receive: Optional[bool] = Field( None, description="Allow/deny receiving to identity" ) may_access_imap: Optional[bool] = Field(None, description="Allow/deny IMAP access") may_access_pop3: Optional[bool] = Field(None, description="Allow/deny POP3 access") may_access_managesieve: Optional[bool] = Field( None, description="Allow/deny ManageSieve access" ) footer_active: Optional[bool] = Field(None, description="Enable/disable footer") footer_plain_body: Optional[str] = Field( None, description="Plain text footer content" ) footer_html_body: Optional[str] = Field(None, description="HTML footer content") class IdentityDeleteRequest(BaseModel): """Request schema for deleting an identity""" target: str = Field(..., description="Local part of identity address") mailbox: str = Field(..., description="Username of mailbox that owns this identity") domain: Optional[str] = Field(None, description="Domain name") # Rewrite Schemas class RewriteCreateRequest(BaseModel): """Request schema for creating a rewrite rule""" name: str = Field(..., description="Unique identifier/slug for the rule") local_part_rule: str = Field( ..., description="Pattern to match (e.g., 'demo-*', 'support-*')" ) destinations: Union[List[EmailStr], str] = Field( ..., description="List of email addresses or CSV string" ) domain: Optional[str] = Field(None, description="Domain name") order_num: Optional[int] = Field( None, description="Processing order (lower numbers processed first)" ) @field_validator("destinations", mode="before") @classmethod def normalize_destinations(cls, v: Union[List[str], str]) -> List[str]: return normalize_destinations(v) class RewriteUpdateRequest(BaseModel): """Request schema for updating a rewrite rule""" name: str = Field(..., description="Current identifier/slug of the rule") domain: Optional[str] = Field(None, description="Domain name") new_name: Optional[str] = Field(None, description="New identifier/slug") local_part_rule: Optional[str] = Field(None, description="New pattern to match") destinations: Optional[Union[List[EmailStr], str]] = Field( None, description="New list of destinations or CSV string" ) order_num: Optional[int] = Field(None, description="New processing order") @field_validator("destinations", mode="before") @classmethod def normalize_destinations( cls, v: Optional[Union[List[str], str]] ) -> Optional[List[str]]: if v is not None: return normalize_destinations(v) return v class RewriteDeleteRequest(BaseModel): """Request schema for deleting a rewrite rule""" name: str = Field(..., description="Identifier/slug of the rule to delete") domain: Optional[str] = Field(None, description="Domain name") # Forwarding Schemas (additional API endpoint) class ForwardingCreateRequest(BaseModel): """Request schema for creating a forwarding""" address: EmailStr = Field(..., description="External email address to forward to") expires_on: Optional[date] = Field(None, description="Expiration date") remove_upon_expiry: Optional[bool] = Field( None, description="Remove forwarding when expired" ) @field_validator("expires_on") @classmethod def validate_expires_on(cls, v: Optional[date]) -> Optional[date]: if v and v <= date.today(): raise ValueError("expires_on must be a future date") return v class ForwardingUpdateRequest(BaseModel): """Request schema for updating a forwarding""" address: EmailStr = Field(..., description="External email address") is_active: Optional[bool] = Field(None, description="Enable/disable forwarding") expires_on: Optional[date] = Field(None, description="New expiration date") remove_upon_expiry: Optional[bool] = Field( None, description="Remove forwarding when expired" ) @field_validator("expires_on") @classmethod def validate_expires_on(cls, v: Optional[date]) -> Optional[date]: if v and v <= date.today(): raise ValueError("expires_on must be a future date") return v class ForwardingDeleteRequest(BaseModel): """Request schema for deleting a forwarding""" address: EmailStr = Field(..., description="External email address to remove") # Response Schemas (for validation of API responses) class MailboxResponse(BaseModel): """Response schema for mailbox operations""" address: str local_part: str domain_name: str name: str is_internal: bool may_send: bool may_receive: bool may_access_imap: bool may_access_pop3: bool may_access_managesieve: bool password_recovery_email: Optional[EmailStr] spam_action: str spam_aggressiveness: str sender_denylist: List[str] = Field(default_factory=list) sender_allowlist: List[str] = Field(default_factory=list) recipient_denylist: List[str] = Field(default_factory=list) autorespond_active: Optional[bool] autorespond_subject: Optional[str] autorespond_body: Optional[str] autorespond_expires_on: Optional[date] footer_active: bool footer_plain_body: Optional[str] footer_html_body: Optional[str] delegations: List[Dict[str, Any]] = Field(default_factory=list) identities: List[Dict[str, Any]] = Field(default_factory=list) class AliasResponse(BaseModel): """Response schema for alias operations""" address: str local_part: str domain_name: str is_internal: bool destinations: List[EmailStr] class IdentityResponse(BaseModel): """Response schema for identity operations""" address: str local_part: str domain_name: str name: str may_send: bool may_receive: bool may_access_imap: bool may_access_pop3: bool may_access_managesieve: bool footer_active: bool footer_plain_body: Optional[str] footer_html_body: Optional[str] class RewriteResponse(BaseModel): """Response schema for rewrite operations""" name: str domain_name: str local_part_rule: str destinations: List[EmailStr] order_num: Optional[int] class ForwardingResponse(BaseModel): """Response schema for forwarding operations""" address: EmailStr blocked_at: Optional[datetime] confirmation_sent_at: Optional[datetime] confirmed_at: Optional[datetime] expires_on: Optional[date] is_active: bool remove_upon_expiry: Optional[bool] # Utility functions for schema validation def validate_schema(data: Dict[str, Any], schema_class: Type[BaseModel]) -> BaseModel: """Validate dictionary data against a Pydantic schema""" try: return schema_class(**data) except Exception as e: raise ValueError(f"Schema validation failed for {schema_class.__name__}: {e}") def schema_to_api_dict( schema_instance: BaseModel, exclude_none: bool = True ) -> Dict[str, Any]: """Convert Pydantic schema instance to dictionary for API calls""" data = schema_instance.model_dump(exclude_none=exclude_none) # Convert destinations list to CSV string for API compatibility if "destinations" in data and isinstance(data["destinations"], list): data["destinations"] = destinations_to_csv(data["destinations"]) return data

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Michaelzag/migadu-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server