Illumio MCP Server
by alexgoller
- src
- illumio_mcp
import asyncio
import os
import json
import logging
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl, BaseModel
import mcp.server.stdio
import dotenv
import sys
from datetime import datetime, timedelta
from illumio import *
import pandas as pd
from json import JSONEncoder
from pathlib import Path
def setup_logging():
"""Configure logging based on environment"""
logger = logging.getLogger('illumio_mcp')
logger.setLevel(logging.DEBUG)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Determine log path based on environment
if os.environ.get('DOCKER_CONTAINER'):
log_path = Path('/var/log/illumio-mcp/illumio-mcp.log')
else:
# Use home directory for local logging
log_path = './illumio-mcp.log'
file_handler = logging.FileHandler(str(log_path))
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
# Prevent logs from propagating to root logger
logger.propagate = False
return logger
# Initialize logging
logger = setup_logging()
logger.debug("Loading environment variables")
dotenv.load_dotenv()
PCE_HOST = os.getenv("PCE_HOST")
PCE_PORT = os.getenv("PCE_PORT")
PCE_ORG_ID = os.getenv("PCE_ORG_ID")
API_KEY = os.getenv("API_KEY")
API_SECRET = os.getenv("API_SECRET")
MCP_BUG_MAX_RESULTS = 500
# Store notes as a simple key-value dict to demonstrate state management
notes: dict[str, str] = {}
server = Server("illumio-mcp")
logging.debug("Server initialized")
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
"""
List available prompts.
Each prompt can have optional arguments to customize its behavior.
"""
return [
types.Prompt(
name="summarize-notes",
description="Creates a summary of all notes",
arguments=[
types.PromptArgument(
name="style",
description="Style of the summary (brief/detailed)",
required=False,
)
],
),
types.Prompt(
name="ringfence-application",
description="Ringfence an application by deploying rulesets to limit the inbound and outbound traffic",
arguments=[
types.PromptArgument(
name="application_name",
description="Name of the application to ringfence",
required=True,
),
types.PromptArgument(
name="application_environment",
description="Environment of the application to ringfence",
required=True,
)
],
),
types.Prompt(
name="analyze-application-traffic",
description="Analyze the traffic flows for an application and environment",
arguments=[
types.PromptArgument(
name="application_name",
description="Name of the application to analyze",
required=True,
),
types.PromptArgument(
name="application_environment",
description="Environment of the application to analyze",
required=True,
)
]
)
]
@server.get_prompt()
async def handle_get_prompt(
name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
"""
Generate a prompt by combining arguments with server state.
The prompt includes all current notes and can be customized via arguments.
"""
if name == "ringfence-application":
return types.GetPromptResult(
description="Ringfence an application by deploying rulesets to limit the inbound and outbound traffic",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"""
Ringfence the application {arguments['application_name']} in the environment {arguments['application_environment']}.
Always reference labels as hrefs like /orgs/1/labels/57 or similar.
Consumers means the source of the traffic, providers means the destination of the traffic.
First, retrieve all the traffic flows inside the application and environment. Analyze the connections. Then retrieve all the traffic flows inbound to the application and environment.
Inside the app, please be sure to have rules for each role or app tier to connect to the other tiers.
Always use traffic flows to find out what other applications and environemnts need to connect into {arguments['application_name']},
and then deploy rulesets to limit the inbound traffic to those applications and environments.
For traffic that is required to connect outbound from {arguments['application_name']}, deploy rulesets to limit the
outbound traffic to those applications and environments. If a consumer is coming from the same app and env, please use
all workloads for the rules inside the scope (intra-scope). If it comes from the outside, please use app, env and if possible role
If a remote app is connected as destination, a new ruleset needs to be created that has the name of the remote app and env,
all incoming connections need to be added as extra-scope rules in that ruleset.
The logic in illumio is the following.
If a scope exists. Rules define connections within the scope if unscoped consumers is not set to true. Unscoped consumers define inbound traffic from things outside the scope. The unscoped consumer is a set of labels being the source of inbound traffic. Provider is the destination. For the provider a value of AMS (short for all workloads) means that a connection is allowed for all workloads inside the scope. So for example if the source is role=monitoring, app=nagios, env=prod, then the rule for the app=ordering, env=prod application would be:
consumer: role=monitoring,app=nagios,env=prod
provider: role=All workloads
service: 5666/tcp
If a rule is setting unscoped consumers to "false", this means that the rule is intra scope. Repeating any label that is in the scope does not make sense for this. Instead use role or whatever specific label to characterize the thing in the scope.
e.g. for the loadbalancer to connect to the web-tier in ordering, prod the rule is:
scope: app=ordering, env=prod
consumers: role=loadbalancer
providers: role=web
service: 8080/tcp
unscoped consumers: false
This is a intra-scope rule allowing the role=loadbalancer,app=ordering,env=prod workloads to connect to the role=web,app=ordering,env=prod workloads on port 8080/tcp.
"""
)
)
]
)
elif name == "analyze-application-traffic":
return types.GetPromptResult(
description="Analyze the traffic flows for an application and environment",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"""
Please provide the traffic flows for {arguments['application_name']} in the environment {arguments['application_environment']}.
Order by inbound and outbound traffic and app/env/role tupels.
Find other label types that are of interest and show them. Display your results in a react component. Show protocol, port and try to
understand the traffic flows (e.g. 5666/tcp likely could be nagios).
Categorize traffic into infrastructure and application traffic.
Find out if the application is internet facing or not.
Show illumio role labels, as well as application and environment labels in the output.
"""
)
)
]
)
else:
raise ValueError(f"Unknown prompt: {name}")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
Each tool specifies its arguments using JSON Schema validation.
"""
return [
types.Tool(
name="add-note",
description="Add a new note",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
"content": {"type": "string"},
},
"required": ["name", "content"],
},
),
types.Tool(
name="get-workloads",
description="Get workloads from the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
},
"required": ["name"],
},
),
types.Tool(
name="update-workload",
description="Update a workload in the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
"ip_addresses": {"type": "array", "items": {"type": "string"}},
"labels": {"type": "array", "items":
{"key": {"type": "string"}, "value": {"type": "string"}}
},
},
"required": ["name", "ip_addresses"],
}
),
types.Tool(
name="get-labels",
description="Get all labels from PCE",
mimeType="application/json",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
},
"required": [],
}
),
types.Tool(
name="create-workload",
description="Create a Illumio Core unmanaged workload in the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
"ip_addresses": {"type": "array", "items": {"type": "string"}},
"labels": {"type": "array", "items":
{"key": {"type": "string"}, "value": {"type": "string"}}
},
},
"required": ["name", "ip_addresses"],
}
),
types.Tool(
name="create-label",
description="Create a label of a specific type and the value in the PCE",
inputSchema={
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"type": "string"},
},
"required": ["key", "value"]
}
),
types.Tool(
name="delete-label",
description="Delete a label in the PCE",
inputSchema={
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"type": "string"},
},
"required": ["key", "value"]
}
),
types.Tool(
name="delete-workload",
description="Delete a workload from the PCE",
inputSchema={
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"]
}
),
types.Tool(
name="get-traffic-flows",
description="Get traffic flows from the PCE with comprehensive filtering options",
inputSchema={
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Starting datetime (YYYY-MM-DD or timestamp)"},
"end_date": {"type": "string", "description": "Ending datetime (YYYY-MM-DD or timestamp)"},
"include_sources": {
"type": "array",
"items": {"type": "string"},
"description": "Sources to include (label/IP list/workload HREFs, FQDNs, IPs)"
},
"exclude_sources": {
"type": "array",
"items": {"type": "string"},
"description": "Sources to exclude (label/IP list/workload HREFs, FQDNs, IPs)"
},
"include_destinations": {
"type": "array",
"items": {"type": "string"},
"description": "Destinations to include (label/IP list/workload HREFs, FQDNs, IPs)"
},
"exclude_destinations": {
"type": "array",
"items": {"type": "string"},
"description": "Destinations to exclude (label/IP list/workload HREFs, FQDNs, IPs)"
},
"include_services": {
"type": "array",
"items": {
"type": "object",
"properties": {
"port": {"type": "integer"},
"proto": {"type": "string"}
}
}
},
"exclude_services": {
"type": "array",
"items": {
"type": "object",
"properties": {
"port": {"type": "integer"},
"proto": {"type": "string"}
}
}
},
"policy_decisions": {
"type": "array",
"items": {
"type": "string",
"enum": ["allowed", "blocked", "potentially_blocked", "unknown"]
}
},
"exclude_workloads_from_ip_list_query": {"type": "boolean"},
"max_results": {"type": "integer"},
"query_name": {"type": "string"}
},
"required": ["start_date", "end_date"]
}
),
types.Tool(
name="get-traffic-flows-summary",
description="Get traffic flows from the PCE in a summarized text format, this is a text format that is not a dataframe, it also is not json, the form is: 'From <source> to <destination> on <port> <proto>: <number of connections>'",
inputSchema={
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Starting datetime (YYYY-MM-DD or timestamp)"},
"end_date": {"type": "string", "description": "Ending datetime (YYYY-MM-DD or timestamp)"},
"include_sources": {
"type": "array",
"items": {"type": "string"},
"description": "Sources to include (label/IP list/workload HREFs, FQDNs, IPs). Best case these are hrefs like /orgs/1/labels/57 or similar. Other way is app=env as an example (label key and value)"
},
"exclude_sources": {
"type": "array",
"items": {"type": "string"},
"description": "Sources to exclude (label/IP list/workload HREFs, FQDNs, IPs). Best case these are hrefs like /orgs/1/labels/57 or similar. Other way is app=env as an example (label key and value)"
},
"include_destinations": {
"type": "array",
"items": {"type": "string"},
"description": "Destinations to include (label/IP list/workload HREFs, FQDNs, IPs). Best case these are hrefs like /orgs/1/labels/57 or similar. Other way is app=env as an example (label key and value)"
},
"exclude_destinations": {
"type": "array",
"items": {"type": "string"},
"description": "Destinations to exclude (label/IP list/workload HREFs, FQDNs, IPs). Best case these are hrefs like /orgs/1/labels/57 or similar. Other way is app=env as an example (label key and value)"
},
"include_services": {
"type": "array",
"items": {
"type": "object",
"properties": {
"port": {"type": "integer"},
"proto": {"type": "string"}
}
}
},
"exclude_services": {
"type": "array",
"items": {
"type": "object",
"properties": {
"port": {"type": "integer"},
"proto": {"type": "string"}
}
}
},
"policy_decisions": {
"type": "array",
"items": {
"type": "string",
"enum": ["allowed", "potentially_blocked", "blocked", "unknown"]
}
},
"exclude_workloads_from_ip_list_query": {"type": "boolean"},
"max_results": {"type": "integer"},
"query_name": {"type": "string"}
},
"required": ["start_date", "end_date"]
}
),
types.Tool(
name="check-pce-connection",
description="Are my credentials and the connection to the PCE working?",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="get-rulesets",
description="Get rulesets from the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Filter rulesets by name (optional)"
},
"enabled": {
"type": "boolean",
"description": "Filter by enabled/disabled status (optional)"
}
}
}
),
# add a delete-ruleset tool, either by href or name
types.Tool(
name="delete-ruleset",
description="Delete a ruleset from the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {"type": "string"},
"name": {"type": "string"}
},
"oneOf": [
{"required": ["href"]},
{"required": ["name"]}
]
}
),
types.Tool(
name="get-iplists",
description="Get IP lists from the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Filter IP lists by name (optional)"
},
"description": {
"type": "string",
"description": "Filter by description (optional)"
},
"ip_ranges": {
"type": "array",
"items": {
"type": "string"
},
"description": "Filter by IP ranges (optional)"
}
}
}
),
types.Tool(
name="get-events",
description="Get events from the PCE",
inputSchema={
"type": "object",
"properties": {
"event_type": {
"type": "string",
"description": "Filter by event type (e.g., 'system_task.expire_service_account_api_keys')"
},
"severity": {
"type": "string",
"enum": ["emerg", "alert", "crit", "err", "warning", "notice", "info", "debug"],
"description": "Filter by event severity"
},
"status": {
"type": "string",
"enum": ["success", "failure"],
"description": "Filter by event status"
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return",
"default": 100
}
}
}
),
types.Tool(
name="create-ruleset",
description="Create a ruleset in the PCE with support for ring-fencing patterns",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name of the ruleset (e.g., 'RS-ELK'). Must be unique in the PCE."},
"description": {"type": "string", "description": "Description of the ruleset (optional)"},
"scopes": {
"type": "array",
"items": {
"type": "array",
"items": {"type": "string"}
},
"description": "List of label combinations that define scopes. Each scope is an array of label values. This need to be label references like /orgs/1/labels/57 or similar. Get the label href from the get-labels tool."
},
"rules": {
"type": "array",
"items": {
"type": "object",
"properties": {
"providers": {
"type": "array",
"items": {"type": "string"},
"description": "Array of provider labels, 'ams' for all workloads, or IP list references (e.g., 'iplist:Any (0.0.0.0/0)')"
},
"consumers": {
"type": "array",
"items": {"type": "string"},
"description": "Array of consumer labels, 'ams' for all workloads, or IP list references (e.g., 'iplist:Any (0.0.0.0/0)')"
},
"ingress_services": {
"type": "array",
"items": {
"type": "object",
"properties": {
"port": {"type": "integer"},
"proto": {"type": "string"}
},
"required": ["port", "proto"]
}
},
"unscoped_consumers": {
"type": "boolean",
"description": "Whether to allow unscoped consumers (extra-scope rule)",
"default": False
}
},
"required": ["providers", "consumers", "ingress_services"]
}
}
},
"required": ["name", "scopes"]
}
),
types.Tool(
name="get-services",
description="Get services from the PCE with optional filtering",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Filter services by name"
},
"description": {
"type": "string",
"description": "Filter services by description"
},
"port": {
"type": "integer",
"description": "Filter services by port number"
},
"proto": {
"type": "string",
"description": "Filter services by protocol (e.g., tcp, udp)"
},
"process_name": {
"type": "string",
"description": "Filter services by process name"
}
}
}
),
types.Tool(
name="update-label",
description="Update an existing label in the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Label href (e.g., /orgs/1/labels/42). Either href or both key and value must be provided to identify the label."
},
"key": {
"type": "string",
"description": "Label type (e.g., role, app, env, loc)"
},
"value": {
"type": "string",
"description": "Current value of the label"
},
"new_value": {
"type": "string",
"description": "New value for the label"
}
},
"oneOf": [
{"required": ["href", "key", "new_value"]},
{"required": ["key", "value", "new_value"]}
]
}
),
types.Tool(
name="create-iplist",
description="Create a new IP List in the PCE",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the IP List"
},
"description": {
"type": "string",
"description": "Description of the IP List"
},
"ip_ranges": {
"type": "array",
"description": "List of IP ranges to include",
"items": {
"type": "object",
"properties": {
"from_ip": {
"type": "string",
"description": "Starting IP address (IPv4 or IPv6)"
},
"to_ip": {
"type": "string",
"description": "Ending IP address (optional, for ranges)"
},
"description": {
"type": "string",
"description": "Description of this IP range (optional)"
},
"exclusion": {
"type": "boolean",
"description": "Whether this is an exclusion range",
"default": False
}
},
"required": ["from_ip"]
}
},
"fqdn": {
"type": "string",
"description": "Fully Qualified Domain Name (optional)"
}
},
"required": ["name", "ip_ranges"]
}
),
types.Tool(
name="update-iplist",
description="Update an existing IP List in the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Href of the IP List to update"
},
"name": {
"type": "string",
"description": "Name of the IP List to update (alternative to href)"
},
"description": {
"type": "string",
"description": "New description for the IP List (optional)"
},
"ip_ranges": {
"type": "array",
"description": "New list of IP ranges",
"items": {
"type": "object",
"properties": {
"from_ip": {
"type": "string",
"description": "Starting IP address (IPv4 or IPv6)"
},
"to_ip": {
"type": "string",
"description": "Ending IP address (optional, for ranges)"
},
"description": {
"type": "string",
"description": "Description of this IP range (optional)"
},
"exclusion": {
"type": "boolean",
"description": "Whether this is an exclusion range",
"default": False
}
},
"required": ["from_ip"]
}
},
"fqdn": {
"type": "string",
"description": "New Fully Qualified Domain Name (optional)"
}
},
"oneOf": [
{"required": ["href"]},
{"required": ["name"]}
]
}
),
types.Tool(
name="delete-iplist",
description="Delete an IP List from the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Href of the IP List to delete"
},
"name": {
"type": "string",
"description": "Name of the IP List to delete (alternative to href)"
}
},
"oneOf": [
{"required": ["href"]},
{"required": ["name"]}
]
}
),
types.Tool(
name="update-ruleset",
description="Update an existing ruleset in the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Href of the ruleset to update"
},
"name": {
"type": "string",
"description": "Name of the ruleset to update (alternative to href)"
},
"description": {
"type": "string",
"description": "New description for the ruleset"
},
"enabled": {
"type": "boolean",
"description": "Whether the ruleset is enabled"
},
"scopes": {
"type": "array",
"description": "New scopes for the ruleset",
"items": {
"type": "array",
"items": {
"oneOf": [
{
"type": "string",
"description": "Label href or key=value string"
},
{
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Label href"
}
},
"required": ["href"]
}
]
}
}
}
},
"oneOf": [
{"required": ["href"]},
{"required": ["name"]}
]
}
),
types.Tool(
name="delete-ruleset",
description="Delete a ruleset from the PCE",
inputSchema={
"type": "object",
"properties": {
"href": {
"type": "string",
"description": "Href of the ruleset to delete"
},
"name": {
"type": "string",
"description": "Name of the ruleset to delete (alternative to href)"
}
},
"oneOf": [
{"required": ["href"]},
{"required": ["name"]}
]
}
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
logger.debug(f"Handling tool call: {name} with arguments: {arguments}")
if name == "get-workloads":
# harmonize the logging
logger.debug("=" * 80)
logger.debug("GET WORKLOADS CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
logger.debug("Initializing PCE connection")
try:
logger.debug(f"PCE connection details - Host: {PCE_HOST}, Port: {PCE_PORT}, Org: {PCE_ORG_ID}")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
logger.debug("Credentials set")
connection_status = pce.check_connection()
logger.debug(f"PCE connection status: {connection_status}")
logger.debug("Fetching workloads from PCE")
workloads = pce.workloads.get(params={"include": "labels", "max_results": 10000})
logger.debug(f"Successfully retrieved {len(workloads)} workloads")
return [types.TextContent(
type="text",
text=f"Workloads: {workloads}"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "check-pce-connection":
logger.debug("Initializing PCE connection")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
return [types.TextContent(
type="text",
text=f"PCE connection successful"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "create-label":
logger.debug(f"Creating label with key: {arguments['key']} and value: {arguments['value']}")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
label = Label(key=arguments['key'], value=arguments['value'])
label = pce.labels.create(label)
logger.debug(f"Label created with status: {label}")
return [types.TextContent(
type="text",
text=f"Label created with status: {label}"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "delete-label":
logger.debug(f"Deleting label with key: {arguments['key']} and value: {arguments['value']}")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
label = pce.labels.get(params = { "key": arguments['key'], "value": arguments['value'] })
if label:
pce.labels.delete(label[0])
return [types.TextContent(
type="text",
text=f"Label deleted with status: {label}"
)]
else:
return [types.TextContent(
type="text",
text=f"Label not found"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "get-labels":
logger.debug("Initializing PCE connection")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
resp = pce.get('/labels')
labels = resp.json()
return [types.TextContent(
type="text",
text= f"Labels: {labels}"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="json",
text=f"Error: {error_msg}"
)]
elif name == "create-workload":
logger.debug(f"Creating workload with name: {arguments['name']} and ip_addresses: {arguments['ip_addresses']}")
logger.debug(f"Labels: {arguments['labels']}")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
interfaces = []
prefix = "eth"
if_count = 0
for ip in arguments['ip_addresses']:
intf = Interface(name = f"{prefix}{if_count}", address = ip)
interfaces.append(intf)
if_count += 1
workload_labels = []
for label in arguments['labels']:
logger.debug(f"Label: {label}")
# check if label already exists
label_resp = pce.labels.get(params = { "key": label['key'], "value": label['value'] })
if label_resp:
logger.debug(f"Label already exists: {label_resp}")
workload_label = label_resp[0] # Get the first matching label
else:
logger.debug(f"Label does not exist, creating: {label}")
new_label = Label(key=label['key'], value=label['value'])
workload_label = pce.labels.create(new_label)
workload_labels.append(workload_label)
logger.debug(f"Labels: {workload_labels}")
workload = Workload(
name=arguments['name'],
interfaces=interfaces,
labels=workload_labels,
hostname=arguments['name'] # Adding hostname which might be required
)
status = pce.workloads.create(workload)
logger.debug(f"Workload creation status: {status}")
return [types.TextContent(
type="text",
text=f"Workload created with status: {status}, workload: {workload}"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "update-workload":
logger.debug(f"Updating workload with name: {arguments['name']} and ip_addresses: {arguments['ip_addresses']}")
logger.debug(f"Labels: {arguments['labels']}")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
workload = pce.workloads.get(params = { "name": arguments['name'] })
if workload:
logger.debug(f"Workload found: {workload}")
interfaces = []
prefix = "eth"
if_count = 0
for ip in arguments['ip_addresses']:
intf = Interface(name = f"{prefix}{if_count}", address = ip)
interfaces.append(intf)
if_count += 1
workload_labels = []
for label in arguments['labels']:
logger.debug(f"Label: {label}")
# check if label already exists
label_resp = pce.labels.get(params = { "key": label['key'], "value": label['value'] })
if label_resp:
logger.debug(f"Label already exists: {label_resp}")
workload_label = label_resp[0] # Get the first matching label
else:
logger.debug(f"Label does not exist, creating: {label}")
new_label = Label(key=label['key'], value=label['value'])
workload_label = pce.labels.create(new_label)
workload_labels.append(workload_label)
logger.debug(f"Labels: {workload_labels}")
if workload_labels:
workload = pce.workloads.update(workload[0], labels=workload_labels)
logger.debug(f"Workload update status: {workload}")
return [types.TextContent(
type="text",
text=f"Workload updated with status: {workload}"
)]
elif interfaces:
workload = pce.workloads.update(workload[0], interfaces=interfaces)
logger.debug(f"Workload update status: {workload}")
return [types.TextContent(
type="text",
text=f"Workload updated with status: {workload}"
)]
elif interfaces and workload_labels:
workload = pce.workloads.update(workload[0], interfaces=interfaces, labels=workload_labels)
logger.debug(f"Workload update status: {workload}")
return [types.TextContent(
type="text",
text=f"Workload updated with status: {workload}"
)]
else:
logger.debug(f"Workload not found")
return [types.TextContent(
type="text",
text=f"Workload not found"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "delete-workload":
logger.debug(f"Deleting workload with name: {arguments['name']}")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
workload = pce.workloads.get(params = { "name": arguments['name'] })
if workload:
pce.workloads.delete(workload[0])
return [types.TextContent(
type="text",
text=f"Workload deleted with status: {status}"
)]
else:
return [types.TextContent(
type="text",
text=f"Workload not found"
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "get-traffic-flows":
logger.debug("=" * 80)
logger.debug("GET TRAFFIC FLOWS CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
# assume a default start date of 1 day ago and end date of now
if 'start_date' not in arguments:
arguments['start_date'] = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
if 'end_date' not in arguments:
arguments['end_date'] = datetime.now().strftime('%Y-%m-%d')
if not arguments or 'start_date' not in arguments or 'end_date' not in arguments:
error_msg = "Missing required arguments: 'start_date' and 'end_date' are required"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
logger.debug(f"Start Date: {arguments.get('start_date')}")
logger.debug(f"End Date: {arguments.get('end_date')}")
logger.debug(f"Include Sources: {arguments.get('include_sources', [])}")
logger.debug(f"Exclude Sources: {arguments.get('exclude_sources', [])}")
logger.debug(f"Include Destinations: {arguments.get('include_destinations', [])}")
logger.debug(f"Exclude Destinations: {arguments.get('exclude_destinations', [])}")
logger.debug(f"Include Services: {arguments.get('include_services', [])}")
logger.debug(f"Exclude Services: {arguments.get('exclude_services', [])}")
logger.debug(f"Policy Decisions: {arguments.get('policy_decisions', [])}")
logger.debug(f"Exclude Workloads from IP List: {arguments.get('exclude_workloads_from_ip_list_query', True)}")
logger.debug(f"Max Results: {arguments.get('max_results', 900)}")
logger.debug(f"Query Name: {arguments.get('query_name')}")
logger.debug("=" * 80)
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
logger.debug(f"Due to a condition in MCP, max results is set to {MCP_BUG_MAX_RESULTS}")
# TODO: fix this in the future...
arguments['max_results'] = MCP_BUG_MAX_RESULTS
traffic_query = TrafficQuery.build(
start_date=arguments['start_date'],
end_date=arguments['end_date'],
include_sources=arguments.get('include_sources', [[]]),
exclude_sources=arguments.get('exclude_sources', []),
include_destinations=arguments.get('include_destinations', [[]]),
exclude_destinations=arguments.get('exclude_destinations', []),
include_services=arguments.get('include_services', []),
exclude_services=arguments.get('exclude_services', []),
policy_decisions=arguments.get('policy_decisions', []),
exclude_workloads_from_ip_list_query=arguments.get('exclude_workloads_from_ip_list_query', True),
max_results=arguments.get('max_results', 10000),
query_name=arguments.get('query_name', 'mcp-traffic-query')
)
all_traffic = pce.get_traffic_flows_async(
query_name=arguments.get('query_name', 'mcp-traffic-query'),
traffic_query=traffic_query
)
# Convert the traffic flows to a serializable format
traffic_data = []
for flow in all_traffic:
try:
flow_dict = {
'src_ip': str(flow.src.ip) if flow.src and hasattr(flow.src, 'ip') else None,
'dst_ip': str(flow.dst.ip) if flow.dst and hasattr(flow.dst, 'ip') else None,
'proto': str(flow.service.proto) if flow.service and hasattr(flow.service, 'proto') else None,
'port': int(flow.service.port) if flow.service and hasattr(flow.service, 'port') else None,
'policy_decision': str(flow.policy_decision) if hasattr(flow, 'policy_decision') else None,
'num_connections': int(flow.num_connections) if hasattr(flow, 'num_connections') else 0
}
# Add workload information if available
if hasattr(flow.src, 'workload') and flow.src.workload:
flow_dict['src_workload'] = str(flow.src.workload.name)
if hasattr(flow.dst, 'workload') and flow.dst.workload:
flow_dict['dst_workload'] = str(flow.dst.workload.name)
traffic_data.append(flow_dict)
except Exception as e:
logger.error(f"Error processing flow: {e}")
continue
df = to_dataframe(all_traffic)
# group dataframe by src_ip, dst_ip, proto, port, policy_decision tuples, aggregate num_connections
df = df.groupby(['src_ip', 'dst_ip', 'proto', 'port', 'policy_decision']).agg({'num_connections': 'sum'}).reset_index()
# limit dataframe json output to less than 1048576
MAX_ROWS = 1000
if len(df) > MAX_ROWS:
logger.warning(f"Truncating results from {len(df)} to {MAX_ROWS} entries")
df = df.nlargest(MAX_ROWS, 'num_connections')
response_size = len(df.to_json(orient="records"))
if response_size > 1048576:
logger.warning(f"Response size exceeds 1MB limit. Truncating to {MAX_ROWS} entries")
step_down = 0.9
while response_size > 1048576 or step_down == 0:
rows = int(MAX_ROWS * step_down)
step_down = step_down - 0.1
df = df.nlargest(rows, 'num_connections')
response_size = len(df.to_json(orient="records"))
logger.debug(f"Response size: {response_size} Step down: {step_down}")
# trying this in case GC doesn't work
df_json = df.to_json(orient="records")
del df
# return dataframe df in json format
return [types.TextContent(
type="text",
text= df_json
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "get-traffic-flows-summary":
logger.debug("=" * 80)
logger.debug("GET TRAFFIC FLOWS SUMMARY CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug(f"Start Date: {arguments.get('start_date')}")
logger.debug(f"End Date: {arguments.get('end_date')}")
logger.debug(f"Include Sources: {arguments.get('include_sources', [])}")
logger.debug(f"Exclude Sources: {arguments.get('exclude_sources', [])}")
logger.debug(f"Include Destinations: {arguments.get('include_destinations', [])}")
logger.debug(f"Exclude Destinations: {arguments.get('exclude_destinations', [])}")
logger.debug(f"Include Services: {arguments.get('include_services', [])}")
logger.debug(f"Exclude Services: {arguments.get('exclude_services', [])}")
logger.debug(f"Policy Decisions: {arguments.get('policy_decisions', [])}")
logger.debug(f"Exclude Workloads from IP List: {arguments.get('exclude_workloads_from_ip_list_query', True)}")
logger.debug(f"Max Results: {arguments.get('max_results', 10000)}")
logger.debug(f"Query Name: {arguments.get('query_name')}")
logger.debug("=" * 80)
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
logger.debug(f"Due to a condition in MCP, max results is set to {MCP_BUG_MAX_RESULTS}")
# TODO: fix this in the future...
if 'max_results' in arguments and arguments.get('max_results') > MCP_BUG_MAX_RESULTS:
logger.debug(f"Setting max results to {MCP_BUG_MAX_RESULTS} from original value {arguments.get('max_results')}")
arguments['max_results'] = MCP_BUG_MAX_RESULTS
query = TrafficQuery.build(
start_date=arguments['start_date'],
end_date=arguments['end_date'],
include_sources=arguments.get('include_sources', [[]]),
exclude_sources=arguments.get('exclude_sources', []),
include_destinations=arguments.get('include_destinations', [[]]),
exclude_destinations=arguments.get('exclude_destinations', []),
include_services=arguments.get('include_services', []),
exclude_services=arguments.get('exclude_services', []),
policy_decisions=arguments.get('policy_decisions', []),
exclude_workloads_from_ip_list_query=arguments.get('exclude_workloads_from_ip_list_query', True),
max_results=arguments.get('max_results', 10000),
query_name=arguments.get('query_name', 'mcp-traffic-summary')
)
all_traffic = pce.get_traffic_flows_async(
query_name=arguments.get('query_name', 'mcp-traffic-summary'),
traffic_query=query
)
df = to_dataframe(all_traffic)
summary = summarize_traffic(df)
summary_lines = ""
# Ensure the summary is a list of strings
if isinstance(summary, list):
# join list to be one string separated by newlines
summary_lines = "\n".join(summary)
else:
summary_lines = str(summary)
logger.debug(f"Summary data type: {type(summary_lines)}")
logger.debug(f"Summary size: {len(summary_lines)}")
return [types.TextContent(
type="text",
text=summary_lines
)]
except Exception as e:
error_msg = f"Failed in PCE operation: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "get-rulesets":
logger.debug("=" * 80)
logger.debug("GET RULESETS CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug(f"Name filter: {arguments.get('name')}")
logger.debug(f"Enabled filter: {arguments.get('enabled')}")
logger.debug("=" * 80)
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Prepare filter parameters
params = {}
if arguments.get('name'):
params['name'] = arguments['name']
if arguments.get('enabled') is not None:
params['enabled'] = arguments['enabled']
rulesets = pce.rule_sets.get_all()
# Convert rulesets to serializable format
ruleset_data = []
for ruleset in rulesets:
rules = []
for rule in ruleset.rules:
rule_dict = {
'enabled': rule.enabled,
'description': rule.description,
'resolve_labels_as': str(rule.resolve_labels_as) if rule.resolve_labels_as else None,
'consumers': [str(consumer) for consumer in rule.consumers] if rule.consumers else [],
'providers': [str(provider) for provider in rule.providers] if rule.providers else [],
'ingress_services': [str(service) for service in rule.ingress_services] if rule.ingress_services else []
}
rules.append(rule_dict)
ruleset_dict = {
'href': ruleset.href,
'name': ruleset.name,
'enabled': ruleset.enabled,
'description': ruleset.description,
'scopes': [str(scope) for scope in ruleset.scopes] if ruleset.scopes else [],
'rules': rules
}
ruleset_data.append(ruleset_dict)
return [types.TextContent(
type="text",
text=json.dumps({
"rulesets": ruleset_data,
"total_count": len(ruleset_data)
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to get rulesets: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "get-iplists":
logger.debug("=" * 80)
logger.debug("GET IP LISTS CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug(f"Name filter: {arguments.get('name')}")
logger.debug(f"Description filter: {arguments.get('description')}")
logger.debug(f"IP ranges filter: {arguments.get('ip_ranges', [])}")
logger.debug("=" * 80)
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Prepare filter parameters
params = {}
if arguments.get('name'):
params['name'] = arguments['name']
if arguments.get('description'):
params['description'] = arguments['description']
params['max_results'] = 10000
ip_lists = pce.ip_lists.get(params=params)
# Convert IP lists to serializable format
iplist_data = []
for iplist in ip_lists:
iplist_dict = {
'href': iplist.href,
'name': iplist.name,
'description': iplist.description,
'ip_ranges': [str(ip_range) for ip_range in iplist.ip_ranges] if iplist.ip_ranges else [],
'fqdns': iplist.fqdns if hasattr(iplist, 'fqdns') else [],
'created_at': str(iplist.created_at) if hasattr(iplist, 'created_at') else None,
'updated_at': str(iplist.updated_at) if hasattr(iplist, 'updated_at') else None,
'deleted_at': str(iplist.deleted_at) if hasattr(iplist, 'deleted_at') else None,
'created_by': str(iplist.created_by) if hasattr(iplist, 'created_by') else None,
'updated_by': str(iplist.updated_by) if hasattr(iplist, 'updated_by') else None,
'deleted_by': str(iplist.deleted_by) if hasattr(iplist, 'deleted_by') else None
}
# Apply IP ranges filter if provided
if arguments.get('ip_ranges'):
if any(ip_range in iplist_dict['ip_ranges'] for ip_range in arguments['ip_ranges']):
iplist_data.append(iplist_dict)
else:
iplist_data.append(iplist_dict)
logger.debug(f"Found {len(iplist_data)} IP lists")
return [types.TextContent(
type="text",
text=json.dumps({
"ip_lists": iplist_data,
"total_count": len(iplist_data)
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to get IP lists: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "get-events":
logger.debug("=" * 80)
logger.debug("GET EVENTS CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Prepare filter parameters
params = {}
if arguments.get('event_type'):
params['event_type'] = arguments['event_type']
if arguments.get('severity'):
params['severity'] = arguments['severity']
if arguments.get('status'):
params['status'] = arguments['status']
if arguments.get('max_results'):
params['max_results'] = arguments['max_results']
events = pce.events.get(params=params)
# Convert events to serializable format
event_data = []
for event in events:
event_dict = {
'href': event.href,
'event_type': event.event_type,
'timestamp': str(event.timestamp) if hasattr(event, 'timestamp') else None,
'severity': event.severity if hasattr(event, 'severity') else None,
'status': event.status if hasattr(event, 'status') else None,
'created_by': str(event.created_by) if hasattr(event, 'created_by') else None,
'notification_type': event.notification_type if hasattr(event, 'notification_type') else None,
'info': event.info if hasattr(event, 'info') else None,
'pce_fqdn': event.pce_fqdn if hasattr(event, 'pce_fqdn') else None
}
event_data.append(event_dict)
return [types.TextContent(
type="text",
text=json.dumps({
"events": event_data,
"total_count": len(event_data)
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to get events: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "create-ruleset":
logger.debug("=" * 80)
logger.debug("CREATE RULESET CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# populate the label maps
label_href_map = {}
value_href_map = {}
for l in pce.labels.get(params={'max_results': 10000}):
label_href_map[l.href] = {"key": l.key, "value": l.value}
value_href_map["{}={}".format(l.key, l.value)] = l.href
# Check if ruleset already exists
logger.debug(f"Checking if ruleset '{arguments['name']}' already exists...")
existing_rulesets = pce.rule_sets.get(params={"name": arguments["name"]})
if existing_rulesets:
error_msg = f"Ruleset with name '{arguments['name']}' already exists"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=json.dumps({
"error": error_msg,
"existing_ruleset": {
"href": existing_rulesets[0].href,
"name": existing_rulesets[0].name
}
}, indent=2)
)]
# Create the ruleset
logger.debug(f"Instantiating ruleset object: {arguments['name']}")
ruleset = RuleSet(
name=arguments["name"],
description=arguments.get("description", "")
)
# Handle scopes
label_sets = []
if arguments.get("scopes"):
logger.debug(f"Processing scopes: {json.dumps(arguments['scopes'], indent=2)}")
for scope in arguments["scopes"]:
label_set = LabelSet(labels=[])
for label in scope:
logger.debug(f"Processing label: {label}")
if isinstance(label, dict) and "href" in label:
# Handle direct href references
logger.debug(f"Found label with href: {label['href']}")
append_label = pce.labels.get_by_reference(label["href"])
logger.debug(f"Appending label: {append_label}")
label_set.labels.append(append_label)
elif isinstance(label, str):
# Handle string references (either href or label value)
if label in value_href_map:
logger.debug(f"Found label value: {value_href_map[label]}")
append_label = pce.labels.get_by_reference(value_href_map[label])
else:
logger.debug(f"Assuming direct href: {label}")
append_label = pce.labels.get_by_reference(label)
logger.debug(f"Appending label: {append_label}")
label_set.labels.append(append_label)
else:
logger.warning(f"Unexpected label format: {label}")
continue
label_sets.append(label_set)
logger.debug(f"Label set: {label_set}")
else:
# If no scopes provided, create a default scope with all workloads
logger.debug("No scopes provided, creating default scope with all workloads")
label_sets = [LabelSet(labels=[])]
logger.debug(f"Final ruleset scopes count: {len(label_sets)}")
ruleset.scopes = label_sets
# Create the ruleset in PCE
logger.debug("Creating ruleset in PCE...")
logger.debug(f"Ruleset object scopes: {[str(ls.labels) for ls in ruleset.scopes]}")
ruleset = pce.rule_sets.create(ruleset)
logger.debug(f"Ruleset created with href: {ruleset.href}")
# Create rules if provided
created_rules = []
if arguments.get("rules"):
logger.debug(f"Processing rules: {json.dumps(arguments['rules'], indent=2)}")
for rule_def in arguments["rules"]:
logger.debug(f"Processing rule: {json.dumps(rule_def, indent=2)}")
# Process providers
providers = []
for provider in rule_def["providers"]:
if provider == "ams":
providers.append(AMS)
elif provider.startswith("iplist:"):
# Extract IP list name and look it up
ip_list_name = provider.split(":", 1)[1]
logger.debug(f"Looking up IP list: {ip_list_name}")
ip_lists = pce.ip_lists.get(params={"name": ip_list_name})
if ip_lists:
providers.append(ip_lists[0])
else:
logger.error(f"IP list not found: {ip_list_name}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP list not found: {ip_list_name}"})
)]
elif provider in value_href_map:
providers.append(pce.labels.get_by_reference(value_href_map[provider]))
else:
providers.append(pce.labels.get_by_reference(provider))
# Process consumers
consumers = []
for consumer in rule_def["consumers"]:
if consumer == "ams":
consumers.append(AMS)
elif consumer.startswith("iplist:"):
# Extract IP list name and look it up
ip_list_name = consumer.split(":", 1)[1]
logger.debug(f"Looking up IP list: {ip_list_name}")
ip_lists = pce.ip_lists.get(params={"name": ip_list_name})
if ip_lists:
consumers.append(ip_lists[0])
else:
logger.error(f"IP list not found: {ip_list_name}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP list not found: {ip_list_name}"})
)]
elif consumer in value_href_map:
consumers.append(pce.labels.get_by_reference(value_href_map[consumer]))
else:
consumers.append(pce.labels.get_by_reference(consumer))
# Create ingress services
ingress_services = []
for svc in rule_def["ingress_services"]:
service_port = ServicePort(
port=svc["port"],
proto=svc["proto"]
)
ingress_services.append(service_port)
# Build and create the rule
rule = Rule.build(
providers=providers,
consumers=consumers,
ingress_services=ingress_services,
unscoped_consumers=rule_def.get("unscoped_consumers", False)
)
created_rule = pce.rules.create(rule, parent=ruleset)
created_rules.append({
"href": created_rule.href,
"providers": [str(p) for p in providers],
"consumers": [str(c) for c in consumers],
"services": [f"{s.port}/{s.proto}" for s in ingress_services],
"unscoped_consumers": rule_def.get("unscoped_consumers", False)
})
# Update the response to include rules
return [types.TextContent(
type="text",
text=json.dumps({
"ruleset": {
"href": ruleset.href,
"name": ruleset.name,
"description": ruleset.description,
"rules": created_rules
}
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to create ruleset: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "delete-ruleset":
logger.debug("=" * 80)
logger.debug("DELETE RULESET CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
# add implementation here for delete-ruleset
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# check if href or name is provided, for href, delete, for name, get and delete
if "href" in arguments:
pce.rule_sets.delete(arguments["href"])
elif "name" in arguments:
rulesets = pce.rule_sets.get(params={"name": arguments["name"]})
if rulesets:
pce.rule_sets.delete(rulesets[0].href)
return [types.TextContent(type="text", text=json.dumps({"success": "Ruleset deleted successfully"}))]
else:
return [types.TextContent(type="text", text=json.dumps({"error": "Ruleset not found"}))]
else:
return [types.TextContent(type="text", text=json.dumps({"error": "Either href or name must be provided"}))]
except Exception as e:
error_msg = f"Failed to delete ruleset: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(type="text", text=json.dumps({"error": error_msg}))]
elif name == "get-services":
logger.debug("=" * 80)
logger.debug("GET SERVICES CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Prepare filter parameters
params = {}
if arguments.get('name'):
params['name'] = arguments['name']
if arguments.get('description'):
params['description'] = arguments['description']
if arguments.get('port'):
params['port'] = arguments['port']
if arguments.get('proto'):
params['proto'] = arguments['proto']
if arguments.get('process_name'):
params['process_name'] = arguments['process_name']
logger.debug(f"Querying services with params: {json.dumps(params, indent=2)}")
services = pce.services.get(params=params)
logger.debug(f"Found {len(services)} services")
# Convert services to serializable format
service_data = []
for service in services:
logger.debug(f"Processing service: {service.name} ({service.href})")
service_dict = {
'href': service.href,
'name': service.name,
'description': service.description if hasattr(service, 'description') else None,
'process_name': service.process_name if hasattr(service, 'process_name') else None,
'service_ports': []
}
# Add service ports - check both possible attribute names
ports = []
if hasattr(service, 'service_ports'):
# logger.debug(f"Found service_ports attribute for {service.name}")
ports = service.service_ports or [] # Handle None case
elif hasattr(service, 'ports'):
# logger.debug(f"Found ports attribute for {service.name}")
ports = service.ports or [] # Handle None case
logger.debug(f"Processing {len(ports)} ports for service {service.name}")
for port in ports:
try:
port_dict = {
'port': port.port,
'proto': port.proto
}
# Only add to_port if it exists and is different from port
if hasattr(port, 'to_port') and port.to_port is not None:
port_dict['to_port'] = port.to_port
service_dict['service_ports'].append(port_dict)
logger.debug(f"Added port {port.port}/{port.proto} to service {service.name}")
except AttributeError as e:
logger.warning(f"Error processing port {port} for service {service.name}: {e}")
continue
# Add windows services if present
if hasattr(service, 'windows_services'):
logger.debug(f"Found windows_services for {service.name}")
service_dict['windows_services'] = service.windows_services
service_data.append(service_dict)
logger.debug(f"Completed processing service: {service.name}")
logger.debug(f"Service data: {json.dumps(service_data, indent=2)}")
logger.debug(f"Successfully processed {len(service_data)} services")
return [types.TextContent(
type="text",
text=json.dumps({
"services": service_data,
"total_count": len(service_data)
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to get services: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg})
)]
elif name == "update-label":
logger.debug("Initializing PCE connection")
try:
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
href = arguments.get("href")
key = arguments.get("key")
value = arguments.get("value")
new_value = arguments.get("new_value")
# First, find the label
label = None
if href:
logger.debug(f"Looking up label by href: {href}")
try:
label = pce.labels.get_by_reference(href)
logger.debug(f"Found label by href: {label}")
except Exception as e:
logger.error(f"Failed to find label by href {href}: {str(e)}")
return [types.TextContent(
type="text",
text=f"Error: Label with href {href} not found"
)]
else:
logger.debug(f"Looking up label by key={key}, value={value}")
labels = pce.labels.get(params={"key": key, "value": value})
if labels and len(labels) > 0:
label = labels[0] # Get the first matching label
logger.debug(f"Found label by key-value: {label}")
else:
logger.error(f"No label found with key={key}, value={value}")
return [types.TextContent(
type="text",
text=f"Error: No label found with key={key}, value={value}"
)]
if label:
logger.debug(f"Updating label {label.href} with new_value={new_value}")
# Prepare the update payload - only include the new value
update_data = {
"value": new_value
}
# Update the label
updated_label = pce.labels.update(label.href, update_data)
logger.debug(f"Label updated successfully: {updated_label}")
return [types.TextContent(
type="text",
text=f"Successfully updated label: {updated_label}"
)]
else:
error_msg = "Failed to find label to update"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
except Exception as e:
error_msg = f"Failed to update label: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=f"Error: {error_msg}"
)]
elif name == "create-iplist":
logger.debug("=" * 80)
logger.debug("CREATE IP LIST CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Check if IP List already exists
logger.debug(f"Checking if IP List '{arguments['name']}' already exists...")
existing_iplists = pce.ip_lists.get(params={"name": arguments["name"]})
if existing_iplists:
error_msg = f"IP List with name '{arguments['name']}' already exists"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=json.dumps({
"error": error_msg,
"existing_iplist": {
"href": existing_iplists[0].href,
"name": existing_iplists[0].name
}
}, indent=2)
)]
# Create IP ranges
ip_ranges = []
for range_def in arguments["ip_ranges"]:
ip_range = {
"from_ip": range_def["from_ip"],
"exclusion": range_def.get("exclusion", False)
}
# Add optional fields if present
if "to_ip" in range_def:
ip_range["to_ip"] = range_def["to_ip"]
if "description" in range_def:
ip_range["description"] = range_def["description"]
ip_ranges.append(ip_range)
# Create the IP List object
iplist_data = {
"name": arguments["name"],
"ip_ranges": ip_ranges
}
# Add optional fields if present
if "description" in arguments:
iplist_data["description"] = arguments["description"]
if "fqdn" in arguments:
iplist_data["fqdn"] = arguments["fqdn"]
logger.debug(f"Creating IP List with data: {json.dumps(iplist_data, indent=2)}")
iplist = pce.ip_lists.create(iplist_data)
# Format response
response_data = {
"href": iplist.href,
"name": iplist.name,
"description": getattr(iplist, "description", None),
"ip_ranges": [
{
"from_ip": r.from_ip,
"to_ip": getattr(r, "to_ip", None),
"description": getattr(r, "description", None),
"exclusion": getattr(r, "exclusion", False)
} for r in iplist.ip_ranges
],
"fqdn": getattr(iplist, "fqdn", None)
}
return [types.TextContent(
type="text",
text=json.dumps(response_data, indent=2)
)]
except Exception as e:
error_msg = f"Failed to create IP List: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
elif name == "update-iplist":
logger.debug("=" * 80)
logger.debug("UPDATE IP LIST CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Find the IP List
iplist = None
if "href" in arguments:
logger.debug(f"Looking up IP List by href: {arguments['href']}")
try:
iplist = pce.ip_lists.get_by_reference(arguments['href'])
except Exception as e:
logger.error(f"Failed to find IP List by href: {str(e)}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP List not found: {str(e)}"}, indent=2)
)]
else:
logger.debug(f"Looking up IP List by name: {arguments['name']}")
iplists = pce.ip_lists.get(params={"name": arguments["name"]})
if iplists:
iplist = iplists[0]
else:
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP List with name '{arguments['name']}' not found"}, indent=2)
)]
logger.debug(f"Found IP List: {iplist.href}, {iplist.name}")
# Prepare update data
update_data = {}
if "description" in arguments:
update_data["description"] = arguments["description"]
if "fqdn" in arguments:
update_data["fqdn"] = arguments["fqdn"]
if "ip_ranges" in arguments:
ip_ranges = []
for range_def in arguments["ip_ranges"]:
ip_range = {
"from_ip": range_def["from_ip"],
"exclusion": range_def.get("exclusion", False)
}
if "to_ip" in range_def:
ip_range["to_ip"] = range_def["to_ip"]
if "description" in range_def:
ip_range["description"] = range_def["description"]
ip_ranges.append(ip_range)
update_data["ip_ranges"] = ip_ranges
logger.debug(f"Updating IP List with data: {json.dumps(update_data, indent=2)}")
# Update the IP List
pce.ip_lists.update(iplist.href, update_data)
# Fetch the updated IP List to get the current state
updated_iplist = pce.ip_lists.get_by_reference(iplist.href)
# Format response
response_data = {
"href": updated_iplist.href,
"name": updated_iplist.name,
"description": getattr(updated_iplist, "description", None),
"ip_ranges": []
}
# Safely add IP ranges if they exist
if hasattr(updated_iplist, 'ip_ranges') and updated_iplist.ip_ranges:
for r in updated_iplist.ip_ranges:
range_data = {"from_ip": r.from_ip}
if hasattr(r, "to_ip"):
range_data["to_ip"] = r.to_ip
if hasattr(r, "description"):
range_data["description"] = r.description
if hasattr(r, "exclusion"):
range_data["exclusion"] = r.exclusion
response_data["ip_ranges"].append(range_data)
# Add FQDN if it exists
if hasattr(updated_iplist, "fqdn"):
response_data["fqdn"] = updated_iplist.fqdn
return [types.TextContent(
type="text",
text=json.dumps(response_data, indent=2)
)]
except Exception as e:
error_msg = f"Failed to update IP List: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
elif name == "delete-iplist":
logger.debug("=" * 80)
logger.debug("DELETE IP LIST CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Find the IP List
iplist = None
if "href" in arguments:
logger.debug(f"Looking up IP List by href: {arguments['href']}")
try:
iplist = pce.ip_lists.get_by_reference(arguments['href'])
except Exception as e:
logger.error(f"Failed to find IP List by href: {str(e)}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP List not found: {str(e)}"}, indent=2)
)]
else:
logger.debug(f"Looking up IP List by name: {arguments['name']}")
iplists = pce.ip_lists.get(params={"name": arguments["name"]})
if iplists:
iplist = iplists[0]
else:
return [types.TextContent(
type="text",
text=json.dumps({"error": f"IP List with name '{arguments['name']}' not found"}, indent=2)
)]
# Delete the IP List
logger.debug(f"Deleting IP List: {iplist.href}")
pce.ip_lists.delete(iplist.href)
return [types.TextContent(
type="text",
text=json.dumps({
"message": f"Successfully deleted IP List: {iplist.name}",
"href": iplist.href
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to delete IP List: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
elif name == "update-ruleset":
logger.debug("=" * 80)
logger.debug("UPDATE RULESET CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Find the ruleset
ruleset = None
if "href" in arguments:
logger.debug(f"Looking up ruleset by href: {arguments['href']}")
try:
ruleset = pce.rule_sets.get_by_reference(arguments['href'])
except Exception as e:
logger.error(f"Failed to find ruleset by href: {str(e)}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"Ruleset not found: {str(e)}"}, indent=2)
)]
else:
logger.debug(f"Looking up ruleset by name: {arguments['name']}")
rulesets = pce.rule_sets.get(params={"name": arguments["name"]})
if rulesets:
ruleset = rulesets[0]
else:
return [types.TextContent(
type="text",
text=json.dumps({"error": f"Ruleset with name '{arguments['name']}' not found"}, indent=2)
)]
# Prepare update data
update_data = {}
if "description" in arguments:
update_data["description"] = arguments["description"]
if "enabled" in arguments:
update_data["enabled"] = arguments["enabled"]
# Handle scopes if provided
if "scopes" in arguments:
logger.debug(f"Processing scopes: {json.dumps(arguments['scopes'], indent=2)}")
label_sets = []
for scope in arguments["scopes"]:
label_set = LabelSet(labels=[])
for label in scope:
logger.debug(f"Processing label: {label}")
if isinstance(label, dict) and "href" in label:
# Handle direct href references
logger.debug(f"Found label with href: {label['href']}")
append_label = pce.labels.get_by_reference(label["href"])
logger.debug(f"Appending label: {append_label}")
label_set.labels.append(append_label)
elif isinstance(label, str):
# Handle string references (either href or label value)
if "=" in label: # key=value format
key, value = label.split("=", 1)
labels = pce.labels.get(params={"key": key, "value": value})
if labels:
append_label = labels[0]
logger.debug(f"Appending label: {append_label}")
label_set.labels.append(append_label)
else: # direct href
append_label = pce.labels.get_by_reference(label)
logger.debug(f"Appending label: {append_label}")
label_set.labels.append(append_label)
label_sets.append(label_set)
logger.debug(f"Label set: {label_set}")
update_data["scopes"] = label_sets
# Update the ruleset
logger.debug(f"Updating ruleset with data: {update_data}")
updated_ruleset = pce.rule_sets.update(ruleset.href, update_data)
# Format response
response_data = {
"href": updated_ruleset.href,
"name": updated_ruleset.name,
"description": getattr(updated_ruleset, "description", None),
"enabled": getattr(updated_ruleset, "enabled", None),
"scopes": []
}
# Add scopes if they exist
if hasattr(updated_ruleset, "scopes"):
for scope in updated_ruleset.scopes:
scope_labels = []
for label in scope.labels:
scope_labels.append({
"href": label.href,
"key": label.key,
"value": label.value
})
response_data["scopes"].append(scope_labels)
return [types.TextContent(
type="text",
text=json.dumps(response_data, indent=2)
)]
except Exception as e:
error_msg = f"Failed to update ruleset: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
elif name == "delete-ruleset":
logger.debug("=" * 80)
logger.debug("DELETE RULESET CALLED")
logger.debug(f"Arguments received: {json.dumps(arguments, indent=2)}")
logger.debug("=" * 80)
try:
logger.debug("Initializing PCE connection...")
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
# Find the ruleset
ruleset = None
if "href" in arguments:
logger.debug(f"Looking up ruleset by href: {arguments['href']}")
try:
ruleset = pce.rule_sets.get_by_reference(arguments['href'])
except Exception as e:
logger.error(f"Failed to find ruleset by href: {str(e)}")
return [types.TextContent(
type="text",
text=json.dumps({"error": f"Ruleset not found: {str(e)}"}, indent=2)
)]
else:
logger.debug(f"Looking up ruleset by name: {arguments['name']}")
rulesets = pce.rule_sets.get(params={"name": arguments["name"]})
if rulesets:
ruleset = rulesets[0]
else:
return [types.TextContent(
type="text",
text=json.dumps({"error": f"Ruleset with name '{arguments['name']}' not found"}, indent=2)
)]
# Delete the ruleset
logger.debug(f"Deleting ruleset: {ruleset.href}")
pce.rule_sets.delete(ruleset.href)
return [types.TextContent(
type="text",
text=json.dumps({
"message": f"Successfully deleted ruleset: {ruleset.name}",
"href": ruleset.href
}, indent=2)
)]
except Exception as e:
error_msg = f"Failed to delete ruleset: {str(e)}"
logger.error(error_msg, exc_info=True)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
def to_dataframe(flows):
pce = PolicyComputeEngine(PCE_HOST, port=PCE_PORT, org_id=PCE_ORG_ID)
pce.set_credentials(API_KEY, API_SECRET)
label_href_map = {}
value_href_map = {}
for l in pce.labels.get(params={'max_results': 10000}):
label_href_map[l.href] = {"key": l.key, "value": l.value}
value_href_map["{}={}".format(l.key, l.value)] = l.href
if not flows:
logger.warning("Warning: Empty flows list received.")
return pd.DataFrame()
series_array = []
for flow in flows:
try:
f = {
'src_ip': flow.src.ip,
'src_hostname': flow.src.workload.name if flow.src.workload is not None else None,
'dst_ip': flow.dst.ip,
'dst_hostname': flow.dst.workload.name if flow.dst.workload is not None else None,
'proto': flow.service.proto,
'port': flow.service.port,
'process_name': flow.service.process_name,
'service_name': flow.service.service_name,
'policy_decision': flow.policy_decision,
'flow_direction': flow.flow_direction,
'num_connections': flow.num_connections,
'first_detected': flow.timestamp_range.first_detected,
'last_detected': flow.timestamp_range.last_detected,
}
# Add src and dst app and env labels
if flow.src.workload:
for l in flow.src.workload.labels:
if l.href in label_href_map:
key = label_href_map[l.href]['key']
value = label_href_map[l.href]['value']
f[f'src_{key}'] = value
if flow.dst.workload:
for l in flow.dst.workload.labels:
if l.href in label_href_map:
key = label_href_map[l.href]['key']
value = label_href_map[l.href]['value']
f[f'dst_{key}'] = value
series_array.append(f)
except AttributeError as e:
logger.debug(f"Error processing flow: {e}")
logger.debug(f"Flow object: {flow}")
df = pd.DataFrame(series_array)
return df
def summarize_traffic(df):
logger.debug(f"Summarizing traffic with dataframe: {df}")
# Define all possible group columns
potential_columns = ['src_app', 'src_env', 'dst_app', 'dst_env', 'proto', 'port']
# Filter to only use columns that exist in the DataFrame
group_columns = [col for col in potential_columns if col in df.columns]
if not group_columns:
logger.warning("No grouping columns found in DataFrame")
return "No traffic data available for summarization"
if df.empty:
logger.warning("Empty DataFrame received")
return "No traffic data available for summarization"
logger.debug(f"Using group columns: {group_columns}")
logger.debug(f"DataFrame shape before grouping: {df.shape}")
logger.debug(f"DataFrame columns: {df.columns.tolist()}")
logger.debug(f"First few rows of DataFrame:\n{df.head()}")
# Group by available columns
summary = df.groupby(group_columns)['num_connections'].sum().reset_index()
logger.debug(f"Summary shape after grouping: {summary.shape}")
logger.debug(f"Summary columns: {summary.columns.tolist()}")
logger.debug(f"First few rows of summary:\n{summary.head()}")
# Sort by number of connections in descending order
summary = summary.sort_values('num_connections', ascending=False)
# Convert to a more readable format
summary_list = []
for _, row in summary.iterrows():
# Build source and destination info based on available columns
src_info = []
if 'src_app' in row:
src_info.append(row['src_app'])
if 'src_env' in row:
src_info.append(f"({row['src_env']})")
src_str = " ".join(src_info) if src_info else "Unknown Source"
dst_info = []
if 'dst_app' in row:
dst_info.append(row['dst_app'])
if 'dst_env' in row:
dst_info.append(f"({row['dst_env']})")
dst_str = " ".join(dst_info) if dst_info else "Unknown Destination"
if src_str != dst_str:
port_info = f"port {row['port']}" if 'port' in row else "unknown port"
proto_info = f"proto {row['proto']}" if 'proto' in row else ""
summary_list.append(
f"From {src_str} to {dst_str} on {port_info} {proto_info}: {row['num_connections']} connections"
)
if not summary_list:
return "No traffic patterns to summarize"
return "\n".join(summary_list)
async def main():
# Run the server using stdin/stdout streams
logger.debug("Starting server")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="illumio-mcp",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
class ServicePortEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, ServicePort):
return {
'port': obj.port,
'protocol': obj.protocol
}
return super().default(obj)