We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/NyxiumYuuki/GeoGuessrMCP'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
Session management for GeoGuessr authentication.
"""
import asyncio
import logging
import secrets
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
import httpx
from ..config import settings
logger = logging.getLogger(__name__)
@dataclass
class UserSession:
"""Represents an authenticated GeoGuessr session."""
ncfa_cookie: str
user_id: str
username: str
email: str
created_at: datetime = field(default_factory=datetime.now)
expires_at: datetime | None = None
def is_valid(self) -> bool:
"""Check if the session is still valid."""
if self.expires_at and datetime.now(UTC) > self.expires_at:
return False
return bool(self.ncfa_cookie)
class SessionManager:
"""Manages user sessions for the MCP server."""
def __init__(self, default_cookie: str | None = None):
self._sessions: dict[str, UserSession] = {}
self._user_sessions: dict[str, str] = {}
self._default_cookie: str | None = default_cookie or settings.DEFAULT_NCFA_COOKIE
self._lock = asyncio.Lock()
@staticmethod
def _generate_session_token() -> str:
"""Generate a secure session token."""
return secrets.token_urlsafe(32)
async def login(
self, email: str, password: str, base_url: str = settings.GEOGUESSR_API_URL
) -> tuple[str, UserSession]:
"""
Authenticate with GeoGuessr and create a session.
Args:
email: User's email address
password: User's password
base_url: GeoGuessr API base URL
Returns:
tuple[str, UserSession]: (session_token, UserSession) on success
Raises:
ValueError: On authentication failure
"""
async with httpx.AsyncClient(timeout=30.0) as client:
# Attempt to sign in
response = await client.post(
f"{base_url}/v3/accounts/signin",
json={"email": email, "password": password},
headers={"Content-Type": "application/json"},
)
if response.status_code == 401:
raise ValueError("Invalid email or password")
elif response.status_code == 403:
raise ValueError("Account access denied")
elif response.status_code == 429:
raise ValueError("Too many login attempts")
elif response.status_code != 200:
raise ValueError(f"Login failed: {response.status_code}")
# Extract the _ncfa cookie
ncfa_cookie = self._extract_ncfa_cookie(response)
if not ncfa_cookie:
raise ValueError("No session cookie received")
# Get user profile
client.cookies.set("_ncfa", ncfa_cookie, domain=settings.GEOGUESSR_DOMAIN_NAME)
profile_response = await client.get(f"{base_url}/v3/profiles")
if profile_response.status_code != 200:
raise ValueError("Failed to retrieve user profile")
profile = profile_response.json()
# Create and store session
session = UserSession(
ncfa_cookie=ncfa_cookie,
user_id=profile.get("id", ""),
username=profile.get("nick", ""),
email=email,
expires_at=datetime.now(UTC) + timedelta(days=30),
)
session_token = await self._store_session(session)
logger.info(f"User {session.username} logged in successfully")
return session_token, session
@staticmethod
def _extract_ncfa_cookie(response: httpx.Response) -> str | None:
"""Extract _ncfa cookie from response."""
# Try cookies jar first
for cookie in response.cookies.jar:
if cookie.name == "_ncfa":
return cookie.value
# Try Set-Cookie header
set_cookie = response.headers.get("set-cookie", "")
if "_ncfa=" in set_cookie:
for part in set_cookie.split(";"):
if part.strip().startswith("_ncfa="):
return part.strip()[6:]
return None
async def _store_session(self, session: UserSession) -> str:
"""Store a session and return its token."""
async with self._lock:
session_token = self._generate_session_token()
# Remove old session for this user if exists
if session.user_id in self._user_sessions:
old_token = self._user_sessions[session.user_id]
self._sessions.pop(old_token, None)
self._sessions[session_token] = session
self._user_sessions[session.user_id] = session_token
return session_token
async def logout(self, session_token: str) -> bool:
"""
Logout and invalidate a session.
Args:
session_token: Token of the session to logout
Returns:
bool: True if session was found and removed, False otherwise
"""
async with self._lock:
if session_token in self._sessions:
session = self._sessions.pop(session_token)
self._user_sessions.pop(session.user_id, None)
logger.info(f"User {session.username} logged out")
return True
return False
async def get_session(self, session_token: str | None = None) -> UserSession | None:
"""
Get a session by token or return default if available.
Args:
session_token: Optional session token to look up
Returns:
UserSession if found and valid, None otherwise
"""
if session_token:
async with self._lock:
session = self._sessions.get(session_token)
if session and session.is_valid():
return session
elif session:
# Session expired, clean up
self._sessions.pop(session_token, None)
self._user_sessions.pop(session.user_id, None)
# Fall back to default cookie if available
if self._default_cookie:
return UserSession(
ncfa_cookie=self._default_cookie,
user_id="default",
username="default",
email="default",
)
return None
async def set_default_cookie(self, cookie: str) -> None:
"""
Set or update the default NCFA cookie.
Args:
cookie: The NCFA cookie value to set as default
"""
async with self._lock:
self._default_cookie = cookie
logger.info("Default NCFA cookie updated")
@staticmethod
async def validate_cookie(cookie: str) -> dict | None:
"""
Validate a cookie by making a test request.
Returns:
User profile dict if valid, None otherwise
"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
client.cookies.set("_ncfa", cookie, domain=settings.GEOGUESSR_DOMAIN_NAME)
response = await client.get(f"{settings.GEOGUESSR_API_URL}/v3/profiles")
if response.status_code == 200:
return response.json()
except Exception as e:
logger.warning(f"Cookie validation failed: {e}")
return None