update_forwarding
Update email forwarding configurations for specified addresses, adjusting active status, expiry date, and removal behavior.
Instructions
Update forwarding(s). List of dicts with: mailbox, address (required), and any of: is_active, expires_on, remove_upon_expiry, domain.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
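| items | Yes | List of dicts, one per forwarding to update. Each needs mailbox and address; is_active, expires_on, remove_upon_expiry, and domain are optional. | |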
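As a rough illustration of what an `items` argument could look like, the sketch below builds two entries and runs a small client-side pre-check. The mailbox names, addresses, and the `check_item` helper are hypothetical and not part of the tool; the sketch also assumes the schema accepts `expires_on` as an ISO `YYYY-MM-DD` string, and its unknown-key check may be stricter than the server's own validation.

```python
from datetime import date, timedelta

# Hypothetical mailboxes and addresses, for illustration only.
items = [
    # Only the two required keys: no optional field is set for this item.
    {"mailbox": "alice", "address": "alice@example.org"},
    # Deactivate a forwarding and let it expire in 30 days.
    {
        "mailbox": "bob",
        "address": "bob@example.net",
        "domain": "example.com",
        "is_active": False,
        "expires_on": (date.today() + timedelta(days=30)).isoformat(),
        "remove_upon_expiry": True,
    },
]

REQUIRED = {"mailbox", "address"}
OPTIONAL = {"is_active", "expires_on", "remove_upon_expiry", "domain"}


def check_item(item: dict) -> list[str]:
    """Return a list of problems found in one item (empty if none)."""
    keys = set(item)
    problems = [f"missing required key: {k}" for k in sorted(REQUIRED - keys)]
    problems += [f"unknown key: {k}" for k in sorted(keys - REQUIRED - OPTIONAL)]
    return problems


for item in items:
    print(item["address"], check_item(item))
```

A pre-check like this only catches shape errors early; the tool itself still validates every item and reports failures per item.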
Implementation Reference
- MCP tool handler that processes an update_forwarding request. It is wrapped by @migadu_bulk_tool, which accepts a list of items, validates each against ForwardingUpdateRequest, and calls the handler once per item. The handler delegates to ForwardingService.update_forwarding and returns a per-item result, which the decorator collects into the bulk-result envelope.

    @migadu_bulk_tool(mcp, ForwardingUpdateRequest, entity="forwarding")
    async def update_forwarding(
        item: ForwardingUpdateRequest, ctx: Context
    ) -> dict[str, Any]:
        """Update forwarding(s). List of dicts with: mailbox, address (required), and any of: is_active, expires_on, remove_upon_expiry, domain."""
        domain = item.domain or resolve_domain(None)
        await ctx.info(
            f"📋 Updating forwarding {item.address} on {item.mailbox}@{domain}"
        )
        result = (
            await get_service_factory()
            .forwarding_service()
            .update_forwarding(
                domain=domain,
                mailbox=item.mailbox,
                address=str(item.address),
                is_active=item.is_active,
                expires_on=item.expires_on.isoformat() if item.expires_on else None,
                remove_upon_expiry=item.remove_upon_expiry,
            )
        )
        return {"forwarding": result, "success": True}

- migadu_mcp/utils/schemas.py:244-257 (schema) Pydantic model for the update_forwarding input schema. Fields: mailbox (required), address (required, EmailStr), domain (optional), is_active (optional bool), expires_on (optional date), remove_upon_expiry (optional bool). Validates that expires_on is a future date.

    class ForwardingUpdateRequest(BaseModel):
        mailbox: str
        address: EmailStr
        domain: str | None = None
        is_active: bool | None = None
        expires_on: date | None = None
        remove_upon_expiry: bool | None = None

        @field_validator("expires_on")
        @classmethod
        def _future_expiry(cls, v: date | None) -> date | None:
            if v is not None and v <= date.today():
                raise ValueError("expires_on must be a future date")
            return v

- ForwardingService.update_forwarding method. Builds a payload with only non-None fields (is_active, expires_on, remove_upon_expiry) and sends a PUT request to the Migadu API endpoint for the specific forwarding, URL-encoding the address.

    async def update_forwarding(
        self,
        domain: str,
        mailbox: str,
        address: str,
        is_active: bool | None = None,
        expires_on: str | None = None,
        remove_upon_expiry: bool | None = None,
    ) -> dict[str, Any]:
        data: dict[str, Any] = {}
        if is_active is not None:
            data["is_active"] = is_active
        if expires_on is not None:
            data["expires_on"] = expires_on
        if remove_upon_expiry is not None:
            data["remove_upon_expiry"] = remove_upon_expiry
        return await self.client.put(
            f"/domains/{domain}/mailboxes/{mailbox}/forwardings/{_encode_address(address)}",
            json=data,
        )

- migadu_mcp/main.py:51-57 (registration) Initialization of the MCP server: register_forwarding_tools(mcp) is called, which registers the update_forwarding tool (along with the other forwarding tools) on the FastMCP server instance.

    def initialize_server() -> None:
        register_domain_tools(mcp)
        register_mailbox_tools(mcp)
        register_identity_tools(mcp)
        register_alias_tools(mcp)
        register_rewrite_tools(mcp)
        register_forwarding_tools(mcp)

- The @migadu_bulk_tool decorator wraps the inner handler (process_one) in a bulk_wrapper that accepts a list of dicts, validates each against the schema, calls the handler per item, and aggregates results into a bulk-response envelope. It registers the wrapper as a FastMCP tool.

    def migadu_bulk_tool(
        mcp: FastMCP,
        schema: Type[BaseModel],
        *,
        entity: str,
        destructive: bool = False,
        idempotent: bool = True,
    ) -> Callable[..., Any]:
        """Register a bulk mutation tool.

        The decorated function processes ONE validated item at a time. The outer
        tool accepts `list[dict]`, validates each, calls the inner function, and
        returns a bulk-result envelope with per-item success/failure.
        """
        annotations = {
            "readOnlyHint": False,
            "destructiveHint": destructive,
            "idempotentHint": idempotent,
            "openWorldHint": True,
        }

        def decorator(
            process_one: Callable[..., Awaitable[dict[str, Any]]],
        ) -> Callable[..., Awaitable[dict[str, Any]]]:
            async def bulk_wrapper(items: list, ctx: Context) -> dict:
                normalized: list[dict[str, Any]] = (
                    [items] if isinstance(items, dict) else list(items)
                )
                total = len(normalized)
                plural = entity if total == 1 else f"{entity}s"
                await ctx.info(f"🔄 Processing {total} {plural}")

                results: list[dict[str, Any]] = []
                for item in normalized:
                    try:
                        validated = validate_with_schema(item, schema)
                        result = await process_one(validated, ctx)
                        results.append(result)
                    except Exception as exc:
                        results.append({"error": str(exc), "item": item, "success": False})

                successful = sum(1 for r in results if r.get("success", True))
                failed = total - successful
                if failed == 0:
                    await ctx.info(f"✅ Processed {successful}/{total} {plural}")
                else:
                    await ctx.warning(
                        f"⚠️ Processed {successful}/{total} {plural}; {failed} failed"
                    )
                return {
                    "items": results,
                    "total_requested": total,
                    "total_successful": successful,
                    "total_failed": failed,
                    "success": failed == 0,
                }

            bulk_wrapper.__name__ = getattr(process_one, "__name__", "bulk_tool")
            bulk_wrapper.__qualname__ = getattr(
                process_one, "__qualname__", bulk_wrapper.__name__
            )
            bulk_wrapper.__doc__ = getattr(process_one, "__doc__", None)
            mcp.tool(annotations=annotations)(bulk_wrapper)
            return bulk_wrapper

        return decorator
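
To make the envelope shape concrete, here is a minimal standalone sketch of the per-item loop described above, with the FastMCP registration, schema validation, and logging left out. `run_bulk` and `fake_update` are hypothetical stand-ins written for this illustration, not functions from the project; only the envelope keys and the error-entry shape are taken from the decorator source.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_bulk(
    items: Any,
    process_one: Callable[[dict[str, Any]], Awaitable[dict[str, Any]]],
) -> dict[str, Any]:
    """Mimic the per-item loop and result envelope of bulk_wrapper."""
    # A single dict is treated as a one-element list, as in the original.
    normalized = [items] if isinstance(items, dict) else list(items)
    results: list[dict[str, Any]] = []
    for item in normalized:
        try:
            results.append(await process_one(item))
        except Exception as exc:
            # One failing item is recorded and does not abort the batch.
            results.append({"error": str(exc), "item": item, "success": False})
    successful = sum(1 for r in results if r.get("success", True))
    failed = len(normalized) - successful
    return {
        "items": results,
        "total_requested": len(normalized),
        "total_successful": successful,
        "total_failed": failed,
        "success": failed == 0,
    }


async def fake_update(item: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical handler: rejects items that have no address."""
    if "address" not in item:
        raise ValueError("address is required")
    return {"forwarding": dict(item), "success": True}


envelope = asyncio.run(
    run_bulk(
        [
            {"mailbox": "alice", "address": "alice@example.org", "is_active": False},
            {"mailbox": "bob"},  # fails: no address
        ],
        fake_update,
    )
)
print(envelope["total_successful"], envelope["total_failed"], envelope["success"])
# prints: 1 1 False
```

Because failures are captured per item, a caller should inspect `items` (or `total_failed`) rather than rely on an exception to signal a partially failed batch.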