# main.py • 13.6 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T04:51:39+00:00
import argparse
import json
import os
from typing import *
from typing import Optional, Union
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, HTTPBearer
from models import (
ActionsActionPutRequest,
ActionsPostRequest,
AddActionResponse,
AddAddressRequest,
AddDomainRequest,
AddKeyRequest,
AddKeyResponse,
AddTriggerRequest,
AddTriggerResponse,
AddVerificationResponse,
AddWorkflowRequest,
AddWorkspaceRequest,
CheckDomainVerify,
DaemonsDaemonTokenGetResponse,
DomainResponse,
ErrorResponse,
GetAllActionsResponse,
GetAllAddressesResponse,
GetAllDomainsResponse,
GetAllInputsResponse,
GetAllIntegrationsResponse,
GetAllKeysResponse,
GetAllTriggersResponse,
GetAllVerificationsResponse,
GetAllWorkflowsResponse,
GetAllWorkspacesResponse,
Key,
SendRequest,
SetWorkflowRequest,
UpdateKeyRequest,
UpdateUserRequest,
User,
VerificationsPostRequest,
VerificationsVerificationVerifyPostRequest,
)
# Proxy application object; the route decorators in this module register on it.
app = MCPProxy(
    title="Mailscript",
    version="0.4.0",
    license={"name": "MIT"},
    servers=[{"description": "API server", "url": "https://api.mailscript.com/v2"}],
)
# GET /actions — declared with HTTPBearer security.
@app.get(
    "/actions",
    security=[HTTPBearer(name="None")],
    tags=["action_management", "user_profile_operations"],
)
def get_all_actions():
    """
    Get all actions for the user
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /actions — takes an ActionsPostRequest body; HTTPBearer security.
@app.post(
    "/actions",
    security=[HTTPBearer(name="None")],
    tags=["action_management"],
)
def add_action(body: ActionsPostRequest):
    """
    Add an action
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /actions/{action} — `action` is the path parameter; HTTPBearer security.
@app.delete(
    "/actions/{action}",
    security=[HTTPBearer(name="None")],
    tags=["action_management"],
)
def delete_action(action: str):
    """
    Delete an action
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# PUT /actions/{action} — `action` is the path parameter, `body` carries an
# ActionsActionPutRequest; HTTPBearer security.
@app.put(
    "/actions/{action}",
    security=[HTTPBearer(name="None")],
    tags=["action_management"],
)
def update_action(action: str, body: ActionsActionPutRequest = ...):
    """
    Update an action
    """
    # NOTE(review): the summary previously read "Update an action key", which
    # does not match this route or function name — confirm against the spec.
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /addresses — declared with HTTPBearer security.
@app.get(
    "/addresses",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def get_all_addresses():
    """
    Get all addresses you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /addresses — takes an AddAddressRequest body; HTTPBearer security.
