# Cisco_ISE_MCP.py (21.8 kB)
import os
from typing import List, Dict, Any, Optional
import asyncio
import json
from dotenv import load_dotenv
import httpx
from pydantic import BaseModel
from fastmcp import FastMCP
# Load environment variables before any setting is read
load_dotenv()

# Connection settings for the Cisco ISE ERS API, taken from the environment
ISE_API_BASE_URL = os.environ.get("ISE_API_BASE_URL", "https://your-ise-server/admin/ERS/v1")
ISE_USERNAME = os.environ.get("ISE_USERNAME")
ISE_PASSWORD = os.environ.get("ISE_PASSWORD")

# Fail at import time when either credential is missing or empty
if not (ISE_USERNAME and ISE_PASSWORD):
    raise ValueError("ISE_USERNAME and ISE_PASSWORD environment variables are required")
# Helper function to get authentication headers
async def get_auth_headers() -> Dict[str, Any]:
    """
    Build the HTTP headers sent with every Cisco ISE ERS request.

    The module-level ISE_USERNAME and ISE_PASSWORD values are encoded as an
    HTTP Basic "Authorization" header. The function performs no I/O.

    Returns:
        Dict with the "Authorization", "Content-Type" and "Accept" headers.
    """
    # Imported locally so the token is built with the standard library
    # instead of reaching into httpx private modules.
    import base64

    # A Basic auth token is base64("username:password").
    credentials = f"{ISE_USERNAME}:{ISE_PASSWORD}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {
        "Authorization": f"Basic {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
# Pydantic models for common ISE resources
class Pagination(BaseModel):
    """Paging counters for a list response; every field is optional and defaults to None."""

    # Field meanings below are inferred from the names only -- TODO confirm.
    total: Optional[int] = None
    currentPage: Optional[int] = None
    pageSize: Optional[int] = None
    totalPages: Optional[int] = None
class Link(BaseModel):
    """Hyperlink descriptor attached to resource models through their ``link`` field."""

    # All three parts are optional strings that default to None.
    href: Optional[str] = None
    rel: Optional[str] = None
    type: Optional[str] = None
class SearchResult(BaseModel):
    """Loosely typed container for a list/search reply; every field is optional."""

    # NOTE: this field intentionally shares its name with the class.
    SearchResult: Optional[Dict[str, Any]] = None
    resources: Optional[List[Dict[str, Any]]] = None
    # Paging references are kept as raw dicts; their schema is not defined here.
    nextPage: Optional[Dict[str, Any]] = None
    previousPage: Optional[Dict[str, Any]] = None
# Models for Person (Identity) resources
class Person(BaseModel):
    """
    Person (identity) record as parsed from an API reply.

    Every field is optional and defaults to None, so a reply that lacks a
    field still validates.
    """

    id: Optional[str] = None
    # String-to-string mapping; the expected keys are not defined in this file.
    name: Optional[Dict[str, str]] = None
    title: Optional[str] = None
    firstName: Optional[str] = None
    lastName: Optional[str] = None
    emailAddress: Optional[str] = None
    telephoneNumber: Optional[str] = None
    identityGroups: Optional[List[str]] = None
    customAttributes: Optional[Dict[str, Any]] = None
    link: Optional[Link] = None
class PersonRequest(BaseModel):
    """
    Payload model for creating or updating a person.

    Carries the same fields as Person except ``id`` and ``link``; all are
    optional and default to None.
    """

    # String-to-string mapping; the expected keys are not defined in this file.
    name: Optional[Dict[str, str]] = None
    title: Optional[str] = None
    firstName: Optional[str] = None
    lastName: Optional[str] = None
    emailAddress: Optional[str] = None
    telephoneNumber: Optional[str] = None
    identityGroups: Optional[List[str]] = None
    customAttributes: Optional[Dict[str, Any]] = None
# Models for Endpoint resources
class Endpoint(BaseModel):
    """
    Endpoint record as parsed from an API reply.

    Every field is optional and defaults to None.
    """

    id: Optional[str] = None
    macAddress: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None
    # String-to-string mapping; the expected keys are not defined in this file.
    type: Optional[Dict[str, str]] = None
    profileId: Optional[str] = None
    profileName: Optional[str] = None
    staticGroupAssignment: Optional[str] = None
    dynamicGroupAssignment: Optional[List[str]] = None
    customAttributes: Optional[Dict[str, Any]] = None
    link: Optional[Link] = None
