Skip to main content
Glama

Cloud Datastore API MCP Server

main.py4.97 kB
# generated by fastapi-codegen: # filename: openapi.yaml # timestamp: 2025-06-29T01:47:14+00:00 import argparse import json import os from typing import * from typing import Optional from autogen.mcp.mcp_proxy import MCPProxy from autogen.mcp.mcp_proxy.security import BaseSecurity, UnsuportedSecurityStub from fastapi import Path, Query from models import ( Alt, FieldXgafv, GoogleDatastoreAdminV1beta1ExportEntitiesRequest, GoogleDatastoreAdminV1beta1ImportEntitiesRequest, GoogleLongrunningOperation, ) app = MCPProxy( contact={'name': 'Google', 'url': 'https://google.com', 'x-twitter': 'youtube'}, description='Accesses the schemaless NoSQL database to provide fully managed, robust, scalable storage for your application. ', license={ 'name': 'Creative Commons Attribution 3.0', 'url': 'http://creativecommons.org/licenses/by/3.0/', }, termsOfService='https://developers.google.com/terms/', title='Cloud Datastore API', version='v1beta1', servers=[{'url': 'https://datastore.googleapis.com/'}], ) @app.post( '/v1beta1/projects/{projectId}:export', description=""" Exports a copy of all or a subset of entities from Google Cloud Datastore to another storage system, such as Google Cloud Storage. Recent updates to entities may not be reflected in the export. The export occurs in the background and its progress can be monitored and managed via the Operation resource that is created. The output of an export may only be used once the associated operation is done. If an export operation is cancelled before completion it may leave partial data behind in Google Cloud Storage. """, tags=['datastore_export_import'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def datastore_projects_export( project_id: str = Path(..., alias='projectId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: GoogleDatastoreAdminV1beta1ExportEntitiesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") @app.post( '/v1beta1/projects/{projectId}:import', description=""" Imports entities into Google Cloud Datastore. Existing entities with the same key are overwritten. The import occurs in the background and its progress can be monitored and managed via the Operation resource that is created. If an ImportEntities operation is cancelled, it is possible that a subset of the data has already been imported to Cloud Datastore. """, tags=['datastore_export_import'], security=[ UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), UnsuportedSecurityStub(name="None"), ], ) def datastore_projects_import( project_id: str = Path(..., alias='projectId'), field__xgafv: Optional[FieldXgafv] = Query(None, alias='$.xgafv'), access_token: Optional[str] = None, alt: Optional[Alt] = None, callback: Optional[str] = None, fields: Optional[str] = None, key: Optional[str] = None, oauth_token: Optional[str] = None, pretty_print: Optional[bool] = Query(None, alias='prettyPrint'), quota_user: Optional[str] = Query(None, alias='quotaUser'), upload_protocol: Optional[str] = None, upload_type: Optional[str] = Query(None, alias='uploadType'), body: GoogleDatastoreAdminV1beta1ImportEntitiesRequest = None, ): raise RuntimeError("Should be patched by MCPProxy and never executed") if __name__ == "__main__": parser = argparse.ArgumentParser(description="MCP Server") parser.add_argument( "transport", choices=["stdio", "sse", "streamable-http"], help="Transport mode (stdio, sse or streamable-http)", ) args = parser.parse_args() if "CONFIG_PATH" in os.environ: config_path = os.environ["CONFIG_PATH"] app.load_configuration(config_path) if "CONFIG" in os.environ: config = os.environ["CONFIG"] app.load_configuration_from_string(config) if "SECURITY" in os.environ: security_params = BaseSecurity.parse_security_parameters_from_env( os.environ, ) app.set_security_params(security_params) mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}")) app.get_mcp(**mcp_settings).run(transport=args.transport)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ag2-mcp-servers/cloud-datastore-api'

If you have feedback or need assistance with the MCP directory API, please join our Discord server