# src/fctr_okta_mcp/tools/meta_tools.py
"""
Meta-Tools for ReAct Workflow
These tools guide the LLM through discovering and using Okta APIs.
Tools are registered with different tags for mode filtering:
- Agent Mode: tools tagged with "agent"
- Direct Mode: tools tagged with "direct"
"""
import os
import time
import asyncio
import json
from datetime import datetime, timezone, timedelta
from typing import Annotated
from fastmcp import FastMCP, Context
from fastmcp.exceptions import ToolError
from fctr_okta_mcp.prompts import get_code_generation_prompt as load_code_gen_prompt
from fctr_okta_mcp.utils.logger import get_logger, log_audit_event
from fctr_okta_mcp.security.subprocess_executor import execute_code_subprocess
from fctr_okta_mcp.utils.response_compressor import compress_response
# Server-side logger, named after this module. It is used for diagnostics and
# confirmation/execution logging by the tools registered in this file.
logger = get_logger(__name__)
def register_meta_tools(mcp: FastMCP):
    """
    Register all meta-tools for both Agent and Direct modes.
    Agent Mode gets:
    - get_available_operations (discovery)
    - get_operation_details (schema)
    - get_code_generation_prompt (template)
    - execute_code (execution)
    Direct Mode gets:
    - get_code_generation_prompt (template)
    - execute_code (execution)
    - read_system_instructions (workflow guide)

    Each tool is attached to ``mcp`` through ``@mcp.tool`` with a tag set used
    for mode filtering ("agent" / "direct").

    NOTE: FastMCP publishes each tool function's docstring (and its
    ``Annotated`` parameter descriptions) to the client as the tool
    description. Those strings are part of the runtime contract, so edit
    them deliberately.

    Args:
        mcp: The FastMCP server instance the tools are registered on.
    """
    logger.debug("Registering meta-tools...")

    # ===============================================
    # DIRECT MODE ONLY TOOL (tags={"direct"})
    # ===============================================
    @mcp.tool(
        tags={"direct"},
        annotations={
            "category": "workflow",
            "readOnlyHint": True,
            "idempotentHint": True,
        },
    )
    async def read_system_instructions(ctx: Context) -> dict:
        """
        **CALL THIS FIRST BEFORE USING ANY okta_* TOOLS!**
        Returns critical system instructions for how to use the Okta API tools correctly.
        The okta_* tools return sample data, but you MUST generate Python code for actual queries.
        """
        await ctx.info("Loading Okta MCP workflow instructions...")
        # The instruction text is returned verbatim to the client. Its lines are
        # kept flush-left so no source indentation leaks into the payload.
        return {
            "status": "success",
            "instructions": """
CRITICAL INSTRUCTIONS FOR OKTA TOOLS:
The okta_* tools (okta_user_list, etc.) return SAMPLE DATA (3 results) + endpoint metadata.
Use the sample data to understand the response structure, then generate code.
STRICT RULES - NO HALLUCINATIONS:
- You may ONLY use okta_* tools that are visible to you.
- You may ONLY use API endpoints from tools you have called.
- If a user asks for data that requires a tool you don't have, you MUST refuse and explain what tools ARE available.
- NEVER generate code using endpoints from tools you haven't seen.
EXAMPLE REFUSAL:
User: "List users with their groups and factors"
If you only have okta_user_list (no okta_user_list_groups or okta_user_list_factors):
> "I can list users, but I don't have access to the group membership or MFA factors tools.
> Available tools: okta_user_list, okta_group_list
> I can only provide user data without group/factor information."
WORKFLOW:
1. Check what okta_* tools are available to you
- If a required tool is missing, tell the user
2. Call the relevant okta_* tool(s) to get:
- Sample data (3 results) - use this to understand response structure
- endpoint: The API endpoint to use
- method: The HTTP method
3. Call get_code_generation_prompt(operations, user_query, operation_details, mode='direct')
- ONLY include operations for tools you actually have
- IMPORTANT: Set mode='direct' to get instructions optimized for this workflow
4. Generate Python code using ONLY endpoints from tools you called
- Response format: {"status": "success", "data": [...]}
- Always extract: response.get("data", [])
- Use asyncio.gather() for concurrent calls (asyncio is pre-injected)
5. Call execute_code(code, is_test=False) for FULL results
IMPORTANT:
- DO NOT return the okta_* sample results as the final answer
- DO NOT use endpoints from tools you don't have access to
""",
        }

    # ===============================================
    # AGENT MODE ONLY TOOLS (tags={"meta", "agent"})
    # ===============================================
    @mcp.tool(
        tags={"meta", "agent"},
        annotations={
            "category": "meta",
            "readOnlyHint": True,
            "idempotentHint": True,
        },
    )
    async def get_available_operations(
        ctx: Context,
        category: Annotated[str, "Filter by category: user, group, application, policy, etc. Empty string for all."] = ""
    ) -> dict:
        """
        **CALL THIS FIRST!** Start here to discover what Okta API operations are available.
        This is your entry point for the Okta API workflow:
        1. Call this to see available operations
        2. Call get_operation_details() for specific operations you need
        3. Call get_code_generation_prompt() to get code template
        4. Generate Python code using client.make_request()
        5. Call execute_code() to test and execute
        Returns list of available Okta API operations like 'user.list', 'group.get', etc.
        Returns operation names like 'user.list', 'group.get', etc.
        Use this to discover what operations are available before querying details.
        """
        await ctx.info("Discovering available Okta API operations...")
        try:
            # Imported at call time rather than module import time. Presumably
            # this avoids a circular import with the server module (verify).
            from fctr_okta_mcp.server import get_operations_catalog

            # Use cached catalog - pass category filter directly
            catalog = get_operations_catalog(mcp, category=category)
            if category:
                await ctx.info(f"Filtered to {catalog['count']} operations in '{category}' category")
            else:
                await ctx.info(f"Found {catalog['count']} total operations across all categories")
            return catalog
        except Exception as e:
            # Chain the cause so the server-side traceback keeps the real error.
            raise ToolError(f"Failed to fetch operations: {str(e)}") from e

    @mcp.tool(
        tags={"meta", "agent"},
        annotations={
            "category": "meta",
            "readOnlyHint": True,
            "idempotentHint": True,
        },
    )
    async def get_operation_details(
        ctx: Context,
        operation: Annotated[str, "Operation name in dot notation (e.g., 'user.list', 'group.get')"]
    ) -> dict:
        """
        Get detailed schema for a specific Okta API operation.
        Returns endpoint, method, parameters, and usage examples.
        OKTA DOMAIN KNOWLEDGE YOU MUST APPLY:
        User Status Values:
        - ACTIVE: Can authenticate (default filter unless user asks otherwise)
        - STAGED: Not activated yet
        - PROVISIONED: Provisioned but not activated
        - PASSWORD_RESET/PASSWORD_EXPIRED: Password issues
        - LOCKED_OUT: Failed login attempts
        - SUSPENDED: Temporarily suspended
        - DEPROVISIONED: Deleted
        Default Filters (apply unless user specifies otherwise):
        - Users: status eq "ACTIVE"
        - Applications: status eq "ACTIVE"
        Essential Fields by Entity (ALWAYS include):
        - Users: id, email, login, firstName, lastName, status
        - Groups: id, name, description, type
        - Applications: id, label, name, status, signOnMode
        - DO NOT include timestamp fields unless explicitly requested
        SCIM Filter Syntax (for 'search' parameter):
        - Operators: eq, sw, co, gt, lt, ge, le, pr, and, or, not
        - Examples: status eq "ACTIVE", profile.email sw "admin@"
        - CRITICAL: Cannot mix 'q' (simple search) with 'search' (SCIM filter)
        Default Sorting:
        - Users: profile.lastName ascending
        - Groups: name ascending
        - Applications: label ascending
        NEXT STEP: Use this info with get_code_generation_prompt() to generate Python code.
        Returns endpoint, HTTP method, parameters, filters, and usage example.
        Call this after selecting operations from get_available_operations().
        """
        # Validate before logging. Surrounding whitespace from the LLM would
        # otherwise yield a bogus tool name like "okta_ user_list".
        operation = (operation or "").strip()
        if not operation:
            raise ToolError("operation parameter is required")
        await ctx.info(f"Loading schema for operation: {operation}")

        # Map operation name to tool name (user.list -> okta_user_list)
        tool_name = f"okta_{operation.replace('.', '_')}"
        try:
            # Get tool details from server registry (imported at call time).
            from fctr_okta_mcp.server import get_tool_details

            details = get_tool_details(mcp, tool_name)
            if not details:
                await ctx.warning(f"Operation '{operation}' not found in registry")
                raise ToolError(f"Operation '{operation}' not found")
            await ctx.info(f"Loaded schema with {len(details.get('parameters', {}))} parameters")
            return {
                "operation": operation,
                "tool_name": tool_name,
                **details,
                "example": _get_operation_example(operation),
            }
        except ToolError:
            # Already user-facing; don't wrap it a second time.
            raise
        except Exception as e:
            raise ToolError(f"Failed to get operation details: {str(e)}") from e

    @mcp.tool(
        tags={"meta", "agent", "execution", "direct"},  # Visible in both modes
        annotations={
            "category": "meta",
            "readOnlyHint": True,
            "idempotentHint": True,
        },
    )
    async def get_code_generation_prompt(
        ctx: Context,
        operations: Annotated[list[str], "List of operation names to use, e.g. ['user.list', 'group.get']"],
        user_query: Annotated[str, "The user's original query/question"],
        operation_details: Annotated[list[dict], "Details from get_operation_details() for each operation"],
        mode: Annotated[str, "Workflow mode: 'agent' (default) or 'direct'"] = "agent"
    ) -> dict:
        """
        Generate code template and best practices for your Okta query.
        YOUR CODE MUST FOLLOW THIS STRUCTURE:
        ```python
        async def execute_query(client):
        # 1. Make API calls using client.make_request()
        response = await client.make_request(
        endpoint="/api/v1/users",
        method="GET",
        params={"search": 'status eq "ACTIVE"'}, # Optional filters - ONLY use them when the user requests
        entity_label="users" # Optional: for progress tracking
        )
        # 2. For concurrent calls, use asyncio.gather()
        tasks = [client.make_request(...) for item in items]
        results = await asyncio.gather(*tasks)
        # 3. Return list of dicts
        return all_results
        ```
        OKTA API CLIENT METHODS (ONLY these exist!):
        - client.make_request(endpoint, method, params, entity_label)
        - await client.start_entity_progress(label, total)
        - await client.update_entity_progress(label, processed) # NOT 'current'!
        - await client.complete_entity_progress(label, success)
        - client.concurrent_limit # Use for batch sizes
        FORBIDDEN (these don't exist):
        FORBIDDEN: client.log_progress(), client.get(), client.post()
        PERFORMANCE REQUIREMENTS:
        - Use client.concurrent_limit for batch sizes (never hardcode)
        - Use asyncio.gather() for concurrent API calls
        - Map results by index (order preserved in gather)
        - Use progress tracking for operations with known totals
        - All progress methods are ASYNC - must await them
        NEXT STEP: Write your Python code, then call execute_code(code, is_test=True) to test it.
        Combines operation details with user query to create a code generation prompt.
        The generated code should use OktaAPIClient and handle pagination.
        """
        # Validate first so the progress message never describes a rejected call.
        if not operations:
            raise ToolError("At least one operation is required")
        if not user_query:
            raise ToolError("user_query is required")
        await ctx.info(f"Generating code template for {len(operations)} operation(s): {', '.join(operations)}")

        # Build operations summary
        ops_summary = _format_operations(operation_details)

        # Accept "Direct", " direct " etc.; anything else selects agent mode.
        is_direct_mode = (mode or "").strip().lower() == "direct"
        base_prompt = load_code_gen_prompt(ops_summary, is_direct_mode=is_direct_mode)
        prompt = (
            "\n# Code Generation Task\n"
            "## User Query\n"
            f"{user_query}\n"
            "## Available Operations\n"
            f"{ops_summary}\n"
            f"{base_prompt}\n"
        )
        return {
            "prompt": prompt,
            "operations": operations,
            "user_query": user_query,
        }

    @mcp.tool(
        tags={"meta", "agent", "execution", "direct"},  # Visible in both modes
        annotations={
            "category": "meta",
            "readOnlyHint": False,  # Code execution requires approval
            "idempotentHint": False,
        },
    )
    async def execute_code(
        ctx: Context,
        code: Annotated[str, "Python async function code to execute"],
        is_test: Annotated[bool, "True for test run (3 results max), False for full production run"] = True
    ) -> dict:
        """
        Execute your generated Python code against the Okta API.
        TWO MODES:
        is_test=True (DEFAULT - Test Mode):
        - Returns max 3 results for validation
        - No CSV file created
        - Use to verify code works before full run
        is_test=False (Production Mode):
        - Fetches ALL data (no limits)
        - Saves results to CSV file
        - Use after test run succeeds
        WORKFLOW:
        1. Call okta_* tools to see sample data and understand structure
        2. Generate Python code based on those samples
        3. Call execute_code(code, is_test=True) to TEST the code
        4. If test succeeds, call execute_code(code, is_test=False) for full results
        CODE REQUIREMENTS:
        - Must define: async def execute_query(client)
        - Must return: list of dictionaries (flattened for CSV)
        - Must use: client.make_request(endpoint, method, params)
        - Response format: {"status": "success", "data": [...]}
        - Use asyncio.gather() for concurrent API calls
        - Flatten nested data using colon format (e.g., "name:id")
        """
        # Some clients send the flag as a string. Common truthy spellings select
        # test mode and anything else selects production.
        if isinstance(is_test, str):
            is_test = is_test.strip().lower() in ("true", "1", "yes")
        mode = "TEST" if is_test else "PRODUCTION"

        # Reject malformed input BEFORE asking the user to approve it. There is
        # no point confirming code that is going to be refused anyway.
        if not code:
            raise ToolError("code parameter is required")
        if "async def execute_query" not in code:
            raise ToolError("Code must define 'async def execute_query(client)' function")

        # === User Elicitation for Confirmation ===
        # Show full code for user review before execution.
        elicit_message = (
            "## Code Execution Confirmation\n"
            "**Code to execute:**\n"
            "```python\n"
            f"{code}\n"
            "```\n"
            "Review the code above and confirm execution against your Okta tenant.\n"
        )
        try:
            # response_type=None -> simple Accept/Cancel dialog (no form fields).
            elicit_result = await ctx.elicit(message=elicit_message, response_type=None)
        except Exception as elicit_error:
            # Deliberate best-effort: clients without elicitation support still
            # execute (without confirmation) rather than being locked out.
            logger.warning(f"Elicitation not available (client may not support it): {elicit_error}")
            await ctx.info("Note: User confirmation skipped (not supported by client)")
        else:
            action = getattr(elicit_result, "action", None)
            if action != "accept":
                # Fail closed: only an explicit "accept" may run code.
                if action == "decline":
                    logger.info("User declined code execution")
                    message = "Execution declined by user."
                elif action == "cancel":
                    logger.info("User cancelled code execution")
                    message = "Execution cancelled by user."
                else:
                    logger.warning(f"Unexpected elicitation action {action!r}; not executing code")
                    message = "Execution not confirmed by user."
                return {
                    "success": False,
                    "is_test": is_test,
                    "result_count": 0,
                    "results": [],
                    "message": message,
                }
            logger.info(f"User confirmed {mode} code execution")

        await ctx.info(f"{mode} mode - Preparing to execute code...")
        await ctx.report_progress(progress=0, total=100)

        # Monotonic clock for the audit duration (immune to wall-clock jumps).
        start_time = time.perf_counter()
        logger.info(f"Executing code in {mode} mode")
        await ctx.info("Executing code in isolated subprocess...")
        await ctx.report_progress(progress=10, total=100)

        # Progress creeps from 10 toward 75 while the subprocess streams
        # messages. 80 and 100 are reported after it finishes.
        progress_state = {"current": 10}

        # Progress callback to stream updates to MCP client.
        # CRITICAL: Must call report_progress() to reset MCP timeout clock!
        async def progress_callback(message: str):
            await ctx.info(f"[subprocess] {message}")
            progress_state["current"] = min(75, progress_state["current"] + 1)
            # Always report progress (even when capped) to reset the client timeout.
            await ctx.report_progress(progress=progress_state["current"], total=100)

        try:
            # Execute in subprocess with streaming.
            exec_result = await execute_code_subprocess(
                code=code,
                is_test=is_test,
                # timeout uses OKTA_MCP_EXECUTION_TIMEOUT_SECONDS env var (default 300s)
                progress_callback=progress_callback,
            )
            await ctx.report_progress(progress=80, total=100)

            if not exec_result.success:
                await ctx.error(f"Execution failed: {exec_result.error}")
                log_audit_event(
                    action="execute_code",
                    status="error",
                    details={
                        "is_test": is_test,
                        "error": exec_result.error,
                        "stderr": exec_result.stderr[:500] if exec_result.stderr else None,
                    },
                )
                raise ToolError(f"Code execution failed: {exec_result.error}")

            results = exec_result.results
            # A list counts its items. A missing result counts as zero, not one.
            # Any other single value counts as one.
            if isinstance(results, list):
                result_count = len(results)
            elif results is None:
                result_count = 0
            else:
                result_count = 1

            # Enforce test limit
            if is_test and isinstance(results, list) and len(results) > 3:
                await ctx.info(f"Test mode: showing 3 of {len(results)} results")
                results = results[:3]
                result_count = 3

            csv_path = None
            csv_url = None
            csv_filename = None
            # Save to CSV only in production mode
            if not is_test and isinstance(results, list) and results:
                from fctr_okta_mcp.resources.csv_output import save_results_csv, FILE_EXPIRY_SECONDS

                csv_path, csv_filename = save_results_csv(results)
                # Construct URL if HTTP transport is being used
                http_base_url = os.getenv("_MCP_HTTP_BASE_URL")
                if http_base_url and csv_filename:
                    csv_url = f"{http_base_url}/results/{csv_filename}"
                    await ctx.info(f"Saved {result_count} results - Download: {csv_url}")
                    await ctx.info(f"Note: CSV file expires in {FILE_EXPIRY_SECONDS // 60} minutes")
                else:
                    await ctx.info(f"Saved {result_count} results to: {csv_path}")

            await ctx.report_progress(progress=100, total=100)

            # Calculate duration and log audit event
            duration_ms = (time.perf_counter() - start_time) * 1000
            log_audit_event(
                action="execute_code",
                status="success",
                details={
                    "is_test": is_test,
                    "result_count": result_count,
                    "csv_file": csv_path,
                    "csv_url": csv_url,
                    "execution_time_ms": exec_result.execution_time_ms,
                },
                duration_ms=duration_ms,
            )
            logger.info(f"Code execution completed: {result_count} results in {duration_ms:.0f}ms")

            # Return at most 3 results, even in production; the full data is in
            # the CSV. Compress them to save context size.
            sample_results = results[:3] if isinstance(results, list) and len(results) > 3 else results
            if isinstance(sample_results, list):
                sample_results = [compress_response(r) if isinstance(r, dict) else r for r in sample_results]
            elif isinstance(sample_results, dict):
                sample_results = compress_response(sample_results)

            # Build response with URL if available
            response = {
                "success": True,
                "is_test": is_test,
                "result_count": result_count,
                "results": sample_results,
            }
            if csv_url:
                # csv_url is only set inside the CSV branch above, so
                # FILE_EXPIRY_SECONDS is guaranteed to be bound here.
                expires_minutes = FILE_EXPIRY_SECONDS // 60
                response["csv_url"] = csv_url
                response["csv_expires_in_minutes"] = expires_minutes
                response["message"] = f"{mode} run complete. {result_count} results.\n\n📥 **Download CSV:** [{csv_filename}]({csv_url})\n\n⏰ *File expires in {expires_minutes} minutes*"
            elif csv_path:
                response["csv_file"] = csv_path
                response["message"] = f"{mode} run complete. {result_count} results. Saved to: {csv_path}"
            else:
                response["message"] = f"{mode} run complete. {result_count} results."
            return response
        except ToolError:
            # Already audited/reported above; propagate unchanged.
            raise
        except Exception as e:
            await ctx.error(f"Execution failed: {str(e)}")
            log_audit_event(
                action="execute_code",
                status="error",
                details={"is_test": is_test, "error_type": type(e).__name__},
                error=e,
            )
            raise ToolError(f"Code execution failed: {str(e)}") from e