class EndpointRequest(BaseModel):
    """
    Payload model for creating or updating an endpoint.

    A subset of the Endpoint fields (no ``id``, ``profileName``,
    ``dynamicGroupAssignment`` or ``link``); all optional, default None.
    """

    macAddress: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None
    # String-to-string mapping; the expected keys are not defined in this file.
    type: Optional[Dict[str, str]] = None
    profileId: Optional[str] = None
    staticGroupAssignment: Optional[str] = None
    customAttributes: Optional[Dict[str, Any]] = None
# Models for Network Device resources
class NetworkDevice(BaseModel):
    """
    Network device record as parsed from an API reply.

    Every field is optional and defaults to None.
    """

    id: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None
    type: Optional[str] = None
    softwareVersion: Optional[str] = None
    networkHierarchy: Optional[List[str]] = None
    location: Optional[str] = None
    deviceGroup: Optional[List[str]] = None
    # Free-form dicts; their schema is not defined in this file.
    authenticationSettings: Optional[Dict[str, Any]] = None
    modelInfo: Optional[Dict[str, Any]] = None
    link: Optional[Link] = None
class NetworkDeviceRequest(BaseModel):
    """
    Payload model for creating or updating a network device.

    Carries the same fields as NetworkDevice except ``id`` and ``link``; all
    are optional and default to None.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    type: Optional[str] = None
    softwareVersion: Optional[str] = None
    networkHierarchy: Optional[List[str]] = None
    location: Optional[str] = None
    deviceGroup: Optional[List[str]] = None
    # Free-form dicts; their schema is not defined in this file.
    authenticationSettings: Optional[Dict[str, Any]] = None
    modelInfo: Optional[Dict[str, Any]] = None
# Models for Authorization Profile resources
class AuthorizationProfile(BaseModel):
    """
    Authorization profile record as parsed from an API reply.

    Every field is optional and defaults to None.
    """

    id: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None
    # String-to-string mapping; the expected keys are not defined in this file.
    type: Optional[Dict[str, str]] = None
    link: Optional[Link] = None
class AuthorizationProfileRequest(BaseModel):
    """
    Payload model for creating or updating an authorization profile.

    Carries the AuthorizationProfile fields except ``id`` and ``link``; all
    are optional and default to None.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    # String-to-string mapping; the expected keys are not defined in this file.
    type: Optional[Dict[str, str]] = None
