# src/fctr_okta_mcp/tools/okta/datetime_tools.py
"""
DateTime Utility Tools for Okta MCP Server.
Tools for generating and parsing timestamps for Okta API queries.
"""
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastmcp import FastMCP, Context
from fctr_okta_mcp.utils.logger import get_logger, log_audit_event
# Module-level logger, named after this module; used by the registration
# function below and by the tools it defines.
logger = get_logger(__name__)
def register_datetime_tools(mcp: FastMCP):
    """Register all datetime utility tools with the MCP server.

    Two tools are registered on ``mcp``:

    * ``okta_datetime_now`` - current UTC time, optionally shifted by a
      number of hours.
    * ``okta_datetime_parse_relative`` - natural-language expression to UTC
      timestamp, using ``dateparser`` when it can be imported and a small
      built-in parser otherwise.

    Both tools return a plain ``dict`` with a ``"status"`` key of either
    ``"success"`` or ``"error"``; failures are reported in the dict rather
    than raised.
    """
    logger.debug("Registering datetime tools...")

    # One strftime pattern shared by every code path so all timestamps have
    # the same shape. The trailing 'Z' is a literal, so every datetime
    # formatted with it must already be expressed in UTC.
    timestamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'

    @mcp.tool(
        tags={"okta", "datetime", "utility", "direct"},
        annotations={
            "category": "datetime",
            "readOnlyHint": True,
            "idempotentHint": True
        }
    )
    async def okta_datetime_now(
        ctx: Context,
        buffer_hours: Annotated[int, "Optional number of hours to add (positive) or subtract (negative) from current time. Default is 0."] = 0
    ) -> dict:
        """
        Get the current date and time in UTC, formatted for Okta API usage.

        Returns current UTC timestamp in ISO 8601 format with microseconds and Z suffix,
        suitable for Okta API date parameters and filtering.

        ## Buffer Hours
        Use buffer_hours to get times in the past (negative) or future (positive):
        - buffer_hours=0: Current time
        - buffer_hours=-24: 24 hours ago
        - buffer_hours=-168: 1 week ago (7*24 hours)
        - buffer_hours=24: 24 hours from now

        ## Output Format
        Returns timestamp in format: YYYY-MM-DDTHH:MM:SS.ffffffZ
        Example: 2024-06-23T14:30:15.123456Z

        ## Use Cases
        - Log event filtering: since="2024-06-22T00:00:00.000Z"
        - User creation filters: lastUpdated gt "timestamp"
        - Application audit queries with time ranges
        - Policy rule time-based conditions

        Perfect for constructing Okta API queries that require precise timestamps.
        """
        await ctx.info(f"Getting current UTC time with buffer of {buffer_hours} hours")
        try:
            # Timezone-aware "now" in UTC, so the literal 'Z' suffix is accurate.
            now = datetime.now(timezone.utc)

            # A zero (or otherwise falsy) buffer means "no shift".
            if buffer_hours:
                now += timedelta(hours=buffer_hours)
                await ctx.info(f"Applied buffer of {buffer_hours} hours to current time")

            result = now.strftime(timestamp_format)
            await ctx.info(f"Generated timestamp: {result}")
            log_audit_event(
                action="okta_datetime_now",
                status="success",
                details={"buffer_hours": buffer_hours, "timestamp": result}
            )
            return {
                "status": "success",
                "timestamp": result,
                "buffer_hours": buffer_hours,
                "description": f"UTC timestamp {'with buffer of ' + str(buffer_hours) + ' hours' if buffer_hours else '(current time)'}"
            }
        except Exception as e:
            # Tool boundary: report the failure in the result instead of raising.
            logger.exception(f"Error generating timestamp: {e}")
            log_audit_event(
                action="okta_datetime_now",
                status="error",
                details={"error": str(e)}
            )
            return {
                "status": "error",
                "error": str(e)
            }

    @mcp.tool(
        tags={"okta", "datetime", "utility", "direct"},
        annotations={
            "category": "datetime",
            "readOnlyHint": True,
            "idempotentHint": True
        }
    )
    async def okta_datetime_parse_relative(
        ctx: Context,
        time_expression: Annotated[str, "Natural language time expression like '2 days ago', 'yesterday', '1 week ago', 'last month'"]
    ) -> dict:
        """
        Parse natural language time expressions into Okta API-compatible timestamps.

        Converts human-readable time expressions into ISO 8601 formatted timestamps
        with microseconds, suitable for Okta API queries and filtering.
        The returned timestamp is always expressed in UTC.

        ## Supported Expressions
        - Relative times: "2 days ago", "1 week ago", "3 months ago"
        - Named times: "yesterday", "last week", "last month"
        - Precise times: "1 hour ago", "30 minutes ago"
        - Period boundaries: "beginning of today", "end of yesterday"
        - Week/month boundaries: "start of this week", "end of last month"

        ## Output Format
        Returns timestamp in format: YYYY-MM-DDTHH:MM:SS.ffffffZ
        Example: 2024-06-21T00:00:00.000000Z

        ## Common Use Cases
        - Log queries: 'since=okta_datetime_parse_relative("24 hours ago")'
        - User filters: 'lastUpdated gt okta_datetime_parse_relative("1 week ago")'
        - Application activity: 'created after okta_datetime_parse_relative("yesterday")'
        - Policy rule conditions with time-based criteria

        Perfect for constructing Okta audit queries and date-based filters:
        Example: filter='eventType eq "user.authentication.auth" and published gt "parsed_timestamp"'
        """
        await ctx.info(f"Parsing time expression: '{time_expression}'")
        try:
            # Validate first so the dateparser path and the manual fallback
            # reject empty input in exactly the same way.
            if not time_expression or not time_expression.strip():
                raise ValueError("time_expression cannot be empty")
            time_expression = time_expression.strip()

            # dateparser is optional; without it, use the built-in parser.
            try:
                import dateparser
            except ImportError:
                return await _manual_parse_time(ctx, time_expression)

            parsed_time = dateparser.parse(time_expression, settings={'RETURN_AS_TIMEZONE_AWARE': True})
            if parsed_time is None:
                logger.warning(f"Could not parse time expression: {time_expression}")
                await ctx.warning(f"Could not parse time expression: '{time_expression}'")
                return {
                    "status": "error",
                    "error": f"Could not parse time expression: '{time_expression}'",
                    "suggestion": "Try expressions like '2 days ago', 'yesterday', '1 week ago', 'last month'"
                }

            # The parsed value is timezone-aware but not necessarily in UTC.
            # Convert before formatting, because the format string appends a
            # literal 'Z' regardless of the datetime's own offset.
            parsed_time = parsed_time.astimezone(timezone.utc)
            result = parsed_time.strftime(timestamp_format)
            await ctx.info(f"Successfully parsed '{time_expression}' to: {result}")
            log_audit_event(
                action="okta_datetime_parse_relative",
                status="success",
                details={"expression": time_expression, "timestamp": result}
            )
            return {
                "status": "success",
                "timestamp": result,
                "original_expression": time_expression,
                "description": f"Parsed '{time_expression}' to UTC timestamp"
            }
        except Exception as e:
            # Tool boundary: report the failure in the result instead of raising.
            logger.exception(f"Error parsing time expression '{time_expression}': {e}")
            log_audit_event(
                action="okta_datetime_parse_relative",
                status="error",
                details={"expression": time_expression, "error": str(e)}
            )
            return {
                "status": "error",
                "error": str(e),
                "original_expression": time_expression
            }

    async def _manual_parse_time(ctx: Context, time_expression: str) -> dict:
        """Fallback manual time parsing when dateparser is not available.

        Understands "yesterday", "[N] minute(s)/hour(s)/day(s)/week(s)/month(s)
        ago", "last week" and "last month". A missing count means 1, and a
        month is approximated as 30 days. All arithmetic is done on the
        current UTC time.

        Returns the same result-dict shape as the calling tool; unrecognised
        expressions and arithmetic failures yield a ``"status": "error"`` dict.
        """
        # Local import, mirroring how this module already imports dateparser
        # lazily; only this fallback path needs regular expressions.
        import re

        await ctx.info("Using manual time parsing (dateparser not available)")
        now = datetime.now(timezone.utc)
        expression_lower = time_expression.lower().strip()

        # Length of one unit; months are approximated as 30 days.
        unit_lengths = {
            "minute": timedelta(minutes=1),
            "hour": timedelta(hours=1),
            "day": timedelta(days=1),
            "week": timedelta(weeks=1),
            "month": timedelta(days=30),
        }
        try:
            # The optional count is taken from directly before the unit, so
            # "about 2 days ago" means 2 days. The lookbehind stops a unit
            # from matching inside a longer word (e.g. "birthday ago").
            ago_match = re.search(
                r"(?:(\d+)\s*)?(?<![a-z])(minute|hour|day|week|month)s?\s+ago",
                expression_lower,
            )
            if "yesterday" in expression_lower:
                parsed = now - timedelta(days=1)
            elif ago_match:
                count = int(ago_match.group(1)) if ago_match.group(1) else 1
                parsed = now - unit_lengths[ago_match.group(2)] * count
            elif "last week" in expression_lower:
                parsed = now - timedelta(weeks=1)
            elif "last month" in expression_lower:
                parsed = now - timedelta(days=30)
            else:
                return {
                    "status": "error",
                    "error": f"Could not parse: '{time_expression}'. Install 'dateparser' for better parsing.",
                    "suggestion": "Try: 'X hours ago', 'X days ago', 'X weeks ago', 'yesterday'"
                }

            result = parsed.strftime(timestamp_format)
            # Audit the fallback path the same way as the dateparser path.
            log_audit_event(
                action="okta_datetime_parse_relative",
                status="success",
                details={"expression": time_expression, "timestamp": result}
            )
            return {
                "status": "success",
                "timestamp": result,
                "original_expression": time_expression,
                "description": f"Parsed '{time_expression}' to UTC timestamp (manual parsing)"
            }
        except Exception as e:
            # E.g. OverflowError when the requested offset is out of range.
            log_audit_event(
                action="okta_datetime_parse_relative",
                status="error",
                details={"expression": time_expression, "error": str(e)}
            )
            return {
                "status": "error",
                "error": str(e),
                "original_expression": time_expression
            }

    logger.info("DateTime tools registered successfully")