Skip to main content
Glama
cloud_audit_logs.py12.7 kB
import json from google.cloud import logging_v2 from services import client_instances def register(mcp_instance): """Register all Cloud Audit Logs resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://audit-logs/{project_id}") def list_audit_logs_resource(project_id: str = None) -> str: """List recent audit logs from a specific project""" try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() # Filter for audit logs only filter_str = 'logName:"cloudaudit.googleapis.com"' # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": 10, # Limiting to 10 for responsiveness } ) result = [] for entry in entries: log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity.name if entry.severity else None, "log_name": entry.log_name, "resource": { "type": entry.resource.type, "labels": dict(entry.resource.labels) if entry.resource.labels else {}, } if entry.resource else {}, } # Handle payload based on type if hasattr(entry, "json_payload") and entry.json_payload: log_data["payload"] = dict(entry.json_payload) elif hasattr(entry, "proto_payload") and entry.proto_payload: log_data["payload"] = "Proto payload (details omitted)" elif hasattr(entry, "text_payload") and entry.text_payload: log_data["payload"] = entry.text_payload result.append(log_data) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://audit-logs/{project_id}/sinks") def list_log_sinks_resource(project_id: str = None) -> str: """List log sinks configured for a project""" try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" sinks = client.list_sinks(parent=parent) result = [] for sink in sinks: result.append( { "name": sink.name, "destination": sink.destination, "filter": sink.filter, "description": sink.description, "disabled": sink.disabled, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def list_audit_logs( project_id: str = None, filter_str: str = 'logName:"cloudaudit.googleapis.com"', page_size: int = 20, ) -> str: """ List Google Cloud Audit logs from a project Args: project_id: GCP project ID (defaults to configured project) filter_str: Log filter expression (defaults to audit logs) page_size: Maximum number of entries to return """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() print(f"Retrieving audit logs from project {project_id}...") # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": page_size, } ) result = [] for entry in entries: log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity.name if entry.severity else None, "log_name": entry.log_name, "resource": { "type": entry.resource.type, "labels": dict(entry.resource.labels) if entry.resource.labels else {}, } if entry.resource else {}, } # Handle payload based on type if hasattr(entry, "json_payload") and entry.json_payload: log_data["payload"] = dict(entry.json_payload) elif hasattr(entry, "proto_payload") and entry.proto_payload: log_data["payload"] = "Proto payload (details omitted)" elif hasattr(entry, "text_payload") and entry.text_payload: log_data["payload"] = entry.text_payload result.append(log_data) return json.dumps( { "status": "success", "entries": result, "count": len(result), }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def filter_admin_activity_logs( project_id: str = None, service_name: str = "", resource_type: str = "", time_range: str = "1h", page_size: int = 20, ) -> str: """ Filter Admin Activity audit logs for specific services or resource types Args: project_id: GCP project ID (defaults to configured project) service_name: Optional service name to filter by (e.g., compute.googleapis.com) resource_type: Optional resource type to filter by (e.g., gce_instance) time_range: Time range for logs (e.g., 1h, 24h, 7d) page_size: Maximum number of entries to return """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() # Build filter string filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"'] if service_name: filter_parts.append(f'protoPayload.serviceName="{service_name}"') if resource_type: filter_parts.append(f'resource.type="{resource_type}"') if time_range: filter_parts.append(f'timestamp >= "-{time_range}"') filter_str = " AND ".join(filter_parts) print(f"Filtering Admin Activity logs with: {filter_str}") # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": page_size, } ) result = [] for entry in entries: # Extract relevant fields for admin activity logs log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "method_name": None, "resource_name": None, "service_name": None, "user": None, } # Extract data from proto payload if hasattr(entry, "proto_payload") and entry.proto_payload: payload = entry.proto_payload if hasattr(payload, "method_name"): log_data["method_name"] = payload.method_name if hasattr(payload, "resource_name"): log_data["resource_name"] = payload.resource_name if hasattr(payload, "service_name"): log_data["service_name"] = payload.service_name # Extract authentication info if hasattr(payload, "authentication_info"): auth_info = payload.authentication_info if hasattr(auth_info, "principal_email"): log_data["user"] = auth_info.principal_email result.append(log_data) return json.dumps( { "status": "success", "entries": result, "count": len(result), "filter": filter_str, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_log_sink( sink_name: str, destination: str, project_id: str = None, filter_str: str = 'logName:"cloudaudit.googleapis.com"', description: str = "", ) -> str: """ Create a log sink to export audit logs to a destination Args: sink_name: Name for the new log sink destination: Destination for logs (e.g., storage.googleapis.com/my-bucket) project_id: GCP project ID (defaults to configured project) filter_str: Log filter expression (defaults to audit logs) description: Optional description for the sink """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" # Create sink configuration sink = logging_v2.LogSink( name=sink_name, destination=destination, filter=filter_str, description=description, ) print(f"Creating log sink '{sink_name}' to export to {destination}...") # Create the sink response = client.create_sink( request={ "parent": parent, "sink": sink, } ) # Important: Recommend setting up IAM permissions writer_identity = response.writer_identity return json.dumps( { "status": "success", "name": response.name, "destination": response.destination, "filter": response.filter, "writer_identity": writer_identity, "next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def audit_log_investigation() -> str: """Prompt for investigating security incidents through audit logs""" return """ I need to investigate a potential security incident in my Google Cloud project. Please help me: 1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event) 2. Create an effective filter query to find relevant logs 3. Identify key fields to examine for signs of unusual activity 4. Suggest common indicators of potential security issues in audit logs """ @mcp_instance.prompt() def log_export_setup() -> str: """Prompt for setting up log exports for compliance""" return """ I need to set up log exports for compliance requirements. Please help me: 1. Understand the different destinations available for log exports 2. Design an effective filter to capture all required audit events 3. Implement a log sink with appropriate permissions 4. Verify my setup is correctly capturing and exporting logs """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/enesbol/gcp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server