# Initialize the FastMCP server instance; the @mcp.tool() and @mcp.resource()
# decorators below register their functions on it.
mcp = FastMCP("cisco_ise_mcp")
# Helper for ISE API calls with error handling
async def make_ise_request(method: str, endpoint: str, params: Optional[Dict] = None, data: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Make an authenticated request to the Cisco ISE ERS API.

    HTTP and network failures are not raised to the caller; they are reported
    as a dict holding a single "error" key, which the tool functions check for.

    Args:
        method: HTTP method (GET, POST, PUT, DELETE).
        endpoint: API endpoint path (e.g., 'ers/config/endpoint'); a leading
            slash is stripped before it is appended to ISE_API_BASE_URL.
        params: Query parameters (optional).
        data: JSON payload for POST/PUT requests (optional).
    Returns:
        Dict containing the decoded JSON response, an empty dict when a
        successful response has no body, or {"error": "..."} on failure.
    """
    # Local import: diagnostics must go to stderr because stdout carries the
    # MCP protocol stream when the server runs over the stdio transport.
    import sys

    url = f"{ISE_API_BASE_URL}/{endpoint.lstrip('/')}"
    headers = await get_auth_headers()
    # SECURITY: certificate verification stays disabled by default to keep the
    # previous behaviour; set ISE_VERIFY_SSL=true to turn verification on.
    verify_tls = os.getenv("ISE_VERIFY_SSL", "false").strip().lower() in ("1", "true", "yes", "on")
    try:
        async with httpx.AsyncClient(verify=verify_tls, timeout=30.0) as client:
            response = await client.request(method, url, headers=headers, params=params, json=data)
            response.raise_for_status()
            # A successful response may carry no body (e.g. 204 No Content on
            # DELETE); decoding it as JSON would fail and be misreported as an
            # error, so return an empty result instead.
            if not response.content.strip():
                return {}
            return response.json()
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 401:
            return {"error": "Authentication failed. Check your ISE credentials."}
        elif status == 403:
            return {"error": "Forbidden. Check your ISE permissions."}
        elif status == 404:
            return {"error": "Resource not found."}
        else:
            return {"error": f"ISE API error: {status} - {e.response.text}"}
    except httpx.RequestError as e:
        print(f"DEBUG: httpx.RequestError encountered: {e}", file=sys.stderr)
        return {"error": f"Network error: {str(e)}"}
    except Exception as e:
        # Last-resort boundary: any other failure is reported to the caller
        # as an error dict rather than propagated.
        print(f"DEBUG: Unexpected error encountered: {e}", file=sys.stderr)
        return {"error": f"Unexpected error: {str(e)}"}
# Person (Identity) Functions
@mcp.tool()
async def get_persons(filter_param: Optional[str] = None, size: Optional[int] = None, page: Optional[int] = 1) -> str:
    """
    Retrieve a list of persons (identities) from Cisco ISE.

    Sends a GET to "ers/config/person" and reads the entries found under
    SearchResult.resources in the reply.

    Args:
        filter_param: Filter query (e.g., "name.firstName.CONTAINS.John") (optional).
        size: Number of results per page (optional; left out of the request when falsy).
        page: Page number (optional, default: 1; left out of the request when None).
    Returns:
        A JSON-formatted string: the list of persons, a "message" object when
        no person was returned, or an "error" object when the request failed.
    """
    params: Dict[str, Any] = {}
    # Send "page" only when a value is given; None would otherwise be
    # serialized as an empty "page=" query parameter.
    if page is not None:
        params["page"] = page
    if filter_param:
        params["filter"] = filter_param
    if size:
        params["size"] = size
    data = await make_ise_request("GET", "ers/config/person", params=params)
    if "error" in data:
        return json.dumps({"error": data["error"]}, indent=2)
    # Tolerate a missing or null "SearchResult" / "resources" entry.
    resources = (data.get("SearchResult") or {}).get("resources") or []
    persons = [Person(**person).dict() for person in resources]
    if not persons:
        return json.dumps({"message": "No persons found."}, indent=2)
    return json.dumps(persons, indent=2)
@mcp.tool()
async def get_person(person_id: str) -> str:
    """
    Fetch a single person by ID.

    Args:
        person_id: The ID of the person to retrieve.
    Returns:
        A JSON-formatted string with the person's fields, or an "error"
        object when the request failed.
    """
    response = await make_ise_request("GET", f"ers/config/person/{person_id}")
    if "error" not in response:
        # Successful reply: parse it through the Person model.
        return json.dumps(Person(**response).dict(), indent=2)
    return json.dumps({"error": response["error"]}, indent=2)
@mcp.tool()
async def create_person(person_data: Dict[str, Any]) -> str:
    """
    Create a new person (identity) via POST to "ers/config/person".

    Args:
        person_data: Dictionary with person details (e.g., {'firstName': 'John', 'lastName': 'Doe', 'emailAddress': 'john@example.com'}).
    Returns:
        A JSON-formatted string with a "success" status and the created
        person, or an "error" object when the request failed.
    """
    validated = PersonRequest(**person_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("POST", "ers/config/person", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    created = Person(**response).dict()
    result = {"status": "success", "created_person": created}
    return json.dumps(result, indent=2)
@mcp.tool()
async def update_person(person_id: str, update_data: Dict[str, Any]) -> str:
    """
    Update an existing person via PUT to "ers/config/person/<id>".

    Args:
        person_id: The ID of the person to update.
        update_data: Dictionary with fields to update (e.g., {'firstName': 'Jane'}).
    Returns:
        A JSON-formatted string with a "success" status and the updated
        person, or an "error" object when the request failed.
    """
    validated = PersonRequest(**update_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("PUT", f"ers/config/person/{person_id}", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    updated = Person(**response).dict()
    result = {"status": "success", "updated_person": updated}
    return json.dumps(result, indent=2)
@mcp.tool()
async def delete_person(person_id: str) -> str:
    """
    Delete a person via DELETE to "ers/config/person/<id>".

    Args:
        person_id: The ID of the person to delete.
    Returns:
        A JSON-formatted string confirming the deletion, or an "error"
        object when the request failed.
    """
    outcome = await make_ise_request("DELETE", f"ers/config/person/{person_id}")
    if "error" in outcome:
        body = {"error": outcome["error"]}
    else:
        body = {"status": "success", "message": f"Person {person_id} deleted."}
    return json.dumps(body, indent=2)
# Endpoint Functions
@mcp.tool()
async def get_endpoints(filter_param: Optional[str] = None, size: Optional[int] = None, page: Optional[int] = 1) -> str:
    """
    Retrieve a list of endpoints from Cisco ISE.

    Sends a GET to "ers/config/endpoint" and reads the entries found under
    SearchResult.resources in the reply.

    Args:
        filter_param: Filter query (e.g., "macAddress.EQ.00:11:22:33:44:55") (optional).
        size: Number of results per page (optional; left out of the request when falsy).
        page: Page number (optional, default: 1; left out of the request when None).
    Returns:
        A JSON-formatted string: the list of endpoints, a "message" object when
        no endpoint was returned, or an "error" object when the request failed.
    """
    params: Dict[str, Any] = {}
    # Send "page" only when a value is given; None would otherwise be
    # serialized as an empty "page=" query parameter.
    if page is not None:
        params["page"] = page
    if filter_param:
        params["filter"] = filter_param
    if size:
        params["size"] = size
    data = await make_ise_request("GET", "ers/config/endpoint", params=params)
    if "error" in data:
        return json.dumps({"error": data["error"]}, indent=2)
    # Tolerate a missing or null "SearchResult" / "resources" entry.
    resources = (data.get("SearchResult") or {}).get("resources") or []
    endpoints = [Endpoint(**endpoint).dict() for endpoint in resources]
    if not endpoints:
        return json.dumps({"message": "No endpoints found."}, indent=2)
    return json.dumps(endpoints, indent=2)
@mcp.tool()
async def get_endpoint(endpoint_id: str) -> str:
    """
    Fetch a single endpoint by ID.

    Args:
        endpoint_id: The ID of the endpoint to retrieve.
    Returns:
        A JSON-formatted string with the endpoint's fields, or an "error"
        object when the request failed.
    """
    response = await make_ise_request("GET", f"ers/config/endpoint/{endpoint_id}")
    if "error" not in response:
        # Successful reply: parse it through the Endpoint model.
        return json.dumps(Endpoint(**response).dict(), indent=2)
    return json.dumps({"error": response["error"]}, indent=2)
@mcp.tool()
async def create_endpoint(endpoint_data: Dict[str, Any]) -> str:
    """
    Create a new endpoint via POST to "ers/config/endpoint".

    Args:
        endpoint_data: Dictionary with endpoint details (e.g., {'macAddress': '00:11:22:33:44:55', 'name': 'Test Device'}).
    Returns:
        A JSON-formatted string with a "success" status and the created
        endpoint, or an "error" object when the request failed.
    """
    validated = EndpointRequest(**endpoint_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("POST", "ers/config/endpoint", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    created = Endpoint(**response).dict()
    result = {"status": "success", "created_endpoint": created}
    return json.dumps(result, indent=2)
@mcp.tool()
async def update_endpoint(endpoint_id: str, update_data: Dict[str, Any]) -> str:
    """
    Update an existing endpoint via PUT to "ers/config/endpoint/<id>".

    Args:
        endpoint_id: The ID of the endpoint to update.
        update_data: Dictionary with fields to update (e.g., {'name': 'Updated Device'}).
    Returns:
        A JSON-formatted string with a "success" status and the updated
        endpoint, or an "error" object when the request failed.
    """
    validated = EndpointRequest(**update_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("PUT", f"ers/config/endpoint/{endpoint_id}", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    updated = Endpoint(**response).dict()
    result = {"status": "success", "updated_endpoint": updated}
    return json.dumps(result, indent=2)
@mcp.tool()
async def delete_endpoint(endpoint_id: str) -> str:
    """
    Delete an endpoint via DELETE to "ers/config/endpoint/<id>".

    Args:
        endpoint_id: The ID of the endpoint to delete.
    Returns:
        A JSON-formatted string confirming the deletion, or an "error"
        object when the request failed.
    """
    outcome = await make_ise_request("DELETE", f"ers/config/endpoint/{endpoint_id}")
    if "error" in outcome:
        body = {"error": outcome["error"]}
    else:
        body = {"status": "success", "message": f"Endpoint {endpoint_id} deleted."}
    return json.dumps(body, indent=2)
# Network Device Functions
@mcp.tool()
async def get_network_devices(filter_param: Optional[str] = None, size: Optional[int] = None, page: Optional[int] = 1) -> str:
    """
    Retrieve a list of network devices from Cisco ISE.

    Sends a GET to "ers/config/networkdevice" and reads the entries found
    under SearchResult.resources in the reply.

    Args:
        filter_param: Filter query (e.g., "name.CONTAINS.Switch") (optional).
        size: Number of results per page (optional; left out of the request when falsy).
        page: Page number (optional, default: 1; left out of the request when None).
    Returns:
        A JSON-formatted string: the list of network devices, a "message"
        object when no device was returned, or an "error" object when the
        request failed.
    """
    params: Dict[str, Any] = {}
    # Send "page" only when a value is given; None would otherwise be
    # serialized as an empty "page=" query parameter.
    if page is not None:
        params["page"] = page
    if filter_param:
        params["filter"] = filter_param
    if size:
        params["size"] = size
    data = await make_ise_request("GET", "ers/config/networkdevice", params=params)
    if "error" in data:
        return json.dumps({"error": data["error"]}, indent=2)
    # Tolerate a missing or null "SearchResult" / "resources" entry.
    resources = (data.get("SearchResult") or {}).get("resources") or []
    devices = [NetworkDevice(**device).dict() for device in resources]
    if not devices:
        return json.dumps({"message": "No network devices found."}, indent=2)
    return json.dumps(devices, indent=2)
@mcp.tool()
async def get_network_device(device_id: str) -> str:
    """
    Fetch a single network device by ID.

    Args:
        device_id: The ID of the network device to retrieve.
    Returns:
        A JSON-formatted string with the device's fields, or an "error"
        object when the request failed.
    """
    response = await make_ise_request("GET", f"ers/config/networkdevice/{device_id}")
    if "error" not in response:
        # Successful reply: parse it through the NetworkDevice model.
        return json.dumps(NetworkDevice(**response).dict(), indent=2)
    return json.dumps({"error": response["error"]}, indent=2)
@mcp.tool()
async def create_network_device(device_data: Dict[str, Any]) -> str:
    """
    Create a new network device via POST to "ers/config/networkdevice".

    Args:
        device_data: Dictionary with network device details (e.g., {'name': 'Test Switch', 'type': 'Cisco_Switch'}).
    Returns:
        A JSON-formatted string with a "success" status and the created
        device, or an "error" object when the request failed.
    """
    validated = NetworkDeviceRequest(**device_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("POST", "ers/config/networkdevice", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    created = NetworkDevice(**response).dict()
    result = {"status": "success", "created_device": created}
    return json.dumps(result, indent=2)
@mcp.tool()
async def update_network_device(device_id: str, update_data: Dict[str, Any]) -> str:
    """
    Update an existing network device via PUT to "ers/config/networkdevice/<id>".

    Args:
        device_id: The ID of the network device to update.
        update_data: Dictionary with fields to update (e.g., {'name': 'Updated Switch'}).
    Returns:
        A JSON-formatted string with a "success" status and the updated
        device, or an "error" object when the request failed.
    """
    validated = NetworkDeviceRequest(**update_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("PUT", f"ers/config/networkdevice/{device_id}", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    updated = NetworkDevice(**response).dict()
    result = {"status": "success", "updated_device": updated}
    return json.dumps(result, indent=2)
@mcp.tool()
async def delete_network_device(device_id: str) -> str:
    """
    Delete a network device via DELETE to "ers/config/networkdevice/<id>".

    Args:
        device_id: The ID of the network device to delete.
    Returns:
        A JSON-formatted string confirming the deletion, or an "error"
        object when the request failed.
    """
    outcome = await make_ise_request("DELETE", f"ers/config/networkdevice/{device_id}")
    if "error" in outcome:
        body = {"error": outcome["error"]}
    else:
        body = {"status": "success", "message": f"Network device {device_id} deleted."}
    return json.dumps(body, indent=2)
# Authorization Profile Functions
@mcp.tool()
async def get_authorization_profiles(filter_param: Optional[str] = None, size: Optional[int] = None, page: Optional[int] = 1) -> str:
    """
    Retrieve a list of authorization profiles from Cisco ISE.

    Sends a GET to "ers/config/authorizationprofile" and reads the entries
    found under SearchResult.resources in the reply.

    Args:
        filter_param: Filter query (e.g., "name.CONTAINS.VPN") (optional).
        size: Number of results per page (optional; left out of the request when falsy).
        page: Page number (optional, default: 1; left out of the request when None).
    Returns:
        A JSON-formatted string: the list of authorization profiles, a
        "message" object when no profile was returned, or an "error" object
        when the request failed.
    """
    params: Dict[str, Any] = {}
    # Send "page" only when a value is given; None would otherwise be
    # serialized as an empty "page=" query parameter.
    if page is not None:
        params["page"] = page
    if filter_param:
        params["filter"] = filter_param
    if size:
        params["size"] = size
    data = await make_ise_request("GET", "ers/config/authorizationprofile", params=params)
    if "error" in data:
        return json.dumps({"error": data["error"]}, indent=2)
    # Tolerate a missing or null "SearchResult" / "resources" entry.
    resources = (data.get("SearchResult") or {}).get("resources") or []
    profiles = [AuthorizationProfile(**profile).dict() for profile in resources]
    if not profiles:
        return json.dumps({"message": "No authorization profiles found."}, indent=2)
    return json.dumps(profiles, indent=2)
@mcp.tool()
async def get_authorization_profile(profile_id: str) -> str:
    """
    Fetch a single authorization profile by ID.

    Args:
        profile_id: The ID of the authorization profile to retrieve.
    Returns:
        A JSON-formatted string with the profile's fields, or an "error"
        object when the request failed.
    """
    response = await make_ise_request("GET", f"ers/config/authorizationprofile/{profile_id}")
    if "error" not in response:
        # Successful reply: parse it through the AuthorizationProfile model.
        return json.dumps(AuthorizationProfile(**response).dict(), indent=2)
    return json.dumps({"error": response["error"]}, indent=2)
@mcp.tool()
async def create_authorization_profile(profile_data: Dict[str, Any]) -> str:
    """
    Create a new authorization profile via POST to "ers/config/authorizationprofile".

    Args:
        profile_data: Dictionary with authorization profile details (e.g., {'name': 'Test Profile', 'description': 'Test'}).
    Returns:
        A JSON-formatted string with a "success" status and the created
        profile, or an "error" object when the request failed.
    """
    validated = AuthorizationProfileRequest(**profile_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("POST", "ers/config/authorizationprofile", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    created = AuthorizationProfile(**response).dict()
    result = {"status": "success", "created_profile": created}
    return json.dumps(result, indent=2)
@mcp.tool()
async def update_authorization_profile(profile_id: str, update_data: Dict[str, Any]) -> str:
    """
    Update an existing authorization profile via PUT to "ers/config/authorizationprofile/<id>".

    Args:
        profile_id: The ID of the authorization profile to update.
        update_data: Dictionary with fields to update (e.g., {'name': 'Updated Profile'}).
    Returns:
        A JSON-formatted string with a "success" status and the updated
        profile, or an "error" object when the request failed.
    """
    validated = AuthorizationProfileRequest(**update_data)
    # Fields left at None are not sent.
    payload = {}
    for field, value in validated.dict().items():
        if value is not None:
            payload[field] = value
    response = await make_ise_request("PUT", f"ers/config/authorizationprofile/{profile_id}", data=payload)
    if "error" in response:
        return json.dumps({"error": response["error"]}, indent=2)
    updated = AuthorizationProfile(**response).dict()
    result = {"status": "success", "updated_profile": updated}
    return json.dumps(result, indent=2)
@mcp.tool()
async def delete_authorization_profile(profile_id: str) -> str:
    """
    Delete an authorization profile via DELETE to "ers/config/authorizationprofile/<id>".

    Args:
        profile_id: The ID of the authorization profile to delete.
    Returns:
        A JSON-formatted string confirming the deletion, or an "error"
        object when the request failed.
    """
    outcome = await make_ise_request("DELETE", f"ers/config/authorizationprofile/{profile_id}")
    if "error" in outcome:
        body = {"error": outcome["error"]}
    else:
        body = {"status": "success", "message": f"Authorization profile {profile_id} deleted."}
    return json.dumps(body, indent=2)
# The URI template must not contain whitespace: "greeting: //{name}" cannot
# match a request for "greeting://<name>".
@mcp.resource("greeting://{name}")
def greeting(name: str) -> str:
    """
    Greet a user by name.

    Registered as an MCP resource template; the {name} segment of the
    requested URI is passed in as the ``name`` argument.

    Args:
        name: The name to include in the greeting.
    Returns:
        A greeting message.
    """
    return f"Hello {name}! Welcome to Cisco ISE API tools."
# Start the MCP server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run(transport="stdio") # Use stdio for integration