# Chronicle SecOps MCP Server
# by emeryray2002
# Verified
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python3
"""Example usage of the Chronicle Security Operations MCP server.
This example demonstrates how to use the secops-mcp server to perform
security operations tasks using Chronicle, including natural language search.
"""
import asyncio
import os
import argparse
from datetime import datetime, timedelta, timezone
import json
# Example configuration: each setting is taken from its environment variable
# when that variable is set, otherwise the literal fallback shown here is used.
PROJECT_ID = os.getenv("CHRONICLE_PROJECT_ID", "your-google-cloud-project-id")
CUSTOMER_ID = os.getenv("CHRONICLE_CUSTOMER_ID", "your-chronicle-customer-id")
REGION = os.getenv("CHRONICLE_REGION", "us")
# Chronicle security examples
def _join_values(values: object) -> str:
    """Render a single value or a list/tuple of values as a comma-separated string."""
    # A bare string must not be iterated character by character, and items
    # that are not strings must not make str.join raise TypeError.
    if not isinstance(values, (list, tuple)):
        values = [values]
    return ', '.join(str(value) for value in values)


def _print_event_header(index: int, event_wrapper: dict) -> dict:
    """Print the index, timestamp and type of one result; return its 'event' payload."""
    event = event_wrapper.get('event', {})
    metadata = event.get('metadata', {})
    print(f"\nEvent {index}:")
    print(f" Time: {metadata.get('eventTimestamp', 'Unknown')}")
    print(f" Type: {metadata.get('eventType', 'Unknown')}")
    return event


def _print_endpoint(label: str, endpoint: dict) -> None:
    """Print the IP and port of a principal/target dict, skipping absent fields."""
    if 'ip' in endpoint:
        print(f" {label} IP: {_join_values(endpoint['ip'])}")
    if 'port' in endpoint:
        print(f" {label} Port: {endpoint['port']}")


async def security_examples(project_id: str, customer_id: str, region: str) -> None:
    """Run examples of Chronicle security API calls.

    Seven examples are executed in order and their results printed to stdout:
    a UDM event search, alerts, an entity lookup, the rule list, IoC matches,
    two canned natural language searches and one query read from stdin.

    Search results are read as dicts with a 'total_events' count and an
    'events' list whose items wrap the event under an 'event' key.

    Args:
        project_id: Google Cloud project ID
        customer_id: Chronicle customer ID
        region: Chronicle region
    """
    print("\n=== Chronicle Security API Examples ===\n")

    # Example 1: Search for network connection events using UDM query
    print("Example 1: Search for network connection events using UDM query")
    events_result = await search_security_events(
        text='metadata.event_type = "NETWORK_CONNECTION"',
        project_id=project_id,
        customer_id=customer_id,
        region=region,
        hours_back=24,
        max_events=5
    )
    total_events = events_result.get('total_events', 0)
    event_list = events_result.get('events', [])
    print(f"Found {total_events} events, showing details for {len(event_list)} events:")
    for index, event_wrapper in enumerate(event_list, 1):
        event = _print_event_header(index, event_wrapper)
        _print_endpoint("Source", event.get('principal', {}))
        _print_endpoint("Target", event.get('target', {}))
        network = event.get('network', {})
        if 'ipProtocol' in network:
            print(f" Protocol: {network['ipProtocol']}")
        # Every other field here is read in camelCase; accept that spelling
        # first and keep the snake_case key the example originally looked for.
        for protocol_key in ('applicationProtocol', 'application_protocol'):
            if protocol_key in network:
                print(f" Application: {network[protocol_key]}")
                break

    # Example 2: Get security alerts
    print("\nExample 2: Get security alerts")
    alerts = await get_security_alerts(
        project_id=project_id,
        customer_id=customer_id,
        region=region,
        hours_back=24,
        max_alerts=5
    )
    print(alerts)

    # Example 3: Look up an entity (example IP address)
    print("\nExample 3: Look up an entity")
    entity_info = await lookup_entity(
        entity_value="8.8.8.8",  # Example IP address (Google DNS)
        project_id=project_id,
        customer_id=customer_id,
        region=region
    )
    print(entity_info)

    # Example 4: List security rules
    print("\nExample 4: List security rules")
    rules = await list_security_rules(
        project_id=project_id,
        customer_id=customer_id,
        region=region
    )
    print(rules)

    # Example 5: Get IoC matches
    print("\nExample 5: Get IoC matches")
    iocs = await get_ioc_matches(
        project_id=project_id,
        customer_id=customer_id,
        region=region,
        hours_back=24,
        max_matches=5
    )
    print(iocs)

    # Example 6: Natural language security event search
    print("\nExample 6: Natural language security event search")
    nl_queries = [
        "Show me network connections from the last hour",
        "Find login attempts"
    ]
    for query in nl_queries:
        print(f"\nNatural language query: '{query}'")
        nl_events = await search_security_events(
            text=query,  # Using the text parameter for natural language
            project_id=project_id,
            customer_id=customer_id,
            region=region,
            hours_back=6,
            max_events=5
        )
        total_nl_events = nl_events.get('total_events', 0)
        nl_event_list = nl_events.get('events', [])
        print(f"Found {total_nl_events} events, showing details for {len(nl_event_list)} events:")
        # Only the first three events are detailed to keep the output short.
        for index, event_wrapper in enumerate(nl_event_list[:3], 1):
            event = _print_event_header(index, event_wrapper)
            principal = event.get('principal', {})
            target = event.get('target', {})
            # Unlike Example 1, a missing IP is reported explicitly as "None".
            source_ips = _join_values(principal['ip']) if 'ip' in principal else 'None'
            target_ips = _join_values(target['ip']) if 'ip' in target else 'None'
            print(f" Source IP: {source_ips}")
            print(f" Target IP: {target_ips}")

    # Example 7: Interactive natural language query
    print("\nExample 7: Interactive natural language query")
    try:
        # Whitespace-only input is treated as "no query".
        custom_query = input(
            "Enter a natural language query (e.g., 'Show me suspicious activity'): "
        ).strip()
        if custom_query:
            print(f"\nNatural language query: '{custom_query}'")
            custom_events = await search_security_events(
                text=custom_query,
                project_id=project_id,
                customer_id=customer_id,
                region=region,
                hours_back=24,
                max_events=10
            )
            total_custom_events = custom_events.get('total_events', 0)
            custom_event_list = custom_events.get('events', [])
            print(f"Found {total_custom_events} events, showing {len(custom_event_list)} events")
            for index, event_wrapper in enumerate(custom_event_list, 1):
                _print_event_header(index, event_wrapper)
    except (KeyboardInterrupt, EOFError):
        # input() raises EOFError when stdin is closed or exhausted (for
        # example when the script is run non-interactively); treat that the
        # same as the user cancelling the prompt.
        print("\nInteractive query cancelled.")