@app.post(
    "/addresses",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def add_address(body: AddAddressRequest):
    """
    Claim a new Mailscript address
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /addresses/{address} — `address` is the path parameter; HTTPBearer security.
@app.delete(
    "/addresses/{address}",
    security=[HTTPBearer(name="None")],
    tags=["address_management", "action_management"],
)
def delete_address(address: str):
    """
    Delete a mailscript address
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /addresses/{address}/keys — `address` is the path parameter; HTTPBearer security.
@app.get(
    "/addresses/{address}/keys",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def get_all_keys(address: str):
    """
    List address keys
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /addresses/{address}/keys — `address` is the path parameter, `body`
# carries an AddKeyRequest; HTTPBearer security.
@app.post(
    "/addresses/{address}/keys",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def add_key(address: str, body: AddKeyRequest = ...):
    """
    Add address key
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /addresses/{address}/keys/{key} — `address` and `key` are path
# parameters; HTTPBearer security.
@app.delete(
    "/addresses/{address}/keys/{key}",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def delete_key(address: str, key: str = ...):
    """
    Delete address key
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /addresses/{address}/keys/{key} — `address` and `key` are path
# parameters; HTTPBearer security.
@app.get(
    "/addresses/{address}/keys/{key}",
    security=[HTTPBearer(name="None")],
    tags=["address_management"],
)
def get_key(address: str, key: str = ...):
    """
    Get address key
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# PUT /addresses/{address}/keys/{key} — `address` and `key` are path
# parameters, `body` carries an UpdateKeyRequest; HTTPBearer security.
@app.put(
    "/addresses/{address}/keys/{key}",
    security=[HTTPBearer(name="None")],
    tags=["address_management", "action_management"],
)
def update_key(address: str, key: str = ..., body: UpdateKeyRequest = ...):
    """
    Update an address key
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /daemons/{daemon}/token — `daemon` is the path parameter; HTTPBearer security.
@app.get(
    "/daemons/{daemon}/token",
    security=[HTTPBearer(name="None")],
    tags=["integration_handling"],
)
def get_daemon_token(daemon: str):
    """
    Get a token for opening a daemon connection
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /domains — declared with HTTPBearer security.
@app.get(
    "/domains",
    security=[HTTPBearer(name="None")],
    tags=["domain_handling"],
)
def get_all_domains():
    """
    Get all domains you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /domains — takes an AddDomainRequest body; HTTPBearer security.
@app.post(
    "/domains",
    security=[HTTPBearer(name="None")],
    tags=["domain_handling"],
)
def add_domain(body: AddDomainRequest):
    """
    Claim a new Domain
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /domains/verify/{domain} — `domain` is the path parameter; HTTPBearer security.
@app.get(
    "/domains/verify/{domain}",
    security=[HTTPBearer(name="None")],
    tags=["domain_handling", "verification_handling"],
)
def get_domain_verify(domain: str):
    """
    Get domain verification
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /domains/verify/{domain} — `domain` is the path parameter; HTTPBearer security.
@app.post(
    "/domains/verify/{domain}",
    security=[HTTPBearer(name="None")],
    tags=["domain_handling", "verification_handling"],
)
def check_domain_verify(domain: str):
    """
    Check a new Domain
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /domains/{domain} — `domain` is the path parameter; HTTPBearer security.
@app.delete(
    "/domains/{domain}",
    security=[HTTPBearer(name="None")],
    tags=["domain_handling"],
)
def remove_domain_verify(domain: str):
    """
    Remove a domain
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /inputs — optional `name` argument defaults to None; HTTPBearer security.
@app.get(
    "/inputs",
    security=[HTTPBearer(name="None")],
    tags=["user_profile_operations"],
)
def get_all_inputs(name: Optional[str] = None):
    """
    Get all inputs you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /integrations — declared with HTTPBearer security.
@app.get(
    "/integrations",
    security=[HTTPBearer(name="None")],
    tags=["integration_handling", "user_profile_operations"],
)
def get_all_integrations():
    """
    Get all integrations for the user
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /integrations/{integration} — `integration` is the path parameter;
# HTTPBearer security.
@app.delete(
    "/integrations/{integration}",
    security=[HTTPBearer(name="None")],
    tags=["integration_handling"],
)
def delete_integration(integration: str):
    """
    Delete an integration
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /send — takes a SendRequest body; HTTPBearer security.
@app.post(
    "/send",
    security=[HTTPBearer(name="None")],
    tags=["action_management", "verification_handling", "verification_execution"],
)
def send(body: SendRequest):
    """
    Send an email
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /triggers — declared with HTTPBearer security.
@app.get(
    "/triggers",
    security=[HTTPBearer(name="None")],
    tags=["trigger_operations"],
)
def get_all_triggers():
    """
    Get all triggers you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /triggers — takes an AddTriggerRequest body; HTTPBearer security.
@app.post(
    "/triggers",
    security=[HTTPBearer(name="None")],
    tags=["trigger_operations"],
)
def add_trigger(body: AddTriggerRequest):
    """
    Setup a trigger
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /triggers/{trigger} — `trigger` is the path parameter; HTTPBearer security.
@app.delete(
    "/triggers/{trigger}",
    security=[HTTPBearer(name="None")],
    tags=["trigger_operations", "action_management"],
)
def delete_trigger(trigger: str):
    """
    Delete a trigger
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# PUT /triggers/{trigger} — `trigger` is the path parameter, `body` carries an
# AddTriggerRequest; HTTPBearer security.
@app.put(
    "/triggers/{trigger}",
    security=[HTTPBearer(name="None")],
    tags=["trigger_operations", "action_management"],
)
def update_trigger(trigger: str, body: AddTriggerRequest = ...):
    """
    Update a trigger
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /user — declared with HTTPBearer security.
@app.get(
    "/user",
    security=[HTTPBearer(name="None")],
    tags=["user_profile_operations"],
)
def get_authenticated_user():
    """
    Get the authenticated user
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# PUT /user — takes an UpdateUserRequest body; HTTPBearer security.
@app.put(
    "/user",
    security=[HTTPBearer(name="None")],
    tags=["user_profile_operations", "action_management"],
)
def update_user(body: UpdateUserRequest):
    """
    Update a user
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /verifications — declared with HTTPBearer security.
@app.get(
    "/verifications",
    security=[HTTPBearer(name="None")],
    tags=["verification_handling"],
)
def get_all_verifications():
    """
    Get all verifications for the user
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /verifications — takes a VerificationsPostRequest body; HTTPBearer security.
@app.post(
    "/verifications",
    security=[HTTPBearer(name="None")],
    tags=["verification_handling", "verification_execution"],
)
def add_verification(body: VerificationsPostRequest):
    """
    Start verification process for external email address or sms number
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /verifications/{verification}/verify — `verification` is the path
# parameter, `body` carries a VerificationsVerificationVerifyPostRequest;
# HTTPBearer security.
@app.post(
    "/verifications/{verification}/verify",
    security=[HTTPBearer(name="None")],
    tags=["verification_handling", "verification_execution"],
)
def verify(verification: str, body: VerificationsVerificationVerifyPostRequest = ...):
    """
    Verify an email address or sms number with a code
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /workflows — declared with HTTPBearer security.
@app.get(
    "/workflows",
    security=[HTTPBearer(name="None")],
    tags=["workflow_operations"],
)
def get_all_workflows():
    """
    Get all workflows you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /workflows — takes an AddWorkflowRequest body; HTTPBearer security.
@app.post(
    "/workflows",
    security=[HTTPBearer(name="None")],
    tags=["workflow_operations"],
)
def add_workflow(body: AddWorkflowRequest):
    """
    Setup workflow
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /workflows/set — takes a SetWorkflowRequest body; HTTPBearer security.
@app.post(
    "/workflows/set",
    security=[HTTPBearer(name="None")],
    tags=["workflow_operations"],
)
def set_workflow(body: SetWorkflowRequest):
    """
    Set a property on a workflow
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# DELETE /workflows/{workflow} — `workflow` is the path parameter; HTTPBearer security.
@app.delete(
    "/workflows/{workflow}",
    security=[HTTPBearer(name="None")],
    tags=["workflow_operations"],
)
def delete_workflow(workflow: str):
    """
    Delete a workflow
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# PUT /workflows/{workflow} — `workflow` is the path parameter, `body` carries
# an AddWorkflowRequest; HTTPBearer security.
@app.put(
    "/workflows/{workflow}",
    security=[HTTPBearer(name="None")],
    tags=["workflow_operations"],
)
def update_workflow(workflow: str, body: AddWorkflowRequest = ...):
    """
    Update a workflow
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# GET /workspaces — declared with HTTPBearer security.
@app.get(
    "/workspaces",
    security=[HTTPBearer(name="None")],
    tags=["workspace_operations"],
)
def get_all_workspaces():
    """
    Get all workspaces you have access to
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
# POST /workspaces — takes an AddWorkspaceRequest body; HTTPBearer security.
# The description string keeps its leading and trailing space unchanged.
@app.post(
    "/workspaces",
    security=[HTTPBearer(name="None")],
    tags=["workspace_operations"],
    description=" An attendant address will be created as well ",
)
def add_workspace(body: AddWorkspaceRequest):
    """
    Claim a Mailscript workspace
    """
    # Placeholder body: per the message, MCPProxy is expected to patch this
    # function, so reaching this line indicates a wiring problem.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
    # Script entry point: pick a transport from the command line, apply any
    # environment-driven configuration, then run the MCP server.
    cli = argparse.ArgumentParser(description="MCP Server")
    cli.add_argument(
        "transport",
        help="Transport mode (stdio, sse or streamable-http)",
        choices=["stdio", "sse", "streamable-http"],
    )
    args = cli.parse_args()

    env = os.environ

    # Each configuration source is applied only when its variable is set;
    # CONFIG_PATH is handled before CONFIG.
    if "CONFIG_PATH" in env:
        app.load_configuration(env["CONFIG_PATH"])

    if "CONFIG" in env:
        app.load_configuration_from_string(env["CONFIG"])

    # The presence of SECURITY triggers parsing; the parser receives the
    # whole environment mapping.
    if "SECURITY" in env:
        app.set_security_params(
            BaseSecurity.parse_security_parameters_from_env(env)
        )

    # MCP_SETTINGS is decoded as JSON and expanded into keyword arguments;
    # it falls back to an empty object when unset.
    mcp_settings = json.loads(env.get("MCP_SETTINGS", "{}"))
    app.get_mcp(**mcp_settings).run(transport=args.transport)