"""
Session management for Zintlr MCP Server.
Stores user tokens (from Zintlr OAuth) in Redis with session IDs.
The session ID is used as the OAuth "code" and "access_token" for LLM clients.
"""
import json
import secrets
from datetime import datetime
from typing import Any
import redis.asyncio as redis
from app.config import settings
class SessionManager:
"""Manages user sessions in Redis."""
def __init__(self):
self._redis: redis.Redis | None = None
async def get_redis(self) -> redis.Redis:
"""Get or create Redis connection."""
if self._redis is None:
self._redis = redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True,
)
return self._redis
async def close(self) -> None:
"""Close Redis connection."""
if self._redis:
await self._redis.close()
self._redis = None
def _generate_session_id(self) -> str:
"""Generate a secure random session ID."""
return f"mcp_sess_{secrets.token_urlsafe(32)}"
def _session_key(self, session_id: str) -> str:
"""Get Redis key for a session."""
return f"zintlr_mcp:session:{session_id}"
async def create_session(
self,
access_token: str,
key: str,
visitor_id: str,
extra_data: dict[str, Any] | None = None,
) -> str:
"""
Create a new session with user tokens.
Args:
access_token: Encrypted JWT access token from Zintlr
key: Encrypted user key from Zintlr
visitor_id: Visitor ID from Zintlr
extra_data: Optional additional data to store
Returns:
Session ID (used as OAuth code/access_token for LLM clients)
"""
r = await self.get_redis()
session_id = self._generate_session_id()
session_data = {
"access_token": access_token,
"key": key,
"visitor_id": visitor_id,
"created_at": datetime.utcnow().isoformat(),
**(extra_data or {}),
}
await r.setex(
self._session_key(session_id),
settings.session_expire_seconds,
json.dumps(session_data),
)
return session_id
async def get_session(self, session_id: str) -> dict[str, Any] | None:
"""
Get session data by session ID.
Args:
session_id: The session ID
Returns:
Session data dict or None if not found/expired
"""
r = await self.get_redis()
data = await r.get(self._session_key(session_id))
if data:
return json.loads(data)
return None
async def get_user_tokens(self, session_id: str) -> dict[str, str] | None:
"""
Get user tokens from session.
Args:
session_id: The session ID
Returns:
Dict with access_token, key, visitor_id or None
"""
session = await self.get_session(session_id)
if session:
return {
"access_token": session.get("access_token", ""),
"key": session.get("key", ""),
"visitor_id": session.get("visitor_id", ""),
}
return None
async def delete_session(self, session_id: str) -> bool:
"""
Delete a session.
Args:
session_id: The session ID to delete
Returns:
True if deleted, False if not found
"""
r = await self.get_redis()
result = await r.delete(self._session_key(session_id))
return result > 0
async def refresh_session(self, session_id: str) -> bool:
"""
Refresh session TTL.
Args:
session_id: The session ID
Returns:
True if refreshed, False if session not found
"""
r = await self.get_redis()
result = await r.expire(
self._session_key(session_id),
settings.session_expire_seconds,
)
return result
# Global session manager instance
session_manager = SessionManager()