"""Authorization module.
Provides unified permission check decorators and quota management.
"""
import json
import os
from datetime import datetime, timezone
from enum import Enum
from functools import wraps
from pathlib import Path
from typing import Callable, Optional
# Upgrade URL
UPGRADE_URL = "https://alphameta.app"
class Tier(Enum):
"""Permission tier enum."""
FREE = "free"
PRO = "pro"
PREMIUM = "premium"
# Monthly quotas by tier and API key status
# Format: (tier, has_api_key) -> monthly_quota
MONTHLY_QUOTAS = {
(Tier.FREE, False): 100, # No API Key
(Tier.FREE, True): 500, # With API Key
(Tier.PRO, True): 2000, # PRO
(Tier.PREMIUM, True): 5000, # PREMIUM
}
class AuthManager:
"""Authorization manager.
Monthly quota system with local cache storage.
"""
def __init__(self) -> None:
self._api_key: Optional[str] = None
self._tier: Tier = Tier.FREE
self._used: int = 0
self._last_reset: Optional[datetime] = None
self._cache_file = self._get_cache_file()
self._load()
def _get_cache_file(self) -> Path:
"""Get cache file path."""
home = Path(os.path.expanduser("~"))
ibkr_mcp_dir = home / ".ibkr-mcp"
ibkr_mcp_dir.mkdir(exist_ok=True)
return ibkr_mcp_dir / "quota.json"
def _load(self) -> None:
"""Load quota data from local cache."""
try:
if self._cache_file.exists():
with open(self._cache_file, "r") as f:
data = json.load(f)
self._used = data.get("used", 0)
self._last_reset = datetime.fromisoformat(
data.get("last_reset", "2000-01-01T00:00:00Z")
)
# Check if reset is needed (first day of month)
if self._should_reset():
self._reset()
except Exception:
self._used = 0
self._last_reset = None
def _should_reset(self) -> bool:
"""Check if quota needs to be reset (first day of month)."""
if self._last_reset is None:
return True
now = datetime.now(timezone.utc)
return (now.year, now.month) > (self._last_reset.year, self._last_reset.month)
def _reset(self) -> None:
"""Reset quota."""
self._used = 0
self._last_reset = datetime.now(timezone.utc)
self._save()
def _save(self) -> None:
"""Save quota data to local cache."""
try:
data = {
"used": self._used,
"last_reset": self._last_reset.isoformat() if self._last_reset else None,
}
with open(self._cache_file, "w") as f:
json.dump(data, f, indent=2)
except Exception:
pass # Silently fail, does not affect functionality
def _get_quota(self) -> int:
"""Get monthly quota based on tier and API key status."""
has_key = self._api_key is not None
return MONTHLY_QUOTAS.get((self._tier, has_key), 50)
def has_api_key(self) -> bool:
"""Check if API key is configured."""
return self._api_key is not None
async def initialize(self, api_key: Optional[str] = None) -> None:
"""Initialize authorization manager.
Args:
api_key: Optional API Key (cloud verification not implemented in MVP)
"""
self._api_key = api_key
if api_key:
# TODO: Implement cloud verification and tier lookup in the future
self._tier = Tier.FREE
else:
self._tier = Tier.FREE
self._load()
def can_use_pro(self) -> bool:
"""Check if PRO permission is available (always returns False in MVP)."""
return False
def can_use_premium(self) -> bool:
"""Check if PREMIUM permission is available (always returns False in MVP)."""
return False
def check_quota(self) -> tuple[bool, str]:
"""Check and consume quota.
Returns:
(ok, message): Whether quota is sufficient and the message
"""
# Check if reset is needed
if self._should_reset():
self._reset()
quota = self._get_quota()
if self._used >= quota:
remaining = 0
return (
False,
f"Monthly quota exceeded ({self._used}/{quota})",
)
# Consume quota
self._used += 1
self._save()
remaining = quota - self._used
return True, f"Quota: {remaining}/{quota}"
def get_status(self) -> dict:
"""Get current authorization status."""
quota = self._get_quota()
return {
"tier": self._tier.value,
"used": self._used,
"quota": quota,
"remaining": quota - self._used,
"api_key_configured": self._api_key is not None,
}
# Global singleton
_auth_manager: Optional[AuthManager] = None
def get_auth_manager() -> AuthManager:
"""Get AuthManager singleton."""
global _auth_manager
if _auth_manager is None:
_auth_manager = AuthManager()
return _auth_manager
def require_tier(required: Tier = Tier.FREE):
"""Permission check decorator.
Args:
required: Required permission tier (default FREE)
Usage:
@mcp.tool()
@require_tier() # FREE, check quota by default
async def get_portfolio(...):
...
@mcp.tool()
@require_tier(required=Tier.PRO) # PRO
async def scan_option_signals(...):
...
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
auth = get_auth_manager()
# Tier error messages
if required == Tier.PRO and not auth.can_use_pro():
return {
"error": "PRO_REQUIRED",
"message": f"PRO account required, upgrade: {UPGRADE_URL}",
}
if required == Tier.PREMIUM and not auth.can_use_premium():
return {
"error": "PREMIUM_REQUIRED",
"message": f"PREMIUM account required, upgrade: {UPGRADE_URL}",
}
# Quota check (FREE/PRO)
if required in (Tier.FREE, Tier.PRO):
ok, msg = auth.check_quota()
if not ok:
return {
"error": "QUOTA_EXCEEDED",
"message": f"{msg}, upgrade: {UPGRADE_URL}",
}
return await func(*args, **kwargs)
return wrapper
return decorator
__all__ = ["Tier", "AuthManager", "require_tier", "get_auth_manager"]