# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T02:22:32+00:00
import argparse
import json
import os
from typing import *
from typing import Optional
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, UnsuportedSecurityStub
from fastapi import Query
from models import (
Alt,
CancelOperationRequest,
Empty,
FieldXgafv,
ListLocationsResponse,
ListOperationsResponse,
Operation,
RunPipelineRequest,
)
app = MCPProxy(
contact={'name': 'Google', 'url': 'https://google.com', 'x-twitter': 'youtube'},
description='Cloud Life Sciences is a suite of services and tools for managing, processing, and transforming life sciences data.',
license={
'name': 'Creative Commons Attribution 3.0',
'url': 'http://creativecommons.org/licenses/by/3.0/',
},
termsOfService='https://developers.google.com/terms/',
title='Cloud Life Sciences API',
version='v2beta',
servers=[{'url': 'https://lifesciences.googleapis.com/'}],
)
@app.get(
'/v2beta/{name}',
description=""" Gets the latest state of a long-running operation. Clients can use this method to poll the operation result at intervals as recommended by the API service. Authorization requires the following [Google IAM](https://cloud.google.com/iam) permission: * `lifesciences.operations.get` """,
tags=['lifescience_project_operations'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def lifesciences_projects_locations_operations_get(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v2beta/{name}/locations',
description=""" Lists information about the supported locations for this service. """,
tags=['lifescience_project_operations'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def lifesciences_projects_locations_list(
name: str,
filter: Optional[str] = None,
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v2beta/{name}/operations',
description=""" Lists operations that match the specified filter in the request. Authorization requires the following [Google IAM](https://cloud.google.com/iam) permission: * `lifesciences.operations.list` """,
tags=['lifescience_project_operations'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def lifesciences_projects_locations_operations_list(
name: str,
filter: Optional[str] = None,
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v2beta/{name}:cancel',
description=""" Starts asynchronous cancellation on a long-running operation. The server makes a best effort to cancel the operation, but success is not guaranteed. Clients may use Operations.GetOperation or Operations.ListOperations to check whether the cancellation succeeded or the operation completed despite cancellation. Authorization requires the following [Google IAM](https://cloud.google.com/iam) permission: * `lifesciences.operations.cancel` """,
tags=['lifescience_project_operations'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def lifesciences_projects_locations_operations_cancel(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: CancelOperationRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v2beta/{parent}/pipelines:run',
description=""" Runs a pipeline. The returned Operation's metadata field will contain a google.cloud.lifesciences.v2beta.Metadata object describing the status of the pipeline execution. The response field will contain a google.cloud.lifesciences.v2beta.RunPipelineResponse object if the pipeline completes successfully. **Note:** Before you can use this method, the *Life Sciences Service Agent* must have access to your project. This is done automatically when the Cloud Life Sciences API is first enabled, but if you delete this permission you must disable and re-enable the API to grant the Life Sciences Service Agent the required permissions. Authorization requires the following [Google IAM](https://cloud.google.com/iam/) permission: * `lifesciences.workflows.run` """,
tags=['lifescience_project_operations'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def lifesciences_projects_locations_pipelines_run(
parent: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: RunPipelineRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)