# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T02:41:00+00:00
import argparse
import json
import os
from typing import *
from typing import Optional
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, UnsuportedSecurityStub
from fastapi import Query
from models import (
ActivateBiddingFunctionRequest,
Alt,
ArchiveBiddingFunctionRequest,
BiddingFunction,
FieldXgafv,
ListBiddingFunctionsResponse,
)
app = MCPProxy(
contact={'name': 'Google', 'url': 'https://google.com', 'x-twitter': 'youtube'},
description='Allows external bidders to manage their RTB integration with Google. This includes managing bidder endpoints, QPS quotas, configuring what ad inventory to receive via pretargeting, submitting creatives for verification, and accessing creative metadata such as approval status.',
license={
'name': 'Creative Commons Attribution 3.0',
'url': 'http://creativecommons.org/licenses/by/3.0/',
},
termsOfService='https://developers.google.com/terms/',
title='Real-time Bidding API',
version='v1alpha',
servers=[{'url': 'https://realtimebidding.googleapis.com/'}],
)
@app.post(
'/v1alpha/{name}:activate',
description=""" Activates an existing bidding function. An activated function is available for invocation for the server-side TURTLEDOVE simulations. """,
tags=['bidding_function_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def realtimebidding_bidders_bidding_functions_activate(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: ActivateBiddingFunctionRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1alpha/{name}:archive',
description=""" Archives an existing bidding function. An archived function will not be available for function invocation for the server-side TURTLEDOVE simulations unless it is activated. """,
tags=['bidding_function_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def realtimebidding_bidders_bidding_functions_archive(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: ArchiveBiddingFunctionRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1alpha/{parent}/biddingFunctions',
description=""" Lists the bidding functions that a bidder currently has registered. """,
tags=['bidding_function_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def realtimebidding_bidders_bidding_functions_list(
parent: str,
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1alpha/{parent}/biddingFunctions',
description=""" Creates a new bidding function. """,
tags=['bidding_function_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def realtimebidding_bidders_bidding_functions_create(
parent: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: BiddingFunction = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)