Chronicle SecOps MCP Server
by emeryray2002
Verified
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict, List, Optional
import os
import logging
from datetime import datetime, timedelta, timezone
from mcp.server.fastmcp import FastMCP
from secops import SecOpsClient
import time
# Initialize FastMCP server
mcp = FastMCP("secops-mcp")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("secops-mcp")
# Constants
USER_AGENT = "secops-app/1.0"
# Default Chronicle configuration from environment variables
DEFAULT_PROJECT_ID = os.environ.get("CHRONICLE_PROJECT_ID", "725716774503")
DEFAULT_CUSTOMER_ID = os.environ.get("CHRONICLE_CUSTOMER_ID", "c3c6260c1c9340dcbbb802603bbf9636")
DEFAULT_REGION = os.environ.get("CHRONICLE_REGION", "us")
# Initialize SecOpsClient
def get_chronicle_client(
project_id: str = None,
customer_id: str = None,
region: str = None
) -> Any:
"""Initialize and return a Chronicle client.
Args:
project_id: Google Cloud project ID (defaults to CHRONICLE_PROJECT_ID env var)
customer_id: Chronicle customer ID (defaults to CHRONICLE_CUSTOMER_ID env var)
region: Chronicle region (defaults to CHRONICLE_REGION env var or "us")
Returns:
Initialized Chronicle client
"""
# Use provided values or defaults from environment variables
project_id = project_id or DEFAULT_PROJECT_ID
customer_id = customer_id or DEFAULT_CUSTOMER_ID
region = region or DEFAULT_REGION
if not project_id or not customer_id:
raise ValueError(
"Chronicle project_id and customer_id must be provided either "
"as parameters or through environment variables "
"(CHRONICLE_PROJECT_ID, CHRONICLE_CUSTOMER_ID)"
)
client = SecOpsClient()
chronicle = client.chronicle(
customer_id=customer_id,
project_id=project_id,
region=region
)
return chronicle
# Chronicle Security Tools
@mcp.tool()
async def search_security_events(
text: str,
project_id: str = None,
customer_id: str = None,
hours_back: int = 24,
max_events: int = 100,
region: str = None
) -> Dict[str, Any]:
"""Search for security events in Chronicle using natural language.
This function allows you to search for events using everyday language instead of requiring
UDM query syntax. The natural language query will be automatically translated into a
Chronicle UDM query for execution.
Examples of natural language queries:
- "Show me network connections from yesterday for the domain google.com"
- "Display connections to IP address 192.168.1.100"
Args:
text: Natural language description of the events you want to find
project_id: Google Cloud project ID (defaults to config)
customer_id: Chronicle customer ID (defaults to config)
hours_back: How many hours to look back (default: 24)
max_events: Maximum number of events to return (default: 100)
region: Chronicle region (defaults to config)
Returns:
Complete JSON object containing the search results, including events and metadata
"""
try:
logger.info(f"Searching security events with natural language query: {text}")
chronicle = get_chronicle_client(project_id, customer_id, region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=hours_back)
logger.info(f"Search time range: {start_time} to {end_time}")
# Use the new natural language search method
events = chronicle.nl_search(
text=text,
start_time=start_time,
end_time=end_time,
max_events=max_events
)
# For compatibility with old format, check if we need to transform response
if isinstance(events, dict) and "events" in events:
total_events = events.get('total_events', 0)
event_list = events.get('events', [])
else:
# This might be the case with the standard library format
event_list = events if isinstance(events, list) else []
total_events = len(event_list)
events = {
"events": event_list,
"total_events": total_events
}
logger.info(f"Search results: {total_events} total events, {len(event_list)} returned")
# Return the entire events object
return events
except Exception as e:
logger.error(f"Error searching security events: {str(e)}", exc_info=True)
# Return an error object that can be processed by the model
return {
"error": str(e),
"events": [],
"total_events": 0
}
@mcp.tool()
async def get_security_alerts(
project_id: str = None,
customer_id: str = None,
hours_back: int = 24,
max_alerts: int = 10,
status_filter: str = "feedback_summary.status != \"CLOSED\"",
region: str = None
) -> str:
"""Get security alerts from Chronicle.
Args:
project_id: Google Cloud project ID (defaults to config)
customer_id: Chronicle customer ID (defaults to config)
hours_back: How many hours to look back (default: 24)
max_alerts: Maximum number of alerts to return (default: 10)
status_filter: Query string to filter alerts by status (default: exclude closed)
region: Chronicle region (defaults to config)
Returns:
Formatted string with security alerts
"""
try:
chronicle = get_chronicle_client(project_id, customer_id, region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=hours_back)
alert_response = chronicle.get_alerts(
start_time=start_time,
end_time=end_time,
snapshot_query=status_filter,
max_alerts=max_alerts
)
# The response format depends on the secops library version
# Try to handle both formats
if isinstance(alert_response, dict):
alert_list = alert_response.get('alerts', {}).get('alerts', [])
else:
# Might be a direct list of alerts in the standard library
alert_list = alert_response if isinstance(alert_response, list) else []
if not alert_list:
return "No security alerts found for the specified time range."
result = f"Found {len(alert_list)} security alerts:\n\n"
for i, alert in enumerate(alert_list, 1):
# Try to access fields with different possible structures
rule_name = None
if 'detection' in alert and isinstance(alert['detection'], list) and len(alert['detection']) > 0:
rule_name = alert['detection'][0].get('ruleName', 'Unknown Rule')
else:
rule_name = alert.get('ruleName', 'Unknown Rule')
created_time = alert.get('createdTime', 'Unknown')
# Try different possible status field paths
status = 'Unknown'
if 'feedbackSummary' in alert and isinstance(alert['feedbackSummary'], dict):
status = alert['feedbackSummary'].get('status', 'Unknown')
elif 'status' in alert:
status = alert.get('status', 'Unknown')
# Try different possible severity field paths
severity = 'Unknown'
if 'feedbackSummary' in alert and isinstance(alert['feedbackSummary'], dict):
severity = alert['feedbackSummary'].get('severityDisplay', 'Unknown')
elif 'severity' in alert:
severity = alert.get('severity', 'Unknown')
result += f"Alert {i}:\n"
result += f"Rule: {rule_name}\n"
result += f"Created: {created_time}\n"
result += f"Status: {status}\n"
result += f"Severity: {severity}\n"
# Add case information if available
case_name = alert.get('caseName')
if case_name:
result += f"Associated Case: {case_name}\n"
result += "\n"
return result
except Exception as e:
return f"Error retrieving security alerts: {str(e)}"
@mcp.tool()
async def lookup_entity(
entity_value: str,
project_id: str = None,
customer_id: str = None,
hours_back: int = 24,
region: str = None
) -> str:
"""Look up an entity (IP, domain, hash, etc.) in Chronicle.
Args:
entity_value: Value to look up (IP, domain, hash, etc.)
project_id: Google Cloud project ID (defaults to config)
customer_id: Chronicle customer ID (defaults to config)
hours_back: How many hours to look back (default: 24)
region: Chronicle region (defaults to config)
Returns:
Entity summary information
"""
try:
chronicle = get_chronicle_client(project_id, customer_id, region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=hours_back)
entity_summary = chronicle.summarize_entity(
start_time=start_time,
end_time=end_time,
value=entity_value,
return_alerts=True
)
# Handle different possible response formats
entities = []
if hasattr(entity_summary, 'entities'):
entities = entity_summary.entities
elif isinstance(entity_summary, dict) and 'entities' in entity_summary:
entities = entity_summary.get('entities', [])
elif isinstance(entity_summary, list):
entities = entity_summary
if not entities:
return f"No information found for entity: {entity_value}"
result = f"Entity Summary for {entity_value}:\n\n"
for entity in entities:
# Try to access fields with different possible structures
entity_type = "Unknown"
first_seen = "Unknown"
last_seen = "Unknown"
count = 0
asset = None
# Try to get entity_type
if hasattr(entity, 'metadata') and hasattr(entity.metadata, 'entity_type'):
entity_type = entity.metadata.entity_type
elif isinstance(entity, dict) and 'metadata' in entity:
if isinstance(entity['metadata'], dict):
entity_type = entity['metadata'].get('entityType', 'Unknown')
# Try to get metrics
if hasattr(entity, 'metric'):
first_seen = entity.metric.first_seen
last_seen = entity.metric.last_seen
count = entity.metric.count
elif isinstance(entity, dict) and 'metric' in entity:
if isinstance(entity['metric'], dict):
first_seen = entity['metric'].get('firstSeen', 'Unknown')
last_seen = entity['metric'].get('lastSeen', 'Unknown')
count = entity['metric'].get('count', 0)
# Try to get asset
if hasattr(entity, 'metadata') and hasattr(entity.metadata, 'asset'):
asset = entity.metadata.asset
elif isinstance(entity, dict) and 'metadata' in entity:
if isinstance(entity['metadata'], dict):
asset = entity['metadata'].get('asset')
result += f"Entity Type: {entity_type}\n"
result += f"First Seen: {first_seen}\n"
result += f"Last Seen: {last_seen}\n"
result += f"Event Count: {count}\n"
# Add asset information if available
if asset:
result += f"Asset: {asset}\n"
result += "\n"
# Add alert information if available
alert_counts = []
if hasattr(entity_summary, 'alert_counts'):
alert_counts = entity_summary.alert_counts
elif isinstance(entity_summary, dict) and 'alertCounts' in entity_summary:
alert_counts = entity_summary.get('alertCounts', [])
if alert_counts:
result += "Associated Alerts:\n"
for alert in alert_counts:
rule = "Unknown"
count = 0
if hasattr(alert, 'rule'):
rule = alert.rule
count = alert.count
elif isinstance(alert, dict):
rule = alert.get('rule', 'Unknown')
count = alert.get('count', 0)
result += f"- Rule: {rule}, Count: {count}\n"
return result
except Exception as e:
return f"Error looking up entity: {str(e)}"
@mcp.tool()
async def list_security_rules(
project_id: str = None,
customer_id: str = None,
region: str = None
) -> Dict[str, Any]:
"""List security detection rules from Chronicle.
Args:
project_id: Google Cloud project ID (defaults to config)
customer_id: Chronicle customer ID (defaults to config)
region: Chronicle region (defaults to config)
Returns:
Raw response from the Chronicle API containing security detection rules
"""
try:
chronicle = get_chronicle_client(project_id, customer_id, region)
rules_response = chronicle.list_rules()
return rules_response
except Exception as e:
logger.error(f"Error listing security rules: {str(e)}", exc_info=True)
return {
"error": str(e),
"rules": []
}
@mcp.tool()
async def get_ioc_matches(
project_id: str = None,
customer_id: str = None,
hours_back: int = 24,
max_matches: int = 20,
region: str = None
) -> str:
"""Get Indicators of Compromise (IoCs) matches from Chronicle.
Args:
project_id: Google Cloud project ID (defaults to config)
customer_id: Chronicle customer ID (defaults to config)
hours_back: How many hours to look back (default: 24)
max_matches: Maximum number of matches to return (default: 20)
region: Chronicle region (defaults to config)
Returns:
Formatted string with IoC matches
"""
try:
chronicle = get_chronicle_client(project_id, customer_id, region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=hours_back)
iocs = chronicle.list_iocs(
start_time=start_time,
end_time=end_time,
max_matches=max_matches
)
# Handle different possible response formats
matches = []
if isinstance(iocs, dict) and 'matches' in iocs:
matches = iocs.get('matches', [])
elif isinstance(iocs, list):
matches = iocs
if not matches:
return "No IoC matches found for the specified time range."
result = f"Found {len(matches)} IoC matches:\n\n"
for i, match in enumerate(matches, 1):
# Get the indicator information
indicator_type = "Unknown"
indicator_value = "Unknown"
sources = []
# Try to extract artifactIndicator differently based on response format
if isinstance(match, dict):
if 'artifactIndicator' in match and isinstance(match['artifactIndicator'], dict):
# Get the first key-value pair from artifactIndicator
indicator_dict = match.get('artifactIndicator', {})
if indicator_dict:
indicator_type = next(iter(indicator_dict.keys()), 'Unknown')
indicator_value = next(iter(indicator_dict.values()), 'Unknown')
sources = match.get('sources', [])
sources_str = ', '.join(sources) if sources else 'Unknown'
result += f"IoC {i}:\n"
result += f"Type: {indicator_type}\n"
result += f"Value: {indicator_value}\n"
result += f"Sources: {sources_str}\n\n"
return result
except Exception as e:
return f"Error retrieving IoC matches: {str(e)}"
def main() -> None:
"""Run the MCP server for SecOps tools.
This function initializes and starts the MCP server with all the defined tools.
"""
# Initialize and run the server
mcp.run(transport='stdio')
if __name__ == "__main__":
main()