main.py•8.33 kB
# generated by fastapi-codegen:
# filename: openapi.yaml
# timestamp: 2025-07-16T14:21:16+00:00
import argparse
import json
import os
from typing import *
from typing import Optional, Union
from autogen.mcp.mcp_proxy import MCPProxy
from autogen.mcp.mcp_proxy.security import BaseSecurity, HTTPBasic, HTTPBearer
from models import (
AssetsGetResponse,
ErrorResponse,
FindingsGetResponse,
IdentityGetResponse,
InvestigationCreatePayload,
InvestigationCreateResponse,
InvestigationUpdatePayload,
InvestigationUpdateResponse,
PublicV2FindingsGetResponse,
PublicV2InvestigationsGetResponse,
RateLimitExceededResponse,
RiskScoreRetrieveResponse,
RiskScoreUpdatePayload,
RiskScoreUpdateResponse,
)
app = MCPProxy(
description='The Splunk Enterprise Security API allows you to use and modify findings, investigations, risk scores, assets, and identities in Splunk Enterprise Security.\n',
title='Splunk Enterprise Security API Reference',
version='8.1.0',
servers=[
{
'description': 'The production API server.',
'url': 'https://{stack}:{port}/servicesNS/nobody/missioncontrol',
'variables': {
'port': {
'default': '8089',
'description': 'This value is assigned by the service provider. For example `8089`.',
},
'stack': {
'default': 'blueridge.splunkcloud.com',
'description': 'This value is assigned by the service provider. For example, `blueridge.splunkcloud.com`.',
},
},
}
],
)
@app.get(
'/public/v2/assets/{id}',
description=""" Retrieve assets using the ID of the KV collection assets_by_str. Requires mc_assets_read or admin_all_objects capabilities. """,
tags=['asset_information_retrieval'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_assets(id: str):
"""
Retrieve assets
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/public/v2/findings',
description=""" The API for retrieving findings by the querying fields. Requires mc_investigation_read or admin_all_objects capabilities. """,
tags=['finding_query_management'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_get_findings(
finding_ids: Optional[str] = None,
urgency: Optional[str] = None,
status: Optional[str] = None,
owner: Optional[str] = None,
disposition: Optional[str] = None,
limit: Optional[float] = None,
offset: Optional[float] = None,
sort: Optional[str] = None,
fields: Optional[str] = None,
earliest: Optional[str] = None,
latest: Optional[str] = None,
rule_title: Optional[str] = None,
):
"""
Retrieve findings by the querying fields
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/public/v2/findings/{id}',
description=""" Retrieve a finding using its ID. Requires mc_investigation_read or admin_all_objects capabilities. """,
tags=['finding_query_management'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_get_finding_by_id(
id: str,
earliest: Optional[str] = None,
latest: Optional[str] = None,
fields: Optional[str] = None,
):
"""
Retrieve findings
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/public/v2/identity/{id}',
description=""" Retrieve an identity using the ID of the identity. Requires mc_identity_read or admin_all_objects capabilities. """,
tags=['identity_detail_retrieval'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_get_identity(id: str):
"""
Retrieve an identity
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/public/v2/investigations',
description=""" Retrieve investigations using query parameters. Requires mc_investigation_read or admin_all_objects capabilities. """,
tags=['finding_query_management', 'investigation_lifecycle_management'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_list_investigations(
ids: Optional[str] = None,
limit: Optional[float] = None,
offset: Optional[float] = None,
sort: Optional[str] = None,
disposition: Optional[str] = None,
status: Optional[str] = None,
owner: Optional[str] = None,
urgency: Optional[str] = None,
sensitivity: Optional[str] = None,
create_time_min: Optional[float] = None,
create_time_max: Optional[float] = None,
update_time_min: Optional[float] = None,
update_time_max: Optional[float] = None,
):
"""
Retrieve investigations
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/public/v2/investigations',
description=""" Create investigations using provided fields. Requires mc_investigation_write and edit_notable_events OR admin_all_objects capabilities. """,
tags=['investigation_lifecycle_management'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_create_investigation(body: InvestigationCreatePayload):
"""
Create investigations
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/public/v2/investigations/{id}',
description=""" Update the investigation by id. Requires mc_investigation_write and edit_notable_events OR admin_all_objects capabilities. """,
tags=['investigation_lifecycle_management'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_update_investigation(id: str, body: InvestigationUpdatePayload = ...):
"""
Update certain fields of an investigation by id
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.get(
'/public/v2/risks/risk_scores/{entity}',
description=""" Get the risk scores for a risk entity in Splunk Enterprise Security. Requires mc_risk_score_write or admin_all_objects capabilities. """,
tags=['risk_score_update'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_risk_entity_risk_scores_retrieve(
entity: str,
entity_type: Optional[str] = None,
earliest: Optional[str] = None,
latest: Optional[str] = None,
limit: Optional[float] = None,
offset: Optional[float] = None,
sort: Optional[str] = None,
):
"""
Get risk scores for a risk entity
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
@app.post(
'/public/v2/risks/risk_scores/{entity}',
description=""" Add a risk modifier to a risk entity in Splunk Enterprise Security. Requires mc_risk_score_write or admin_all_objects capabilities. """,
tags=['risk_score_update'],
security=[
HTTPBearer(name="None"),
HTTPBasic(name="None"),
],
)
def public_v2_risk_entity_risk_scores_update(
entity: str, body: RiskScoreUpdatePayload = ...
):
"""
Add risk modifiers
"""
raise RuntimeError("Should be patched by MCPProxy and never executed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument(
"transport",
choices=["stdio", "sse", "streamable-http"],
help="Transport mode (stdio, sse or streamable-http)",
)
args = parser.parse_args()
if "CONFIG_PATH" in os.environ:
config_path = os.environ["CONFIG_PATH"]
app.load_configuration(config_path)
if "CONFIG" in os.environ:
config = os.environ["CONFIG"]
app.load_configuration_from_string(config)
if "SECURITY" in os.environ:
security_params = BaseSecurity.parse_security_parameters_from_env(
os.environ,
)
app.set_security_params(security_params)
mcp_settings = json.loads(os.environ.get("MCP_SETTINGS", "{}"))
app.get_mcp(**mcp_settings).run(transport=args.transport)