def _get_operation_example(operation: str) -> dict:
"""Return example usage for common operations."""
examples = {
"user.list": {
"description": "List users with optional search/filter",
"code": '''result = await client.make_request(
method="GET",
endpoint="/api/v1/users",
params={"search": 'profile.email sw "admin"', "limit": 200}
)'''
},
"user.get": {
"description": "Get a specific user by ID or login",
"code": '''result = await client.make_request(
method="GET",
endpoint=f"/api/v1/users/{user_id}"
)'''
},
"group.list": {
"description": "List groups with optional search",
"code": '''result = await client.make_request(
method="GET",
endpoint="/api/v1/groups",
params={"q": "Engineering", "limit": 200}
)'''
},
"group.get": {
"description": "Get a specific group by ID",
"code": '''result = await client.make_request(
method="GET",
endpoint=f"/api/v1/groups/{group_id}"
)'''
},
"application.list": {
"description": "List applications",
"code": '''result = await client.make_request(
method="GET",
endpoint="/api/v1/apps",
params={"limit": 200}
)'''
},
}
return examples.get(operation, {"description": "See parameters for usage", "code": ""})
def _format_operations(operation_details: list[dict]) -> str:
"""Format operation details for the prompt."""
formatted = []
for op in operation_details:
formatted.append(f"""
### {op.get('operation', 'Unknown')}
- Description: {op.get('description', 'N/A')}
- Parameters: {op.get('parameters', {})}
- Example: {op.get('example', {}).get('code', 'N/A')}
""")
return "\n".join(formatted)