main.py•2.95 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T07:45:21+00:00
import argparse
import json
import os
from typing import *
from typing import Optional
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import APIKeyQuery, BaseSecurity
from fastapi import Path
from models import Source
from models.Content import JsonGetResponse
from models.Content_source__section_ import JsonGetResponse
from models.Content_source__section__time_period_ import JsonGetResponse
app = MCPProxy(
description='With the Times Newswire API, you can get links and metadata for Times articles and blog posts as soon as they are published on NYTimes.com. The Times Newswire API provides an up-to-the-minute stream of published items.',
termsOfService='http://developer.nytimes.com/tou',
title='Times Newswire API',
version='3.0.0',
servers=[
{'url': 'http://api.nytimes.com/svc/news/v3'},
{'url': 'https://api.nytimes.com/svc/news/v3'},
],
)
@app.get(
'/content.json',
tags=['content_retrieval'],
security=[
APIKeyQuery(name="api-key"),
],
)
def get_content_json(url: str):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/content/{source}/{section}.json',
tags=['content_retrieval'],
security=[
APIKeyQuery(name="api-key"),
],
)
def get_content__source__section_json(
source: Source,
section: str = ...,
limit: Optional[int] = 20,
offset: Optional[int] = 0,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/content/{source}/{section}/{time-period}.json',
tags=['content_retrieval'],
security=[
APIKeyQuery(name="api-key"),
],
)
def get_content__source__section__time_period_json(
source: Source,
section: str = ...,
time_period: int = Path(..., alias='time-period'),
limit: Optional[int] = 20,
offset: Optional[int] = 0,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)