main.py•3.16 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-28T15:31:02+00:00
import argparse
import json
import os
from typing import *
from typing import Union
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import APIKeyHeader, BaseSecurity
from models import (
DrivingLicenseSchema,
DrvlcCertificatePostRequest,
DrvlcCertificatePostResponse,
DrvlcCertificatePostResponse1,
DrvlcCertificatePostResponse2,
DrvlcCertificatePostResponse3,
DrvlcCertificatePostResponse4,
DrvlcCertificatePostResponse5,
DrvlcCertificatePostResponse6,
RvcerCertificatePostRequest,
RvcerCertificatePostResponse,
RvcerCertificatePostResponse1,
RvcerCertificatePostResponse2,
RvcerCertificatePostResponse3,
RvcerCertificatePostResponse4,
RvcerCertificatePostResponse5,
RvcerCertificatePostResponse6,
VehicleRegistrationSchema,
)
app = MCPProxy(
description='Driving License (DL) and Vehicle Registration Certificate (RC) of the State, as available on Parivahan Sewa (http://parivahan.co.in/) of Ministry of Road Transport and Highways, are available on DigiLocker. Citizens can pull these documents into their DigiLocker accounts.',
termsOfService='https://apisetu.gov.in/terms.php',
title='Transport Department, Puducherry',
version='3.0.0',
servers=[{'url': 'https://apisetu.gov.in/transportpy/v3'}],
)
@app.post(
'/drvlc/certificate',
description=""" API to verify Driving License. """,
tags=['driving_license_operations'],
security=[
APIKeyHeader(name="X-APISETU-APIKEY"),
APIKeyHeader(name="X-APISETU-CLIENTID"),
],
)
def drvlc(body: DrvlcCertificatePostRequest = None):
"""
Driving License
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/rvcer/certificate',
description=""" API to verify Registration of Vehicles. """,
tags=['vehicle_registration_processes'],
security=[
APIKeyHeader(name="X-APISETU-APIKEY"),
APIKeyHeader(name="X-APISETU-CLIENTID"),
],
)
def rvcer(body: RvcerCertificatePostRequest = None):
"""
Registration of Vehicles
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)