# src/fctr_okta_mcp/tools/okta/policy_tools.py
"""
Policy and Network Zone Tools for Okta MCP Server.
Tools for managing Okta policies, policy rules, and network zones.
"""
from typing import Annotated
from fastmcp import FastMCP, Context
from fctr_okta_mcp.utils.logger import get_logger, log_audit_event
from fctr_okta_mcp.utils.response_compressor import compress_response
logger = get_logger(__name__)
def register_policy_tools(mcp: FastMCP):
"""Register all policy-related tools with the MCP server."""
logger.debug("Registering policy tools...")
@mcp.tool(
tags={"okta", "policy", "rules", "direct"},
annotations={
"category": "policy",
"readOnlyHint": True,
"idempotentHint": True
}
)
async def okta_policy_list_rules(
ctx: Context,
policyId: Annotated[str, "The ID of the policy to list rules for (required)"]
) -> dict:
"""
**CALL read_system_instructions() FIRST!**
List all rules for a specific Okta policy.
## Path Parameters
- **policyId** (String, Required): The ID of the policy.
## Response Fields
Returns array of policy rules, each containing:
- **id**: Rule identifier
- **name**: Rule name
- **status**: Rule status (ACTIVE, INACTIVE)
- **priority**: Rule priority (lower = higher priority)
- **system**: Whether this is a system-managed rule
- **conditions**: Rule trigger conditions
- **actions**: Actions taken when rule matches
## Policy Rule Information
- Rule names and descriptions
- Activation status (ACTIVE, INACTIVE)
- Priority ordering within policy
- Condition expressions and logic
- Actions taken when rule matches
## Common Rule Types
- Authentication policies (MFA requirements)
- Authorization policies (access controls)
- Password policies (complexity rules)
- Sign-on policies (SSO behaviors)
## Rule Conditions Include
- Network zones and IP ranges
- User and group memberships
- Application context
- Device and platform requirements
- Risk and context factors
## Actions and Behaviors
- MFA factor requirements
- Session management
- Access grants/denials
- Redirections and workflows
## Common Use Cases
- Policy rule audit and review
- Security compliance assessment
- Troubleshoot access issues
- Rule optimization and cleanup
- Access control verification
Returns SAMPLE DATA (3 results) + endpoint metadata for code generation.
"""
await ctx.info(f"Fetching rules for policy: {policyId}")
from fctr_okta_mcp.client.base_okta_api_client import OktaAPIClient
client = OktaAPIClient(ctx=ctx)
await ctx.info(f"Calling Okta API: GET /api/v1/policies/{policyId}/rules...")
result = await client.make_request(
method="GET",
endpoint=f"/api/v1/policies/{policyId}/rules",
params={"limit": 3},
max_results=3
)
if result.get("status") == "success":
data = result.get("data", [])
rule_names = [r.get("name", "unknown") for r in data]
await ctx.info(f"Retrieved {len(data)} rule(s): {', '.join(rule_names) if rule_names else 'none'}")
result["_reminder"] = "This is SAMPLE DATA (3 results). Generate Python code using execute_code() to get full results."
result["policy_id"] = policyId
log_audit_event(
action="okta_policy_list_rules",
status="success",
details={"policy_id": policyId, "result_count": len(data)}
)
else:
await ctx.warning(f"API returned status: {result.get('status')}")
log_audit_event(
action="okta_policy_list_rules",
status="error",
details={"policy_id": policyId, "api_status": result.get("status")}
)
return compress_response(result)
@mcp.tool(
tags={"okta", "policy", "rules", "direct"},
annotations={
"category": "policy",
"readOnlyHint": True,
"idempotentHint": True
}
)
async def okta_policy_get_rule(
ctx: Context,
policyId: Annotated[str, "The ID of the policy that contains the rule (required)"],
ruleId: Annotated[str, "The ID of the specific rule to retrieve (required)"]
) -> dict:
"""
**CALL read_system_instructions() FIRST!**
Get detailed information about a specific Okta policy rule.
## Path Parameters
- **policyId** (String, Required): The ID of the policy containing the rule.
- **ruleId** (String, Required): The ID of the specific rule to retrieve.
## Response Fields
Returns comprehensive rule configuration including:
- **id**: Rule identifier
- **name**: Rule name
- **status**: Rule status (ACTIVE, INACTIVE)
- **priority**: Priority ordering
- **system**: Whether system-managed
- **conditions**: Full condition configuration
- **actions**: Full action configuration
## Authentication Rule Information
- Required MFA factors and methods
- Factor sequencing and fallbacks
- Enrollment requirements
- Verification policies
## Network Zone Constraints
- Allowed/blocked IP ranges
- Geographic restrictions
- Proxy and VPN handling
- Dynamic zone evaluation
## Access Control Actions
- Grant/deny decisions
- Step-up authentication triggers
- Session duration and management
- Redirect behaviors
## Risk and Context Factors
- Device trust requirements
- Location-based rules
- Behavioral analysis integration
- Threat intelligence inputs
## Common Use Cases
- Detailed rule configuration review
- Security policy troubleshooting
- Compliance audit requirements
- Rule modification planning
- Access control verification
"""
await ctx.info(f"Fetching rule {ruleId} for policy {policyId}")
from fctr_okta_mcp.client.base_okta_api_client import OktaAPIClient
client = OktaAPIClient(ctx=ctx)
await ctx.info(f"Calling Okta API: GET /api/v1/policies/{policyId}/rules/{ruleId}...")
result = await client.make_request(
method="GET",
endpoint=f"/api/v1/policies/{policyId}/rules/{ruleId}",
params={}
)
if result.get("status") == "success":
data = result.get("data", {})
rule_name = data.get("name", "unknown")
rule_status = data.get("status", "unknown")
await ctx.info(f"Retrieved rule: {rule_name} (status: {rule_status})")
log_audit_event(
action="okta_policy_get_rule",
status="success",
details={"policy_id": policyId, "rule_id": ruleId, "name": rule_name}
)
else:
await ctx.warning(f"API returned status: {result.get('status')}")
log_audit_event(
action="okta_policy_get_rule",
status="error",
details={"policy_id": policyId, "rule_id": ruleId, "api_status": result.get("status")}
)
return compress_response(result)
@mcp.tool(
tags={"okta", "network", "zones", "direct"},
annotations={
"category": "policy",
"readOnlyHint": True,
"idempotentHint": True
}
)
async def okta_network_list_zones(
ctx: Context,
filter: Annotated[str, "Filter zones by type (IP, DYNAMIC) or status (ACTIVE, INACTIVE). Example: 'type eq \"IP\"'"] = ""
) -> dict:
"""
**CALL read_system_instructions() FIRST!**
List all network zones defined in the Okta organization.
## Query Parameters
- **filter** (String, Optional): Filter by type or status.
## Response Fields
Returns array of network zone objects:
- **id**: Zone identifier
- **name**: Zone name
- **type**: Zone type (IP, DYNAMIC)
- **status**: Zone status (ACTIVE, INACTIVE)
- **usage**: Zone usage context
- **gateways**: Gateway IP configurations
- **proxies**: Proxy IP configurations
## Network Zone Types
- **IP**: Static IP address ranges and CIDR blocks
- **DYNAMIC**: Dynamic zones based on location or other criteria
- **BLOCKLIST**: IP ranges to block or restrict
- **POLICY**: Policy-specific network constraints
## Zone Information Includes
- Zone identification (ID, name, type)
- IP address ranges and gateway lists
- Proxy and ASN configurations
- Geographic location data
- Usage and application assignments
## IP Zone Details
- Static IP ranges (CIDR notation)
- Gateway IP addresses
- Proxy IP configurations
- ASN (Autonomous System Number) lists
## Dynamic Zone Criteria
- Geographic locations and countries
- ISP and carrier information
- Risk assessment factors
- Behavioral analysis inputs
## Zone Status Information
- ACTIVE: Currently enforced zones
- INACTIVE: Disabled or suspended zones
- Usage statistics and policy assignments
## Common Use Cases
- Network security policy review
- IP allowlist and blocklist management
- Geographic access control audit
- Compliance and regulatory reporting
- Network zone optimization
Returns SAMPLE DATA (3 results) + endpoint metadata for code generation.
"""
filter_log = f"filter='{filter}'" if filter else "no filters"
await ctx.info(f"Fetching network zones ({filter_log})")
from fctr_okta_mcp.client.base_okta_api_client import OktaAPIClient
client = OktaAPIClient(ctx=ctx)
params = {"limit": 3}
if filter:
params["filter"] = filter
await ctx.info("Calling Okta API: GET /api/v1/zones...")
result = await client.make_request(
method="GET",
endpoint="/api/v1/zones",
params=params,
max_results=3
)
if result.get("status") == "success":
data = result.get("data", [])
zone_names = [z.get("name", "unknown") for z in data]
await ctx.info(f"Retrieved {len(data)} zone(s): {', '.join(zone_names) if zone_names else 'none'}")
result["_reminder"] = "This is SAMPLE DATA (3 results). Generate Python code using execute_code() to get full results."
log_audit_event(
action="okta_network_list_zones",
status="success",
details={"filter": filter_log, "result_count": len(data)}
)
else:
await ctx.warning(f"API returned status: {result.get('status')}")
log_audit_event(
action="okta_network_list_zones",
status="error",
details={"filter": filter_log, "api_status": result.get("status")}
)
return compress_response(result)
@mcp.tool(
tags={"okta", "network", "zones", "direct"},
annotations={
"category": "policy",
"readOnlyHint": True,
"idempotentHint": True
}
)
async def okta_network_get_zone(
ctx: Context,
zoneId: Annotated[str, "The ID of the network zone to retrieve (required)"]
) -> dict:
"""
**CALL read_system_instructions() FIRST!**
Get detailed information about a specific network zone.
## Path Parameters
- **zoneId** (String, Required): The ID of the network zone.
## Response Fields
- **id**: Zone identifier
- **name**: Zone name
- **type**: Zone type (IP, DYNAMIC)
- **status**: Zone status (ACTIVE, INACTIVE)
- **usage**: Zone usage context (POLICY, BLOCKLIST)
- **gateways**: List of gateway IP configurations
- **proxies**: List of proxy IP configurations
- **created**: When the zone was created
- **lastUpdated**: When the zone was last modified
## Gateway Configuration
- **type**: Gateway type (CIDR, RANGE)
- **value**: IP address, CIDR block, or range
## Common Use Cases
- Get zone details before policy changes
- Verify IP ranges in a zone
- Audit zone configuration
- Troubleshoot network-based access rules
"""
await ctx.info(f"Fetching network zone: {zoneId}")
from fctr_okta_mcp.client.base_okta_api_client import OktaAPIClient
client = OktaAPIClient(ctx=ctx)
await ctx.info(f"Calling Okta API: GET /api/v1/zones/{zoneId}...")
result = await client.make_request(
method="GET",
endpoint=f"/api/v1/zones/{zoneId}",
params={}
)
if result.get("status") == "success":
data = result.get("data", {})
zone_name = data.get("name", "unknown")
zone_type = data.get("type", "unknown")
await ctx.info(f"Retrieved zone: {zone_name} (type: {zone_type})")
log_audit_event(
action="okta_network_get_zone",
status="success",
details={"zone_id": zoneId, "name": zone_name, "type": zone_type}
)
else:
await ctx.warning(f"API returned status: {result.get('status')}")
log_audit_event(
action="okta_network_get_zone",
status="error",
details={"zone_id": zoneId, "api_status": result.get("status")}
)
return compress_response(result)
logger.info("Policy tools registered successfully")