# main.py (13.4 kB)
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-07-07T17:37:22+00:00
import argparse
import json
import os
from typing import *
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, HTTPBasic
from starlette.requests import Request
from models import (
BulkCallResponse,
CallResponse,
CancelScheduledHangupResponse,
CancelScheduledPlayResponse,
ConferenceDeafResponse,
ConferenceHangupResponse,
ConferenceKickResponse,
ConferenceListMembersResponse,
ConferenceListResponse,
ConferenceMuteResponse,
ConferencePlayResponse,
ConferenceRecordStartResponse,
ConferenceRecordStopResponse,
ConferenceSpeakResponse,
ConferenceUndeafResponse,
ConferenceUnmuteResponse,
GroupCallResponse,
HangupAllCallsResponse,
HangupCallResponse,
PlayResponse,
PlayStopResponse,
RecordStartResponse,
RecordStopResponse,
ScheduleHangupResponse,
SchedulePlayResponse,
SendDigitsResponse,
SoundTouchResponse,
SoundTouchStopResponse,
TransferCallResponse,
)
# Proxy application on which every Eqivo REST operation in this module is
# registered via ``@app.post``.
# NOTE(review): the only configured server URL points at a PNG image
# (readme-splash.png), not an API host -- presumably carried over from the
# OpenAPI spec; confirm the intended Eqivo base URL.
app = MCPProxy(
    title="Eqivo API",
    description="Eqivo OpenApi Specification",
    version="v0.1",
    servers=[
        {"url": "https://raw.github.com/rtckit/media/master/eqivo/readme-splash.png"},
    ],
)
@app.post(
    "/v0.1/BulkCall/",
    description=" Initiates multiple concurrent outbound calls ",
    tags=["call_operations", "conference_call_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__bulk_call_(request: Request):
    """Placeholder handler for ``POST /v0.1/BulkCall/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/Call/",
    description=" Initiates an outbound call ",
    tags=[
        "call_operations",
        "conference_call_management",
        "audio_control",
        "call_recording_management",
    ],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__call_(request: Request):
    """Placeholder handler for ``POST /v0.1/Call/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/CancelScheduledHangup/",
    description=" Cancels a scheduled hangup for a call ",
    tags=["call_operations", "action_scheduling"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__cancel_scheduled_hangup_(request: Request):
    """Placeholder handler for ``POST /v0.1/CancelScheduledHangup/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/CancelScheduledPlay/",
    description=" Cancels a scheduled playback request ",
    tags=["action_scheduling"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__cancel_scheduled_play_(request: Request):
    """Placeholder handler for ``POST /v0.1/CancelScheduledPlay/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceDeaf/",
    description=" Blocks audio to one or more conference members ",
    tags=["conference_call_management", "call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_deaf_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceDeaf/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceHangup/",
    description=" Kicks one or more conference members, without playing the kick sound ",
    tags=["call_operations", "conference_call_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_hangup_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceHangup/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceKick/",
    description=" Kicks one or more conference members ",
    tags=["conference_call_management", "call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_kick_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceKick/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceList/",
    description=" Returns a list of all established conferences ",
    tags=["conference_call_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_list_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceList/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceListMembers/",
    description=" Retrieves the member list for a given conference ",
    tags=["conference_call_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_list_members_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceListMembers/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceMute/",
    description=" Blocks audio from one or more conference members ",
    tags=["conference_call_management", "call_operations", "audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_mute_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceMute/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferencePlay/",
    description=" Plays media to one or more conference members ",
    tags=["conference_call_management", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_play_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferencePlay/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceRecordStart/",
    description=" Initiates a conference recording ",
    tags=["conference_call_management", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_record_start_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceRecordStart/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceRecordStop/",
    description=" Stops a conference recording ",
    tags=["conference_call_management", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_record_stop_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceRecordStop/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceSpeak/",
    description=" Plays synthesized speech into a conference ",
    tags=["conference_call_management", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_speak_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceSpeak/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceUndeaf/",
    description=" Restores audio to one or more conference members ",
    tags=["conference_call_management", "call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_undeaf_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceUndeaf/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ConferenceUnmute/",
    description=" Restores audio from one or more conference members ",
    tags=["conference_call_management", "audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__conference_unmute_(request: Request):
    """Placeholder handler for ``POST /v0.1/ConferenceUnmute/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/GroupCall/",
    description=" Initiate multiple racing outbound calls ",
    tags=[
        "call_operations",
        "conference_call_management",
        "call_recording_management",
    ],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__group_call_(request: Request):
    """Placeholder handler for ``POST /v0.1/GroupCall/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/HangupAllCalls/",
    description=" Hangs up all established calls ",
    tags=["call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__hangup_all_calls_():
    """Placeholder handler for ``POST /v0.1/HangupAllCalls/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Unlike the other handlers in this module, this one
    takes no parameters. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/HangupCall/",
    description=" Hangs up a specific call ",
    tags=["call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__hangup_call_(request: Request):
    """Placeholder handler for ``POST /v0.1/HangupCall/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/Play/",
    description=" Plays media into a live call ",
    tags=["call_operations", "audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__play_(request: Request):
    """Placeholder handler for ``POST /v0.1/Play/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/PlayStop/",
    description=" Interrupts media playback on a given call ",
    tags=["audio_control", "call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__play_stop_(request: Request):
    """Placeholder handler for ``POST /v0.1/PlayStop/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/RecordStart/",
    description=" Initiates recording of a given call ",
    tags=["call_operations", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__record_start_(request: Request):
    """Placeholder handler for ``POST /v0.1/RecordStart/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/RecordStop/",
    description=" Stops the recording of a given call ",
    tags=["call_operations", "call_recording_management"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__record_stop_(request: Request):
    """Placeholder handler for ``POST /v0.1/RecordStop/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/ScheduleHangup/",
    description=" Schedules a hangup for a specific call ",
    tags=["call_operations", "action_scheduling"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__schedule_hangup_(request: Request):
    """Placeholder handler for ``POST /v0.1/ScheduleHangup/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/SchedulePlay/",
    description=" Schedules media playback for a specific call ",
    tags=["action_scheduling", "audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__schedule_play_(request: Request):
    """Placeholder handler for ``POST /v0.1/SchedulePlay/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/SendDigits/",
    # The original description read "DMTF"; the telephony term for keypad
    # signalling tones is DTMF (dual-tone multi-frequency). This text is
    # passed to the proxy as the operation's description, so the acronym
    # is corrected here.
    description=" Emits DTMF tones to a call ",
    tags=["call_operations", "audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__send_digits_(request: Request):
    """Placeholder handler for ``POST /v0.1/SendDigits/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/SoundTouch/",
    description=" Applies SoundTouch effects to a live call ",
    tags=["audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__sound_touch_(request: Request):
    """Placeholder handler for ``POST /v0.1/SoundTouch/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/SoundTouchStop/",
    description=" Removes SoundTouch effects from a given call ",
    tags=["audio_control"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__sound_touch_stop_(request: Request):
    """Placeholder handler for ``POST /v0.1/SoundTouchStop/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
    "/v0.1/TransferCall/",
    description=" Replaces the RestXML flow of a live call ",
    tags=["call_operations"],
    security=[HTTPBasic(name="None")],
)
def post_v0_1__transfer_call_(request: Request):
    """Placeholder handler for ``POST /v0.1/TransferCall/``.

    The decorator declares the route, its description, tags and HTTP Basic
    security requirement. Per the error message below, MCPProxy is expected
    to patch this handler, so the body is not meant to run.

    Args:
        request: The incoming Starlette request.

    Raises:
        RuntimeError: Always, if this unpatched stub is executed.
    """
    raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
    # Command line: one required positional argument selects the transport.
    cli = argparse.ArgumentParser(description="MCP Server")
    cli.add_argument(
        "transport",
        choices=["stdio", "sse", "streamable-http"],
        help="Transport mode (stdio, sse or streamable-http)",
    )
    options = cli.parse_args()

    env = os.environ

    # Optional configuration sources, each applied only when its variable is
    # set. CONFIG_PATH is handled first, then CONFIG.
    path_setting = env.get("CONFIG_PATH")
    if path_setting is not None:
        app.load_configuration(path_setting)

    inline_setting = env.get("CONFIG")
    if inline_setting is not None:
        app.load_configuration_from_string(inline_setting)

    # Security parameters are parsed from the whole environment mapping, but
    # only when the SECURITY variable is present.
    if "SECURITY" in env:
        app.set_security_params(
            BaseSecurity.parse_security_parameters_from_env(env),
        )

    # MCP_SETTINGS is decoded as JSON (defaulting to an empty object) and its
    # members are forwarded as keyword arguments to get_mcp().
    extra_kwargs = json.loads(env.get("MCP_SETTINGS", "{}"))
    app.get_mcp(**extra_kwargs).run(transport=options.transport)