# src/fctr_okta_mcp/tools/okta/log_tools.py
"""
System Log Tools for Okta MCP Server.
Tools for querying and analyzing Okta system log events.
"""
from typing import Annotated
from fastmcp import FastMCP, Context
from fctr_okta_mcp.utils.logger import get_logger, log_audit_event
from fctr_okta_mcp.utils.response_compressor import compress_response
logger = get_logger(__name__)
def register_log_tools(mcp: FastMCP):
"""Register all log event tools with the MCP server."""
logger.debug("Registering log event tools...")
@mcp.tool(
tags={"okta", "log", "events", "audit", "direct"},
annotations={
"category": "log",
"readOnlyHint": True,
"idempotentHint": True
}
)
async def okta_log_get_events(
ctx: Context,
since: Annotated[str, "Starting time in ISO 8601 format. Use `okta_datetime_parse_relative` tool (e.g., '24 hours ago') or `okta_datetime_now` tool with buffer_hours."] = "",
until: Annotated[str, "Ending time in ISO 8601 format. Use `okta_datetime_parse_relative` tool (e.g., 'now') or `okta_datetime_now` tool for current time."] = "",
filter: Annotated[str, "Okta expression language filter. Examples: 'eventType eq \"user.authentication.auth\"', 'outcome.result eq \"FAILURE\"'"] = "",
q: Annotated[str, "Free-text search term to search across event data (usernames, emails, IPs, etc.)"] = "",
sortOrder: Annotated[str, "Order of results: 'ASCENDING' (oldest first) or 'DESCENDING' (newest first, default)"] = "DESCENDING"
) -> dict:
"""
**CALL read_system_instructions() FIRST!**
Get Okta system log events with comprehensive filtering.
Returns detailed log events from Okta system logs including authentication,
user management, application access, policy changes, and administrative actions.
## Time Parameters
- **since**: Start time in ISO 8601 format
- **until**: End time in ISO 8601 format
- Use `okta_datetime_parse_relative` tool: "24 hours ago", "7 days ago", "1 week ago", "now"
- Use `okta_datetime_now` tool: Get current UTC time with optional buffer_hours offset
## Filter Parameter
Uses Okta expression language for precise event filtering:
- `eventType eq "user.authentication.auth"` - Authentication events
- `eventType eq "user.lifecycle.create"` - User creation events
- `eventType eq "user.lifecycle.activate"` - User activation events
- `eventType eq "user.lifecycle.suspend"` - User suspension events
- `eventType eq "application.lifecycle.create"` - App creation events
- `outcome.result eq "SUCCESS"` - Successful events only
- `outcome.result eq "FAILURE"` - Failed events only
- `actor.id eq "user_id"` - Events by specific user
- `target.id eq "target_id"` - Events targeting specific resource
## Common Event Types
- `user.authentication.auth` - User login attempts
- `user.authentication.sso` - SSO authentication
- `user.session.start` - Session initiation
- `user.session.end` - Session termination
- `user.lifecycle.create` - User creation
- `user.lifecycle.activate` - User activation
- `user.lifecycle.suspend` - User suspension
- `user.lifecycle.deactivate` - User deactivation
- `application.user_membership.add` - App assignment
- `application.user_membership.remove` - App removal
- `group.user_membership.add` - Group membership addition
- `group.user_membership.remove` - Group membership removal
- `policy.lifecycle.create` - Policy creation
- `policy.lifecycle.update` - Policy modification
## Search Parameter (q)
Free-text search across event data:
- Search for usernames, email addresses, application names
- Search for IP addresses, client information
- Search for error messages or specific text in events
## Example Filters
- Authentication failures: `eventType eq "user.authentication.auth" and outcome.result eq "FAILURE"`
- User lifecycle changes: `eventType sw "user.lifecycle"`
- Application events: `eventType sw "application"`
- Admin actions: `actor.type eq "User" and eventType sw "policy"`
- Specific user activity: `actor.alternateId eq "user@company.com"`
Returns SAMPLE DATA (3 results) + endpoint metadata for code generation.
You MUST generate Python code to get actual results - see read_system_instructions().
"""
filter_desc = []
if since:
filter_desc.append(f"since='{since}'")
if until:
filter_desc.append(f"until='{until}'")
if filter:
filter_desc.append(f"filter='{filter}'")
if q:
filter_desc.append(f"q='{q}'")
filter_log = ", ".join(filter_desc) if filter_desc else "no filters"
await ctx.info(f"Fetching sample log events ({filter_log})")
from fctr_okta_mcp.client.base_okta_api_client import OktaAPIClient
client = OktaAPIClient(ctx=ctx)
params = {"limit": 3} # Hardcoded for sample data
if since:
params["since"] = since
if until:
params["until"] = until
if filter:
params["filter"] = filter
if q:
params["q"] = q
if sortOrder:
if sortOrder.upper() not in ['ASCENDING', 'DESCENDING']:
await ctx.warning("Sort order must be 'ASCENDING' or 'DESCENDING'. Using 'DESCENDING'.")
params["sortOrder"] = "DESCENDING"
else:
params["sortOrder"] = sortOrder.upper()
await ctx.info("Calling Okta API: GET /api/v1/logs...")
result = await client.make_request(
method="GET",
endpoint="/api/v1/logs",
params=params,
max_results=3
)
if result.get("status") == "success":
data = result.get("data", [])
event_types = [e.get("eventType", "unknown") for e in data]
await ctx.info(f"Retrieved {len(data)} log event(s): {', '.join(event_types) if event_types else 'none'}")
result["_reminder"] = "This is SAMPLE DATA (3 results). Generate Python code using execute_code() to get full results."
log_audit_event(
action="okta_log_get_events",
status="success",
details={"filter": filter_log, "result_count": len(data)}
)
else:
await ctx.warning(f"API returned status: {result.get('status')}")
log_audit_event(
action="okta_log_get_events",
status="error",
details={"filter": filter_log, "api_status": result.get("status")}
)
return compress_response(result)
logger.info("Log event tools registered successfully")