Skip to main content
Glama

Gmail API MCP Server

by rigzindorje
main.py (8.33 kB)
# generated by fastapi-codegen:
#   filename:  openapi.yaml
#   timestamp: 2025-07-16T14:21:16+00:00

import argparse
import json
import os
from typing import *
from typing import Optional, Union

from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, HTTPBasic, HTTPBearer

from models import (
    AssetsGetResponse,
    ErrorResponse,
    FindingsGetResponse,
    IdentityGetResponse,
    InvestigationCreatePayload,
    InvestigationCreateResponse,
    InvestigationUpdatePayload,
    InvestigationUpdateResponse,
    PublicV2FindingsGetResponse,
    PublicV2InvestigationsGetResponse,
    RateLimitExceededResponse,
    RiskScoreRetrieveResponse,
    RiskScoreUpdatePayload,
    RiskScoreUpdateResponse,
)

# Proxy application describing the Splunk Enterprise Security API. Every
# endpoint below is registered on this object through its HTTP-verb decorators.
app = MCPProxy(
    description=(
        "The Splunk Enterprise Security API allows you to use and modify findings,"
        " investigations, risk scores, assets, and identities in Splunk Enterprise"
        " Security.\n"
    ),
    title="Splunk Enterprise Security API Reference",
    version="8.1.0",
    servers=[
        {
            "description": "The production API server.",
            "url": "https://{stack}:{port}/servicesNS/nobody/missioncontrol",
            "variables": {
                "port": {
                    "default": "8089",
                    "description": (
                        "This value is assigned by the service provider."
                        " For example `8089`."
                    ),
                },
                "stack": {
                    "default": "blueridge.splunkcloud.com",
                    "description": (
                        "This value is assigned by the service provider."
                        " For example, `blueridge.splunkcloud.com`."
                    ),
                },
            },
        }
    ],
)


# ---------------------------------------------------------------------------
# Endpoint stubs.
#
# Each function below only declares a signature. Its body raises RuntimeError
# because, per the error message, MCPProxy is expected to patch the function so
# that the stub body is never executed.
# ---------------------------------------------------------------------------


