main.py•7 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-07-07T17:02:39+00:00
import argparse
import json
import os
from typing import *
from typing import Optional, Union
from uuid import UUID
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, HTTPBasic, HTTPBearer
from models import (
FilterProperties,
V1BlocksBlockIdChildrenPatchRequest,
V1BlocksBlockIdPatchRequest,
V1CommentsPostRequest,
V1DatabasesDatabaseIdPatchRequest,
V1DatabasesDatabaseIdQueryPostRequest,
V1DatabasesPostRequest,
V1PagesPageIdPatchRequest,
V1PagesPostRequest,
V1SearchPostRequest,
V1UsersGetResponse,
V1UsersMeGetResponse,
V1UsersUserIdGetResponse,
)
app = MCPProxy(
title='Notion API',
version='1',
servers=[{'url': 'https://api.notion.com'}],
)
@app.delete('/v1/blocks/{block_id}', tags=['block_data_operations'])
def delete_a_block(block_id: str):
"""
Delete a block
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/blocks/{block_id}', tags=['block_data_operations'])
def retrieve_a_block(block_id: str):
"""
Retrieve a block
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.patch('/v1/blocks/{block_id}', tags=['block_data_operations'])
def update_a_block(block_id: str, body: V1BlocksBlockIdPatchRequest = None):
"""
Update a block
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/blocks/{block_id}/children', tags=['block_data_operations'])
def get_block_children(
block_id: str, start_cursor: Optional[str] = None, page_size: Optional[int] = 100
):
"""
Retrieve block children
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.patch('/v1/blocks/{block_id}/children', tags=['block_data_operations'])
def patch_block_children(
block_id: str, body: V1BlocksBlockIdChildrenPatchRequest = None
):
"""
Append block children
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1/comments',
description=""" Retrieves a list of un-resolved [Comment objects](ref:comment-object) from a page or block. """,
tags=['block_data_operations', 'comment_data_operations'],
)
def retrieve_a_comment(
block_id: str, start_cursor: Optional[str] = None, page_size: Optional[int] = None
):
"""
Retrieve comments
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1/comments',
description=""" Creates a comment in a page or existing discussion thread. """,
tags=['comment_data_operations'],
)
def create_a_comment(body: V1CommentsPostRequest = None):
"""
Create comment
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post('/v1/databases', tags=['database_schema_management'])
def create_a_database(body: V1DatabasesPostRequest = None):
"""
Create a database
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/databases/{database_id}', tags=['database_schema_management'])
def retrieve_a_database(database_id: str):
"""
Retrieve a database
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.patch('/v1/databases/{database_id}', tags=['database_schema_management'])
def update_a_database(database_id: str, body: V1DatabasesDatabaseIdPatchRequest = None):
"""
Update a database
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post('/v1/databases/{database_id}/query', tags=['database_schema_management'])
def post_database_query(
database_id: str,
filter_properties: Optional[FilterProperties] = None,
body: V1DatabasesDatabaseIdQueryPostRequest = None,
):
"""
Query a database
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post('/v1/pages', tags=['page_content_management'])
def post_page(body: V1PagesPostRequest = None):
"""
Create a page
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/pages/{page_id}', tags=['page_content_management'])
def retrieve_a_page(page_id: str, filter_properties: Optional[str] = None):
"""
Retrieve a page
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.patch('/v1/pages/{page_id}', tags=['page_content_management'])
def patch_page(page_id: str, body: V1PagesPageIdPatchRequest = None):
"""
Update page properties
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1/pages/{page_id}/properties/{property_id}', tags=['page_content_management']
)
def retrieve_a_page_property(
page_id: str,
property_id: str = ...,
page_size: Optional[int] = None,
start_cursor: Optional[str] = None,
):
"""
Retrieve a page property item
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post('/v1/search', tags=['database_schema_management', 'page_content_management'])
def post_search(body: V1SearchPostRequest = None):
"""
Search by title
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1/users',
tags=['user_profile_operations'],
security=[
HTTPBearer(name="None"),
],
)
def get_users(start_cursor: Optional[str] = None, page_size: Optional[int] = 100):
"""
List all users
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/users/me', tags=['user_profile_operations'])
def get_self():
"""
Retrieve your token's bot user
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get('/v1/users/{user_id}', tags=['user_profile_operations'])
def get_user(user_id: UUID):
"""
Retrieve a user
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)