Skip to main content
Glama
ag2-mcp-servers

Cloud Logging API Server

main.py (54.5 kB)
# generated by fastapi-codegen: # filename: openapi.yaml # timestamp: 2025-06-29T02:23:29+00:00 import argparse import json import os from typing import * from typing import Optional from autogen.mcp.mcp_proxy import MCPProxy from autogen.mcp.mcp_proxy.security import BaseSecurity, UnsuportedSecurityStub from fastapi import Path, Query from models import ( Alt, CancelOperationRequest, CmekSettings, CopyLogEntriesRequest, Empty, FieldXgafv, Link, ListBucketsResponse, ListExclusionsResponse, ListLinksResponse, ListLocationsResponse, ListLogEntriesRequest, ListLogEntriesResponse, ListLogMetricsResponse, ListLogsResponse, ListMonitoredResourceDescriptorsResponse, ListOperationsResponse, ListSinksResponse, ListViewsResponse, LogBucket, LogExclusion, LogMetric, LogSink, LogView, Operation, ResourceNames, Settings, TailLogEntriesRequest, TailLogEntriesResponse, UndeleteBucketRequest, WriteLogEntriesRequest, WriteLogEntriesResponse, ) app = MCPProxy( contact={'name': 'Google', 'url': 'https://google.com', 'x-twitter': 'youtube'}, description='Writes log entries and manages your Cloud Logging configuration.', license={ 'name': 'Creative Commons Attribution 3.0', 'url': 'http://creativecommons.org/licenses/by/3.0/', }, termsOfService='https://developers.google.com/terms/', title='Cloud Logging API', version='v2', servers=[{'url': 'https://logging.googleapis.com/'}], ) @app.post( '/v2/entries:copy', description=""" Copies a set of log entries from a log bucket to a Cloud Storage bucket. 
""", tags=[ 'logging_entries_management', 'logging_projects_logs_management', 'logging_projects_metrics_management', ], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_entries_copy( field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: CopyLogEntriesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/entries:list', description=""" Lists log entries. Use this method to retrieve log entries that originated from a project/folder/organization/billing account. For ways to export log entries, see Exporting Logs (https://cloud.google.com/logging/docs/export). 
""", tags=['logging_entries_management', 'logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_entries_list( field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: ListLogEntriesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/entries:tail', description=""" Streaming read of log entries as they are ingested. Until the stream is terminated, it will continue reading logs. 
""", tags=['logging_entries_management', 'logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_entries_tail( field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: TailLogEntriesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/entries:write', description=""" Writes log entries to Logging. This API method is the only way to send log entries to Logging. This method is used, directly or indirectly, by the Logging agent (fluentd) and all logging libraries configured to use Logging. 
A single request may contain log entries for a maximum of 1000 different resources (projects, organizations, billing accounts or folders) """, tags=['logging_entries_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_entries_write( field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: WriteLogEntriesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/monitoredResourceDescriptors', description=""" Lists the descriptors for monitored resource types used by Logging. 
""", tags=['logging_operations_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_monitored_resource_descriptors_list( page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.delete( '/v2/{logName}', description=""" Deletes all the log entries in a log for the _Default Log Bucket. The log reappears if it receives new entries. Log entries written shortly before the delete operation might not be deleted. Entries received after the delete operation with a timestamp before the operation will be deleted. 
""", tags=['logging_projects_logs_management', 'logging_entries_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_logs_delete( log_name: str = Path(..., alias='logName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.delete( '/v2/{metricName}', description=""" Deletes a logs-based metric. """, tags=['logging_metrics_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_metrics_delete( metric_name: str = Path(..., alias='metricName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{metricName}', description=""" Gets a logs-based metric. 
""", tags=['logging_metrics_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_metrics_get( metric_name: str = Path(..., alias='metricName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.put( '/v2/{metricName}', description=""" Creates or updates a logs-based metric. 
""", tags=['logging_metrics_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_metrics_update( metric_name: str = Path(..., alias='metricName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogMetric = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.delete( '/v2/{name}', description=""" Deletes a view on a log bucket. If an UNAVAILABLE error is returned, this indicates that system is not in a state where it can delete the view. If this occurs, please try again in a few minutes. 
""", tags=['logging_operations_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_views_delete( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{name}', description=""" Gets the latest state of a long-running operation. Clients can use this method to poll the operation result at intervals as recommended by the API service. 
""", tags=[ 'logging_projects_logs_management', 'logging_projects_metrics_management', 'logging_exclusions_management', ], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_operations_get( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.patch( '/v2/{name}', description=""" Updates a view on a log bucket. This method replaces the following fields in the existing view with values from the new view: filter. If an UNAVAILABLE error is returned, this indicates that system is not in a state where it can update the view. If this occurs, please try again in a few minutes. 
""", tags=['logging_views_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_views_patch( name: str, update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogView = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{name}/cmekSettings', description=""" Gets the Logging CMEK settings for the given resource.Note: CMEK for the Log Router can be configured for Google Cloud projects, folders, organizations and billing accounts. Once configured for an organization, it applies to all projects and folders in the Google Cloud organization.See Enabling CMEK for Log Router (https://cloud.google.com/logging/docs/routing/managed-encryption) for more information. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_get_cmek_settings( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.patch( '/v2/{name}/cmekSettings', description=""" Updates the Log Router CMEK settings for the given resource.Note: CMEK for the Log Router can currently only be configured for Google Cloud organizations. Once configured, it applies to all projects and folders in the Google Cloud organization.UpdateCmekSettings will fail if 1) kms_key_name is invalid, or 2) the associated service account does not have the required roles/cloudkms.cryptoKeyEncrypterDecrypter role assigned for the key, or 3) access to the key is disabled.See Enabling CMEK for Log Router (https://cloud.google.com/logging/docs/routing/managed-encryption) for more information. 
""", tags=['logging_settings_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_update_cmek_settings( name: str, update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: CmekSettings = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{name}/locations', description=""" Lists information about the supported locations for this service. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_list( name: str, filter: Optional[str] = None, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{name}/operations', description=""" Lists operations that match the specified filter in the request. If the server doesn't support this method, it returns UNIMPLEMENTED. 
""", tags=['logging_projects_logs_management', 'logging_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_operations_list( name: str, filter: Optional[str] = None, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{name}/settings', description=""" Gets the Log Router settings for the given resource.Note: Settings for the Log Router can be get for Google Cloud projects, folders, organizations and billing accounts. Currently it can only be configured for organizations. Once configured for an organization, it applies to all projects and folders in the Google Cloud organization.See Enabling CMEK for Log Router (https://cloud.google.com/logging/docs/routing/managed-encryption) for more information. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_get_settings( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.patch( '/v2/{name}/settings', description=""" Updates the Log Router settings for the given resource.Note: Settings for the Log Router can currently only be configured for Google Cloud organizations. Once configured, it applies to all projects and folders in the Google Cloud organization.UpdateSettings will fail if 1) kms_key_name is invalid, or 2) the associated service account does not have the required roles/cloudkms.cryptoKeyEncrypterDecrypter role assigned for the key, or 3) access to the key is disabled. 4) location_id is not supported by Logging. 5) location_id violate OrgPolicy.See Enabling CMEK for Log Router (https://cloud.google.com/logging/docs/routing/managed-encryption) for more information. 
""", tags=['logging_settings_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_update_settings( name: str, update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: Settings = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{name}:cancel', description=""" Starts asynchronous cancellation on a long-running operation. The server makes a best effort to cancel the operation, but success is not guaranteed. If the server doesn't support this method, it returns google.rpc.Code.UNIMPLEMENTED. Clients can use Operations.GetOperation or other methods to check whether the cancellation succeeded or whether the operation completed despite cancellation. On successful cancellation, the operation is not deleted; instead, it becomes an operation with an Operation.error value with a google.rpc.Status.code of 1, corresponding to Code.CANCELLED. 
""", tags=['logging_operations_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_operations_cancel( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: CancelOperationRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{name}:undelete', description=""" Undeletes a log bucket. A bucket that has been deleted can be undeleted within the grace period of 7 days. 
""", tags=['logging_buckets_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_undelete( name: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: UndeleteBucketRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{name}:updateAsync', description=""" Updates a log bucket asynchronously.If the bucket has a lifecycle_state of DELETE_REQUESTED, then FAILED_PRECONDITION will be returned.After a bucket has been created, the bucket's location cannot be changed. 
""", tags=['logging_buckets_management', 'logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_update_async( name: str, update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogBucket = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/buckets', description=""" Lists log buckets. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/buckets', description=""" Creates a log bucket that can be used to store log entries. After a bucket has been created, the bucket's location cannot be changed. 
""", tags=['logging_buckets_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_create( parent: str, bucket_id: Optional[str] = Query(None, alias='bucketId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogBucket = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/buckets:createAsync', description=""" Creates a log bucket asynchronously that can be used to store log entries.After a bucket has been created, the bucket's location cannot be changed. 
""", tags=['logging_buckets_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_create_async( parent: str, bucket_id: Optional[str] = Query(None, alias='bucketId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogBucket = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/exclusions', description=""" Lists all the exclusions on the _Default sink in a parent resource. 
""", tags=[ 'logging_projects_logs_management', 'logging_projects_metrics_management', 'logging_exclusions_management', 'logging_settings_management', ], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_exclusions_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/exclusions', description=""" Creates a new exclusion in the _Default sink in a specified parent resource. Only log entries belonging to that resource can be excluded. You can have up to 10 exclusions in a resource. 
""", tags=['logging_exclusions_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_exclusions_create( parent: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogExclusion = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/links', description=""" Lists links. """, tags=['logging_projects_logs_management', 'logging_entries_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_links_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by 
MCPProxy and never executed") @app.post( '/v2/{parent}/links', description=""" Asynchronously creates a linked dataset in BigQuery which makes it possible to use BigQuery to read the logs stored in the log bucket. A log bucket may currently only contain one link. """, tags=['logging_link_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_links_create( parent: str, link_id: Optional[str] = Query(None, alias='linkId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: Link = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/logs', description=""" Lists the logs in projects, organizations, folders, or billing accounts. Only logs that have entries are listed. 
""", tags=['logging_projects_logs_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_logs_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), resource_names: Optional[ResourceNames] = Query(None, alias='resourceNames'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/metrics', description=""" Lists logs-based metrics. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_metrics_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/metrics', description=""" Creates a logs-based metric. 
""", tags=['logging_metrics_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_metrics_create( parent: str, field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogMetric = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/sinks', description=""" Lists sinks. 
""", tags=['logging_projects_logs_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_sinks_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/sinks', description=""" Creates a sink that exports specified log entries to a destination. The export of newly-ingested log entries begins immediately, unless the sink's writer_identity is not permitted to write to the destination. A sink can export log entries only from the resource owning the sink. 
""", tags=['logging_sinks_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_sinks_create( parent: str, unique_writer_identity: Optional[bool] = Query(None, alias='uniqueWriterIdentity'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogSink = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{parent}/views', description=""" Lists views on a log bucket. """, tags=['logging_projects_logs_management', 'logging_projects_metrics_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_views_list( parent: str, page_size: Optional[int] = Query(None, alias='pageSize'), page_token: Optional[str] = Query(None, alias='pageToken'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: 
Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v2/{parent}/views', description=""" Creates a view over log entries in a log bucket. A bucket may contain a maximum of 30 views. """, tags=['logging_views_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_locations_buckets_views_create( parent: str, view_id: Optional[str] = Query(None, alias='viewId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogView = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.delete( '/v2/{sinkName}', description=""" Deletes a sink. If the sink has a unique writer_identity, then that service account is also deleted. 
""", tags=['logging_sinks_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_sinks_delete( sink_name: str = Path(..., alias='sinkName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.get( '/v2/{sinkName}', description=""" Gets a sink. """, tags=['logging_sinks_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_sinks_get( sink_name: str = Path(..., alias='sinkName'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.patch( '/v2/{sinkName}', description=""" Updates a sink. 
This method replaces the following fields in the existing sink with values from the new sink: destination, and filter.The updated sink might also have a new writer_identity; see the unique_writer_identity field. """, tags=['logging_sinks_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_projects_sinks_patch( sink_name: str = Path(..., alias='sinkName'), unique_writer_identity: Optional[bool] = Query(None, alias='uniqueWriterIdentity'), update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogSink = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.put( '/v2/{sinkName}', description=""" Updates a sink. This method replaces the following fields in the existing sink with values from the new sink: destination, and filter.The updated sink might also have a new writer_identity; see the unique_writer_identity field. 
""", tags=['logging_sinks_management'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def logging_sinks_update( sink_name: str = Path(..., alias='sinkName'), unique_writer_identity: Optional[bool] = Query(None, alias='uniqueWriterIdentity'), update_mask: Optional[str] = Query(None, alias='updateMask'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: LogSink = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") if __name__ == "__main__": parser = argparse.ArgumentParser(description="MCP Server") parser.add_argument( "transport", choices=["stdio", "sse", "streamable-http"], help="Transport mode (stdio, sse or streamable-http)", ) args = parser.parse_args() if "CONFIG_PATH" in os.environ: config_path = os.environ["CONFIG_PATH"] app.load_configuration(config_path) if "CONFIG" in os.environ: config = os.environ["CONFIG"] app.load_configuration_from_string(config) if "SECURITY" in os.environ: security_params = BaseSecurity.parse_security_parameters_from_env( os.environ, ) app.set_security_params(security_params) mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}")) app.get_mcp(**mcp_settings).run(transport=args.transport)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ag2-mcp-servers/cloud-logging-api'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.