# GET /public/v2/assets/{id}
@app.get(
    "/public/v2/assets/{id}",
    description=(
        " Retrieve assets using the ID of the KV collection assets_by_str."
        " Requires mc_assets_read or admin_all_objects capabilities. "
    ),
    tags=["asset_information_retrieval"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_assets(id: str):
    """ Retrieve assets """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# GET /public/v2/findings -- every query parameter is optional.
@app.get(
    "/public/v2/findings",
    description=(
        " The API for retrieving findings by the querying fields."
        " Requires mc_investigation_read or admin_all_objects capabilities. \n"
    ),
    tags=["finding_query_management"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_get_findings(
    finding_ids: Optional[str] = None,
    urgency: Optional[str] = None,
    status: Optional[str] = None,
    owner: Optional[str] = None,
    disposition: Optional[str] = None,
    limit: Optional[float] = None,
    offset: Optional[float] = None,
    sort: Optional[str] = None,
    fields: Optional[str] = None,
    earliest: Optional[str] = None,
    latest: Optional[str] = None,
    rule_title: Optional[str] = None,
):
    """ Retrieve findings by the querying fields """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# GET /public/v2/findings/{id}
@app.get(
    "/public/v2/findings/{id}",
    description=(
        " Retrieve a finding using its ID."
        " Requires mc_investigation_read or admin_all_objects capabilities. "
    ),
    tags=["finding_query_management"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_get_finding_by_id(
    id: str,
    earliest: Optional[str] = None,
    latest: Optional[str] = None,
    fields: Optional[str] = None,
):
    """ Retrieve findings """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# GET /public/v2/identity/{id}
@app.get(
    "/public/v2/identity/{id}",
    description=(
        " Retrieve an identity using the ID of the identity."
        " Requires mc_identity_read or admin_all_objects capabilities. "
    ),
    tags=["identity_detail_retrieval"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_get_identity(id: str):
    """ Retrieve an identity """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# GET /public/v2/investigations -- every query parameter is optional.
@app.get(
    "/public/v2/investigations",
    description=(
        " Retrieve investigations using query parameters."
        " Requires mc_investigation_read or admin_all_objects capabilities. \n"
    ),
    tags=["finding_query_management", "investigation_lifecycle_management"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_list_investigations(
    ids: Optional[str] = None,
    limit: Optional[float] = None,
    offset: Optional[float] = None,
    sort: Optional[str] = None,
    disposition: Optional[str] = None,
    status: Optional[str] = None,
    owner: Optional[str] = None,
    urgency: Optional[str] = None,
    sensitivity: Optional[str] = None,
    create_time_min: Optional[float] = None,
    create_time_max: Optional[float] = None,
    update_time_min: Optional[float] = None,
    update_time_max: Optional[float] = None,
):
    """ Retrieve investigations """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# POST /public/v2/investigations
@app.post(
    "/public/v2/investigations",
    description=(
        " Create investigations using provided fields."
        " Requires mc_investigation_write and edit_notable_events"
        " OR admin_all_objects capabilities. "
    ),
    tags=["investigation_lifecycle_management"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_create_investigation(body: InvestigationCreatePayload):
    """ Create investigations """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# POST /public/v2/investigations/{id}
@app.post(
    "/public/v2/investigations/{id}",
    description=(
        " Update the investigation by id."
        " Requires mc_investigation_write and edit_notable_events"
        " OR admin_all_objects capabilities. "
    ),
    tags=["investigation_lifecycle_management"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_update_investigation(id: str, body: InvestigationUpdatePayload = ...):
    """ Update certain fields of an investigation by id """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# GET /public/v2/risks/risk_scores/{entity}
@app.get(
    "/public/v2/risks/risk_scores/{entity}",
    description=(
        " Get the risk scores for a risk entity in Splunk Enterprise Security."
        " Requires mc_risk_score_write or admin_all_objects capabilities. \n"
    ),
    tags=["risk_score_update"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_risk_entity_risk_scores_retrieve(
    entity: str,
    entity_type: Optional[str] = None,
    earliest: Optional[str] = None,
    latest: Optional[str] = None,
    limit: Optional[float] = None,
    offset: Optional[float] = None,
    sort: Optional[str] = None,
):
    """ Get risk scores for a risk entity """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


# POST /public/v2/risks/risk_scores/{entity}
@app.post(
    "/public/v2/risks/risk_scores/{entity}",
    description=(
        " Add a risk modifier to a risk entity in Splunk Enterprise Security."
        " Requires mc_risk_score_write or admin_all_objects capabilities. "
    ),
    tags=["risk_score_update"],
    security=[
        HTTPBearer(name="None"),
        HTTPBasic(name="None"),
    ],
)
def public_v2_risk_entity_risk_scores_update(
    entity: str, body: RiskScoreUpdatePayload = ...
):
    """ Add risk modifiers """
    raise RuntimeError("Should be patched by MCPProxy and never executed")


if __name__ == "__main__":
    # The transport is the single, required positional command-line argument.
    arg_parser = argparse.ArgumentParser(description="MCP Server")
    arg_parser.add_argument(
        "transport",
        choices=["stdio", "sse", "streamable-http"],
        help="Transport mode (stdio, sse or streamable-http)",
    )
    cli_args = arg_parser.parse_args()

    env = os.environ

    # Each optional setting is applied only when its environment variable is
    # present. CONFIG_PATH is handled before CONFIG, so both are loaded, in
    # that order, when both are set.
    if "CONFIG_PATH" in env:
        app.load_configuration(env["CONFIG_PATH"])

    if "CONFIG" in env:
        app.load_configuration_from_string(env["CONFIG"])

    if "SECURITY" in env:
        app.set_security_params(BaseSecurity.parse_security_parameters_from_env(env))

    # MCP_SETTINGS is parsed as JSON and its keys are forwarded as keyword
    # arguments to get_mcp(); it defaults to an empty object when unset.
    server_kwargs = json.loads(env.get("MCP_SETTINGS", "{}"))
    app.get_mcp(**server_kwargs).run(transport=cli_args.transport)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rigzindorje/splunk-enterprise-security-api-reference'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.