# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T03:33:06+00:00
import argparse
import json
import os
from datetime import datetime
from typing import *
from typing import Optional, Union
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import (
APIKeyHeader,
APIKeyQuery,
BaseSecurity,
UnsuportedSecurityStub,
)
from fastapi import Path, Query
from models import CollectionResponseWithTotalDomainForwardPaging, Domain, Error, Sort
# Module-level proxy application. The endpoint functions in this file register
# themselves on it through the ``@app.get`` decorator, and the script entry
# point configures and runs it.
app = MCPProxy(
    title="Domains",
    version="v3",
    servers=[{"url": "https://api.hubapi.com/"}],
)
@app.get(
    "/cms/v3/domains/",
    description=""" Returns all existing domains that have been created. Results can be limited and filtered by creation or updated date. """,
    tags=["domain_management"],
    # The security list is kept exactly as generated, including repeated
    # entries and their order.
    security=[
        APIKeyHeader(name="private-app-legacy"),
        UnsuportedSecurityStub(name="None"),
        APIKeyHeader(name="private-app"),
        APIKeyQuery(name="hapikey"),
        APIKeyHeader(name="private-app"),
        UnsuportedSecurityStub(name="None"),
        APIKeyHeader(name="private-app-legacy"),
        UnsuportedSecurityStub(name="None"),
        UnsuportedSecurityStub(name="None"),
    ],
)
def get__cms_v3_domains__get_page(
    created_at: Optional[datetime] = Query(None, alias="createdAt"),
    created_after: Optional[datetime] = Query(None, alias="createdAfter"),
    created_before: Optional[datetime] = Query(None, alias="createdBefore"),
    updated_at: Optional[datetime] = Query(None, alias="updatedAt"),
    updated_after: Optional[datetime] = Query(None, alias="updatedAfter"),
    updated_before: Optional[datetime] = Query(None, alias="updatedBefore"),
    sort: Optional[Sort] = None,
    after: Optional[str] = None,
    limit: Optional[int] = None,
    archived: Optional[bool] = None,
):
    """
    Get current domains.

    Placeholder registered for ``GET /cms/v3/domains/``. All parameters are
    optional; the six date filters are declared with camelCase query aliases
    (``createdAt``, ``createdAfter``, ``createdBefore``, ``updatedAt``,
    ``updatedAfter``, ``updatedBefore``).

    Raises:
        RuntimeError: always, if this body is ever executed directly; the
            call is expected to be patched by MCPProxy instead.
    """
    # Reaching this line means the function was not patched.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
    "/cms/v3/domains/{domainId}",
    description=""" Returns a single domains with the id specified. """,
    tags=["domain_management"],
    # The security list is kept exactly as generated, including repeated
    # entries and their order.
    security=[
        APIKeyHeader(name="private-app-legacy"),
        UnsuportedSecurityStub(name="None"),
        APIKeyHeader(name="private-app"),
        APIKeyQuery(name="hapikey"),
        APIKeyHeader(name="private-app"),
        UnsuportedSecurityStub(name="None"),
        APIKeyHeader(name="private-app-legacy"),
        UnsuportedSecurityStub(name="None"),
        UnsuportedSecurityStub(name="None"),
    ],
)
def get__cms_v3_domains__domain_id__get_by_id(
    domain_id: str = Path(..., alias="domainId"),
):
    """
    Get a single domain.

    Placeholder registered for ``GET /cms/v3/domains/{domainId}``. The
    required ``domain_id`` argument is bound to the ``domainId`` path
    segment through its alias.

    Raises:
        RuntimeError: always, if this body is ever executed directly; the
            call is expected to be patched by MCPProxy instead.
    """
    # Reaching this line means the function was not patched.
    raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
    # Script entry point: read the transport from the command line, apply any
    # environment-driven configuration to ``app``, then run the MCP server.
    cli = argparse.ArgumentParser(description="MCP Server")
    cli.add_argument(
        "transport",
        choices=["stdio", "sse", "streamable-http"],
        help="Transport mode (stdio, sse or streamable-http)",
    )
    cli_args = cli.parse_args()

    env = os.environ

    # CONFIG_PATH and CONFIG are independent: when both are set, both loaders
    # run, CONFIG_PATH first.
    if "CONFIG_PATH" in env:
        app.load_configuration(env["CONFIG_PATH"])
    if "CONFIG" in env:
        app.load_configuration_from_string(env["CONFIG"])

    # SECURITY is only tested for presence here; the parser is handed the
    # whole environment mapping.
    if "SECURITY" in env:
        parsed_security = BaseSecurity.parse_security_parameters_from_env(env)
        app.set_security_params(parsed_security)

    # MCP_SETTINGS holds a JSON object whose members are forwarded to get_mcp
    # as keyword arguments; it defaults to an empty object when unset.
    server_kwargs = json.loads(env.get("MCP_SETTINGS", "{}"))
    app.get_mcp(**server_kwargs).run(transport=cli_args.transport)