dstreefkerk

ms-sentinel-mcp-server

by dstreefkerk
security_investigation.py (14.8 kB)

"""
FILE: prompts/security_investigation.py
DESCRIPTION:
    Provides security investigation prompts for the MCP server.
"""

from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base


def register_prompts(mcp: FastMCP):
    """Register security investigation prompts."""

    @mcp.prompt("sentinel_hunting_investigate_ip", "Investigate an IP address")
    def investigate_ip_address(ip_address: str) -> list[base.Message]:
        """
        Investigate a potentially suspicious IP address in Microsoft Sentinel.

        Args:
            ip_address: The IP address to investigate
        """
        return [
            base.UserMessage(
                f"I need to investigate the IP address {ip_address} that appeared in our logs. What should I check?"
            ),
            base.AssistantMessage(
                """
I'll guide you through a structured investigation of this IP address. Here's what we should check:

First, let's determine where this IP appears in our environment:
1. Check for connections to/from this IP across your network
2. Identify which systems have communicated with this IP
3. Determine the volume and frequency of these communications

Then, we'll analyze the context and behavior:
4. Review the activities associated with this IP
5. Identify any unusual patterns or known malicious behaviors
6. Check if this IP appears in security incidents or alerts

Let me help you create KQL queries for each step of this investigation.
"""
            ),
            base.UserMessage(
                "That sounds like a good approach. Can you provide the KQL queries I should use?"
            ),
            base.AssistantMessage(
                f"""
Here are the KQL queries you can use to investigate IP address {ip_address}:

### 1. Check for connections to/from this IP
```kusto
// Look for this IP in network events
let ip = "{ip_address}";
DeviceNetworkEvents
| where TimeGenerated > ago(7d)
| where RemoteIP == ip or LocalIP == ip
| summarize ConnectionCount=count(), FirstSeen=min(TimeGenerated), LastSeen=max(TimeGenerated)
    by DeviceName, LocalIP, RemoteIP, RemotePort, InitiatingProcessFileName
| sort by ConnectionCount desc
```

### 2. Check for this IP in security alerts
```kusto
// Find security alerts involving this IP
let ip = "{ip_address}";
SecurityAlert
| where TimeGenerated > ago(30d)
| where ExtendedProperties has ip or Entities has ip
| project TimeGenerated, AlertName, AlertSeverity, Entities, ExtendedProperties
```

### 3. Check authentication attempts from this IP
```kusto
// Check for authentication attempts
let ip = "{ip_address}";
SigninLogs
| where TimeGenerated > ago(30d)
| where IPAddress == ip
| summarize AuthAttempts=count(), SuccessfulAuths=countif(ResultType == 0), FailedAuths=countif(ResultType != 0)
    by UserPrincipalName, AppDisplayName
| sort by AuthAttempts desc
```

### 4. Check for this IP in firewall logs
```kusto
// Check Common Security Log (firewall) for this IP
let ip = "{ip_address}";
CommonSecurityLog
| where TimeGenerated > ago(7d)
| where SourceIP == ip or DestinationIP == ip
| summarize ConnectionCount=count() by SourceIP, DestinationIP, DestinationPort, DeviceAction
| sort by ConnectionCount desc
```

Let me know if you need any more specific queries or if you want to dive deeper into any of these areas.
"""
            ),
            base.UserMessage(
                "This is very helpful. Should I also check if this IP is known to be malicious?"
            ),
            base.AssistantMessage(
                f"""
Absolutely! Here's how you can check if the IP address {ip_address} is known to be malicious:

### Check against threat intelligence feeds
```kusto
// Check this IP against threat intelligence
let ip = "{ip_address}";
ThreatIntelligenceIndicator
| where TimeGenerated > ago(30d)
| where NetworkIP == ip
| project TimeGenerated, Description, ThreatType, ConfidenceScore, Provider
```

### Check against Microsoft Defender threat analytics
```kusto
// Check Microsoft Defender for Endpoint evaluations
let ip = "{ip_address}";
DeviceNetworkEvents
| where TimeGenerated > ago(7d)
| where RemoteIP == ip
| join kind=inner (
    DeviceThreatEvents
    | where TimeGenerated > ago(7d)
) on DeviceId, Timestamp
| project TimeGenerated, DeviceName, RemoteIP, RemoteUrl, ThreatName, ThreatFamilyName, ThreatLevel
```

### Check for IP geolocation and classification
```kusto
// Get enriched information about the IP
let ip = "{ip_address}";
// Assuming you've enabled IP enrichment features, otherwise use logic to enrich manually
DeviceNetworkEvents
| where TimeGenerated > ago(7d)
| where RemoteIP == ip
| extend GeoIP = geo_info_from_ip_address(RemoteIP) // If geo_info function is available
| project TimeGenerated, DeviceName, RemoteIP,
    Country = tostring(GeoIP.country),
    City = tostring(GeoIP.city),
    Org = tostring(GeoIP.organization)
| limit 10
```

If you're using an IP reputation watchlist, you can also check against that:
```kusto
// Check against IP watchlist (if you maintain one)
let ip = "{ip_address}";
_GetWatchlist('IPReputation')
| where SearchKey == ip
| project IP = SearchKey, Reputation, ThreatType, Source, LastUpdated
```

These queries will help you determine if this IP is known to be malicious based on your threat intelligence data.
"""
            ),
        ]

    @mcp.prompt("sentinel_incident_respond", "Incident response workflow")
    def incident_response_workflow(incident_id: int) -> list[base.Message]:
        """
        Guide through a structured incident response process for a specific security incident.

        Args:
            incident_id: The incident ID number to investigate
        """
        return [
            base.UserMessage(
                f"I need to investigate and respond to security incident #{incident_id}. Can you guide me through the process?"
            ),
            base.AssistantMessage(
                f"""
I'll guide you through a structured incident response process for incident #{incident_id}. Let's follow these steps:

## 1. Assess the Incident
First, let's gather the basic details of this incident:

```kusto
SecurityIncident
| where IncidentNumber == {incident_id}
| project
    IncidentNumber,
    Title,
    Description,
    Severity,
    Status,
    Classification,
    FirstActivityTime,
    LastActivityTime,
    CreatedTime = TimeGenerated,
    LastModifiedTime,
    Owner = OwnerName
```

## 2. Review Associated Alerts
Next, let's look at the alerts associated with this incident:

```kusto
SecurityIncident
| where IncidentNumber == {incident_id}
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| project AlertName, AlertSeverity, AlertDescription = Description, TimeGenerated, Entities
```

Would you like me to help you run these initial queries?
"""
            ),
            base.UserMessage(
                "Yes, that would be helpful. What should I do after I've gathered that information?"
            ),
            base.AssistantMessage(
                f"""
After gathering the initial information about incident #{incident_id}, follow these next steps:

## 3. Investigate Impacted Entities
Extract and investigate entities (users, hosts, IPs) from the incident:

```kusto
SecurityIncident
| where IncidentNumber == {incident_id}
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| extend Entities = parse_json(Entities)
| mv-expand Entity = Entities
| extend EntityType = tostring(Entity.Type), EntityName = tostring(Entity.Name)
| summarize AlertCount = count() by EntityType, EntityName
| sort by AlertCount desc
```

For each impacted entity, perform targeted investigation:
- For users: Check authentication logs, unusual activities
- For hosts: Check process executions, network connections
- For IPs: Check connections, reputation, geographic location

## 4. Establish Timeline
Create a chronological timeline of events related to the incident:

```kusto
let incident = SecurityIncident
| where IncidentNumber == {incident_id}
| project IncidentNumber, Title, IncidentName;
let alerts = incident
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| project TimeGenerated, ItemType="Alert", Title=AlertName, Description;
let entities = alerts
| extend Entities = parse_json(Entities)
| mv-expand Entity = Entities
| extend EntityType = tostring(Entity.Type), EntityName = tostring(Entity.Name)
| distinct EntityType, EntityName;
// You would add more event types here specific to the entities
union alerts
| project TimeGenerated, ItemType, Title, Description
| sort by TimeGenerated asc
```

## 5. Determine Scope & Impact
Assess the scope and potential impact:
- How many systems/users are affected?
- What is the criticality of affected systems?
- Is sensitive data potentially exposed?
- Is the threat contained or active?

## 6. Containment & Remediation
Develop and implement containment and remediation steps:
- Block malicious IPs/domains at the firewall
- Isolate compromised hosts
- Reset compromised credentials
- Remove malware/unauthorized access
- Patch vulnerabilities

## 7. Documentation & Reporting
Document your findings and actions:
- Update the incident with your findings
- Document remediation steps taken
- Record lessons learned
- Prepare reports for stakeholders

## 8. Close the Incident
When appropriate, close the incident with proper classification:

```kusto
// This shows the query to check the current status
// Actual update would be done through the Sentinel UI or APIs
SecurityIncident
| where IncidentNumber == {incident_id}
| project IncidentNumber, Status, Classification, ClassificationComment
```

Would you like me to provide more specific guidance for any of these steps?
"""
            ),
            base.UserMessage(
                "This is comprehensive. Can you also show me how to check for similar incidents that might be related?"
            ),
            base.AssistantMessage(
                f"""
Absolutely! Here's how you can check for similar or related incidents to incident #{incident_id}:

## Finding Related Incidents

### 1. Find incidents with similar entities
```kusto
// First, extract entities from our incident
let incidentEntities = SecurityIncident
| where IncidentNumber == {incident_id}
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| extend Entities = parse_json(Entities)
| mv-expand Entity = Entities
| extend EntityType = tostring(Entity.Type), EntityName = tostring(Entity.Name)
| distinct EntityType, EntityName;
// Then find other incidents with the same entities
SecurityIncident
| where IncidentNumber != {incident_id}
| where TimeGenerated > ago(30d)
| join (
    SecurityAlert
    | join kind=inner (
        incidentEntities
    ) on $left.Entities contains $right.EntityName
) on $left.IncidentName == $right.SystemAlertId
| summarize AlertCount=count(), Entities=make_set(EntityName) by IncidentNumber, Title, Severity, Status
| sort by IncidentNumber desc
```

### 2. Find incidents with similar title or description
```kusto
// Get our incident's title
let incidentTitle = SecurityIncident
| where IncidentNumber == {incident_id}
| project Title;
// Find incidents with similar titles
SecurityIncident
| where IncidentNumber != {incident_id}
| where TimeGenerated > ago(30d)
| where Title has_any (incidentTitle) or Title has_any (split(toscalar(incidentTitle), " "))
| project IncidentNumber, Title, Severity, Status, TimeGenerated
| sort by TimeGenerated desc
```

### 3. Find incidents across the same timeframe
```kusto
// Get our incident's timeframe
let incidentTimeframe = SecurityIncident
| where IncidentNumber == {incident_id}
| project StartTime = FirstActivityTime, EndTime = LastActivityTime;
// Find incidents in a similar timeframe
let timeRange = toscalar(incidentTimeframe | extend Duration = EndTime - StartTime | project Duration);
SecurityIncident
| where IncidentNumber != {incident_id}
| where TimeGenerated > ago(30d)
| where FirstActivityTime between (toscalar(incidentTimeframe | project StartTime) - timeRange .. toscalar(incidentTimeframe | project EndTime) + timeRange)
| project IncidentNumber, Title, Severity, Status, FirstActivityTime, LastActivityTime
| sort by FirstActivityTime asc
```

### 4. Find incidents affecting the same systems
```kusto
// Get systems affected in our incident
let affectedSystems = SecurityIncident
| where IncidentNumber == {incident_id}
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| extend Entities = parse_json(Entities)
| mv-expand Entity = Entities
| where Entity.Type == "host" or Entity.Type == "hostName"
| extend HostName = tostring(Entity.Name)
| distinct HostName;
// Find other incidents affecting these systems
SecurityIncident
| where IncidentNumber != {incident_id}
| where TimeGenerated > ago(30d)
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
    | extend Entities = parse_json(Entities)
    | mv-expand Entity = Entities
    | where Entity.Type == "host" or Entity.Type == "hostName"
    | extend HostName = tostring(Entity.Name)
    | where HostName in (affectedSystems)
) on $left.IncidentName == $right.SystemAlertId
| summarize AlertCount=count() by IncidentNumber, Title, Severity, Status, TimeGenerated
| sort by TimeGenerated desc
```

### 5. Check for incident clusters using tactics or techniques
```kusto
// Get techniques used in our incident
let techniques = SecurityIncident
| where IncidentNumber == {incident_id}
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
) on $left.IncidentName == $right.SystemAlertId
| extend Techniques = todynamic(AdditionalData).techniques
| mv-expand Technique = Techniques
| distinct tostring(Technique);
// Find incidents with similar techniques
SecurityIncident
| where IncidentNumber != {incident_id}
| where TimeGenerated > ago(30d)
| join (
    SecurityAlert
    | where TimeGenerated > ago(30d)
    | extend Techniques = todynamic(AdditionalData).techniques
    | mv-expand Technique = Techniques
    | where tostring(Technique) in (techniques)
) on $left.IncidentName == $right.SystemAlertId
| summarize AlertCount=count(), Techniques=make_set(tostring(Technique)) by IncidentNumber, Title, Severity, Status, TimeGenerated
| sort by TimeGenerated desc
```

These queries will help you identify incidents that might be related to incident #{incident_id} through common entities, systems, timing, or attack patterns. This can help you identify larger campaigns or persistent threats in your environment.
"""
            ),
        ]
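
The `investigate_ip_address` prompt above places `ip_address` directly inside KQL string literals (`let ip = "{ip_address}";`), so a value containing a quote or pipe would change the generated query text. The following is a minimal sketch of one way to validate the value first, using only Python's standard `ipaddress` module. The helper names `normalize_ip` and `build_ip_let` are hypothetical and are not part of the repository; the sketch also assumes that scoped IPv6 addresses (those containing `%`) are not needed and can be rejected outright.

```python
import ipaddress


def normalize_ip(value: str) -> str:
    """Return the canonical text form of one IPv4/IPv6 address, or raise ValueError.

    Hypothetical helper (not in the original file).
    """
    candidate = value.strip()
    # Assumption: scoped IPv6 addresses are not needed here. The text after "%"
    # is not validated as strictly by every Python version, so reject it up front.
    if "%" in candidate:
        raise ValueError(f"scoped IPv6 addresses are not accepted: {candidate!r}")
    # ipaddress.ip_address raises ValueError for anything that is not a single
    # address, which rules out quotes, pipes, and other KQL syntax.
    return str(ipaddress.ip_address(candidate))


def build_ip_let(value: str) -> str:
    """Build the `let ip = "...";` line that opens the IP queries (hypothetical helper)."""
    return f'let ip = "{normalize_ip(value)}";'
```

Under these assumptions, a prompt function could call `normalize_ip(ip_address)` once at the top and interpolate the returned string instead of the raw argument. Rejecting bad input with `ValueError` rather than silently cleaning it keeps the failure visible to the caller.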
