# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-06-29T02:12:59+00:00
import argparse
import json
import os
from typing import *
from typing import Optional
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, UnsuportedSecurityStub
from fastapi import Query
from models import (
Alt,
CancelOperationRequest,
Empty,
FieldXgafv,
GenerateConnectManifestResponse,
GenerateExclusivityManifestResponse,
ListLocationsResponse,
ListMembershipsResponse,
ListOperationsResponse,
Membership,
Operation,
Policy,
SetIamPolicyRequest,
TestIamPermissionsRequest,
TestIamPermissionsResponse,
ValidateExclusivityResponse,
)
app = MCPProxy(
contact={'name': 'Google', 'url': 'https://google.com', 'x-twitter': 'youtube'},
description='',
license={
'name': 'Creative Commons Attribution 3.0',
'url': 'http://creativecommons.org/licenses/by/3.0/',
},
termsOfService='https://developers.google.com/terms/',
title='GKE Hub API',
version='v1beta1',
servers=[{'url': 'https://gkehub.googleapis.com/'}],
)
@app.delete(
'/v1beta1/{name}',
description=""" Deletes a long-running operation. This method indicates that the client is no longer interested in the operation result. It does not cancel the operation. If the server doesn't support this method, it returns `google.rpc.Code.UNIMPLEMENTED`. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_operations_delete(
name: str,
force: Optional[bool] = None,
request_id: Optional[str] = Query(None, alias='requestId'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{name}',
description=""" Gets the latest state of a long-running operation. Clients can use this method to poll the operation result at intervals as recommended by the API service. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_operations_get(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.patch(
'/v1beta1/{name}',
description=""" Updates an existing Membership. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_patch(
name: str,
request_id: Optional[str] = Query(None, alias='requestId'),
update_mask: Optional[str] = Query(None, alias='updateMask'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: Membership = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{name}/locations',
description=""" Lists information about the supported locations for this service. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_list(
name: str,
filter: Optional[str] = None,
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{name}/operations',
description=""" Lists operations that match the specified filter in the request. If the server doesn't support this method, it returns `UNIMPLEMENTED`. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_operations_list(
name: str,
filter: Optional[str] = None,
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1beta1/{name}:cancel',
description=""" Starts asynchronous cancellation on a long-running operation. The server makes a best effort to cancel the operation, but success is not guaranteed. If the server doesn't support this method, it returns `google.rpc.Code.UNIMPLEMENTED`. Clients can use Operations.GetOperation or other methods to check whether the cancellation succeeded or whether the operation completed despite cancellation. On successful cancellation, the operation is not deleted; instead, it becomes an operation with an Operation.error value with a google.rpc.Status.code of 1, corresponding to `Code.CANCELLED`. """,
tags=['gkehub_operations_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_operations_cancel(
name: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: CancelOperationRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{name}:generateConnectManifest',
description=""" Generates the manifest for deployment of the GKE connect agent. **This method is used internally by Google-provided libraries.** Most clients should not need to call this method directly. """,
tags=['gkehub_manifest_generation'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_generate_connect_manifest(
name: str,
connect_agent_name: Optional[str] = Query(None, alias='connectAgent.name'),
connect_agent_namespace: Optional[str] = Query(
None, alias='connectAgent.namespace'
),
connect_agent_proxy: Optional[str] = Query(None, alias='connectAgent.proxy'),
image_pull_secret_content: Optional[str] = Query(
None, alias='imagePullSecretContent'
),
is_upgrade: Optional[bool] = Query(None, alias='isUpgrade'),
registry: Optional[str] = None,
version: Optional[str] = None,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{name}:generateExclusivityManifest',
description=""" GenerateExclusivityManifest generates the manifests to update the exclusivity artifacts in the cluster if needed. Exclusivity artifacts include the Membership custom resource definition (CRD) and the singleton Membership custom resource (CR). Combined with ValidateExclusivity, exclusivity artifacts guarantee that a Kubernetes cluster is only registered to a single GKE Hub. The Membership CRD is versioned, and may require conversion when the GKE Hub API server begins serving a newer version of the CRD and corresponding CR. The response will be the converted CRD and CR if there are any differences between the versions. """,
tags=['gkehub_manifest_generation'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def generate_exclusivity_manifest_for_membership(
name: str,
cr_manifest: Optional[str] = Query(None, alias='crManifest'),
crd_manifest: Optional[str] = Query(None, alias='crdManifest'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{parent}/memberships',
description=""" Lists Memberships in a given project and location. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_list(
parent: str,
filter: Optional[str] = None,
order_by: Optional[str] = Query(None, alias='orderBy'),
page_size: Optional[int] = Query(None, alias='pageSize'),
page_token: Optional[str] = Query(None, alias='pageToken'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1beta1/{parent}/memberships',
description=""" Creates a new Membership. **This is currently only supported for GKE clusters on Google Cloud**. To register other clusters, follow the instructions at https://cloud.google.com/anthos/multicluster-management/connect/registering-a-cluster. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_create(
parent: str,
membership_id: Optional[str] = Query(None, alias='membershipId'),
request_id: Optional[str] = Query(None, alias='requestId'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: Membership = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{parent}/memberships:validateExclusivity',
description=""" ValidateExclusivity validates the state of exclusivity in the cluster. The validation does not depend on an existing Hub membership resource. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_validate_exclusivity(
parent: str,
cr_manifest: Optional[str] = Query(None, alias='crManifest'),
intended_membership: Optional[str] = Query(None, alias='intendedMembership'),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/v1beta1/{resource}:getIamPolicy',
description=""" Gets the access control policy for a resource. Returns an empty policy if the resource exists and does not have a policy set. """,
tags=['gkehub_membership_management', 'gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_get_iam_policy(
resource: str,
options_requested_policy_version: Optional[int] = Query(
None, alias='options.requestedPolicyVersion'
),
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1beta1/{resource}:setIamPolicy',
description=""" Sets the access control policy on the specified resource. Replaces any existing policy. Can return `NOT_FOUND`, `INVALID_ARGUMENT`, and `PERMISSION_DENIED` errors. """,
tags=['gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_set_iam_policy(
resource: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: SetIamPolicyRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/v1beta1/{resource}:testIamPermissions',
description=""" Returns permissions that a caller has on the specified resource. If the resource does not exist, this will return an empty set of permissions, not a `NOT_FOUND` error. Note: This operation is designed to be used for building permission-aware UIs and command-line tools, not for authorization checking. This operation may "fail open" without warning. """,
tags=['gkehub_membership_iam_policy_management'],
security=[
UnsuportedSecurityStub(name="None"),
UnsuportedSecurityStub(name="None"),
],
)
def gkehub_projects_locations_memberships_test_iam_permissions(
resource: str,
field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'),
access_token: Optional[str] = None,
alt: Optional[Alt] = None,
callback: Optional[str] = None,
fields: Optional[str] = None,
key: Optional[str] = None,
oauth_token: Optional[str] = None,
pretty_print: Optional[bool] = Query(None, alias='prettyPrint'),
quota_user: Optional[str] = Query(None, alias='quotaUser'),
upload_protocol: Optional[str] = None,
upload_type: Optional[str] = Query(None, alias='uploadType'),
body: TestIamPermissionsRequest = None,
):
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)