async def main() -> None:
    """Parse command line arguments and run security examples.

    The project and customer IDs are taken from the command line, then from
    the CHRONICLE_PROJECT_ID / CHRONICLE_CUSTOMER_ID environment variables.
    When neither supplies a value, the module-level example configuration
    (``PROJECT_ID`` / ``CUSTOMER_ID``) is used instead of identifiers embedded
    in the code, and a notice is printed. The region defaults to ``REGION``.
    """
    parser = argparse.ArgumentParser(description='Security Operations MCP examples')
    parser.add_argument('--project-id', help='Google Cloud project ID',
                        default=os.environ.get("CHRONICLE_PROJECT_ID", ""))
    parser.add_argument('--customer-id', help='Chronicle customer ID',
                        default=os.environ.get("CHRONICLE_CUSTOMER_ID", ""))
    parser.add_argument('--region', help='Chronicle region', default=REGION)
    # NOTE: --verbose is accepted so existing invocations keep working, but
    # nothing in this function reads args.verbose.
    parser.add_argument('--verbose', action='store_true', help='Show more detailed output')
    args = parser.parse_args()

    # Fall back to the file's example configuration rather than to
    # hard-coded identifiers of one particular Chronicle tenant.
    if not args.project_id:
        args.project_id = PROJECT_ID
        print(f"Using default project ID: {args.project_id}")
    if not args.customer_id:
        args.customer_id = CUSTOMER_ID
        print(f"Using default customer ID: {args.customer_id}")

    print(f"Using Chronicle settings: project_id={args.project_id}, customer_id={args.customer_id}, region={args.region}")
    await security_examples(args.project_id, args.customer_id, args.region)
# Import the functions after defining the example functions to avoid circular imports
from secops_mcp import (
search_security_events,
get_security_alerts,
lookup_entity,
list_security_rules,
get_ioc_matches
)
# Script entry point: when the file is executed directly (not imported),
# drive the async main() coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    entry_coroutine = main()
    asyncio.run(entry_coroutine)