#!/usr/bin/env python3
"""
Modelos de datos para autenticación OAuth2
"""
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, EmailStr
from enum import Enum
import uuid
class UserRole(str, Enum):
    """Role assigned to a user.

    The ``str`` mix-in makes every member compare equal to its plain
    string value (``UserRole.ADMIN == "admin"``).
    """

    ADMIN = "admin"
    USER = "user"
    READONLY = "readonly"
class OAuthProvider(str, Enum):
    """Supported OAuth2 identity providers.

    The ``str`` mix-in makes every member compare equal to its plain
    string value (``OAuthProvider.GOOGLE == "google"``).
    """

    GOOGLE = "google"
    GITHUB = "github"
    MICROSOFT = "microsoft"
class User(BaseModel):
    """User account authenticated through an OAuth2 provider."""

    id: str
    email: EmailStr
    name: str
    avatar_url: Optional[str] = None
    # Provider that authenticated the user, and the provider-side identifier.
    provider: OAuthProvider
    provider_id: str
    role: UserRole = UserRole.USER
    is_active: bool = True
    created_at: datetime
    last_login: Optional[datetime] = None

    @classmethod
    def create_new(
        cls,
        email: str,
        name: str,
        provider: OAuthProvider,
        provider_id: str,
        avatar_url: Optional[str] = None,
        role: UserRole = UserRole.USER,
    ) -> "User":
        """Build a new user with a generated id and a UTC creation time.

        Args:
            email: E-mail address (validated by the ``EmailStr`` field).
            name: Display name.
            provider: OAuth2 provider the user signed in with.
            provider_id: Identifier of the user at that provider.
            avatar_url: Optional avatar image URL.
            role: Role to assign; defaults to ``UserRole.USER``.

        Returns:
            A ``User`` whose ``id`` is a random UUID4 string and whose
            ``created_at`` is the current timezone-aware UTC time.
            ``is_active`` and ``last_login`` keep their field defaults.
        """
        generated_id = str(uuid.uuid4())
        registered_at = datetime.now(timezone.utc)
        return cls(
            id=generated_id,
            email=email,
            name=name,
            avatar_url=avatar_url,
            provider=provider,
            provider_id=provider_id,
            role=role,
            created_at=registered_at,
        )
class AccessToken(BaseModel):
    """Access token issued to a user, with an expiry time and granted scopes."""

    token: str
    user_id: str
    expires_at: datetime
    scopes: List[str] = []

    def is_expired(self) -> bool:
        """Return True when the current UTC time is past ``expires_at``.

        A naive ``expires_at`` (no tzinfo, e.g. parsed from a timestamp
        string without an offset) cannot be compared with an aware
        datetime and would raise ``TypeError``. It is interpreted as UTC,
        matching what ``create_new`` produces.
        """
        deadline = self.expires_at
        if deadline.tzinfo is None:
            # NOTE(review): assumes naive timestamps were stored in UTC.
            deadline = deadline.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) > deadline

    @classmethod
    def create_new(
        cls,
        user_id: str,
        token: str,
        expires_in_seconds: int = 3600,
        scopes: Optional[List[str]] = None,
    ) -> "AccessToken":
        """Build an access token that expires ``expires_in_seconds`` from now.

        Args:
            user_id: Identifier of the user the token belongs to.
            token: The token string itself (generated by the caller).
            expires_in_seconds: Lifetime in seconds; defaults to 3600.
            scopes: Scopes to grant. A falsy value (``None`` or an empty
                list) falls back to ``["mcp:read", "mcp:write"]``.

        Returns:
            An ``AccessToken`` with a timezone-aware UTC ``expires_at``.
        """
        expiry = datetime.now(timezone.utc) + timedelta(seconds=expires_in_seconds)
        return cls(
            token=token,
            user_id=user_id,
            expires_at=expiry,
            scopes=scopes or ["mcp:read", "mcp:write"],
        )
class RefreshToken(BaseModel):
    """Refresh token issued to a user, with an expiry time and a revocation flag."""

    token: str
    user_id: str
    expires_at: datetime
    is_revoked: bool = False

    def is_expired(self) -> bool:
        """Return True when the current UTC time is past ``expires_at``.

        Only the expiry time is checked; ``is_revoked`` is not consulted.

        A naive ``expires_at`` (no tzinfo, e.g. parsed from a timestamp
        string without an offset) cannot be compared with an aware
        datetime and would raise ``TypeError``. It is interpreted as UTC,
        matching what ``create_new`` produces.
        """
        deadline = self.expires_at
        if deadline.tzinfo is None:
            # NOTE(review): assumes naive timestamps were stored in UTC.
            deadline = deadline.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) > deadline

    @classmethod
    def create_new(
        cls,
        user_id: str,
        token: str,
        expires_in_days: int = 30,
    ) -> "RefreshToken":
        """Build a refresh token that expires ``expires_in_days`` from now.

        Args:
            user_id: Identifier of the user the token belongs to.
            token: The token string itself (generated by the caller).
            expires_in_days: Lifetime in days; defaults to 30.

        Returns:
            A non-revoked ``RefreshToken`` with a timezone-aware UTC
            ``expires_at``.
        """
        expiry = datetime.now(timezone.utc) + timedelta(days=expires_in_days)
        return cls(token=token, user_id=user_id, expires_at=expiry)
class OAuthState(BaseModel):
    """OAuth2 ``state`` value used to prevent CSRF, with its expiry window."""

    state: str
    provider: OAuthProvider
    redirect_uri: Optional[str] = None
    created_at: datetime
    expires_at: datetime

    def is_expired(self) -> bool:
        """Return True when the current UTC time is past ``expires_at``.

        A naive ``expires_at`` (no tzinfo, e.g. parsed from a timestamp
        string without an offset) cannot be compared with an aware
        datetime and would raise ``TypeError``. It is interpreted as UTC,
        matching what ``create_new`` produces.
        """
        deadline = self.expires_at
        if deadline.tzinfo is None:
            # NOTE(review): assumes naive timestamps were stored in UTC.
            deadline = deadline.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) > deadline

    @classmethod
    def create_new(
        cls,
        provider: OAuthProvider,
        redirect_uri: Optional[str] = None,
        expires_in_minutes: int = 10,
    ) -> "OAuthState":
        """Build a fresh state with a random UUID4 value.

        Args:
            provider: OAuth2 provider the authorization flow targets.
            redirect_uri: Optional redirect URI to remember for the flow.
            expires_in_minutes: Validity window in minutes; defaults to 10.

        Returns:
            An ``OAuthState`` whose ``created_at`` and ``expires_at`` are
            timezone-aware UTC times derived from the same instant.
        """
        # Take one reading of the clock so both timestamps are consistent.
        now = datetime.now(timezone.utc)
        return cls(
            state=str(uuid.uuid4()),
            provider=provider,
            redirect_uri=redirect_uri,
            created_at=now,
            expires_at=now + timedelta(minutes=expires_in_minutes),
        )
# Request/response models for the API
class TokenRequest(BaseModel):
    """Token request payload.

    Only ``grant_type`` is required; the remaining fields default to
    ``None``.
    """

    grant_type: str
    code: Optional[str] = None
    refresh_token: Optional[str] = None
    redirect_uri: Optional[str] = None
class TokenResponse(BaseModel):
    """Token response payload.

    ``token_type`` defaults to ``"Bearer"`` and ``scope`` to the
    space-separated string ``"mcp:read mcp:write"``.
    """

    access_token: str
    token_type: str = "Bearer"
    expires_in: int
    refresh_token: Optional[str] = None
    scope: str = "mcp:read mcp:write"
class UserInfo(BaseModel):
    """User information exposed through the API."""

    id: str
    # Plain ``str`` here, so no e-mail validation is applied by this model.
    email: str
    name: str
    avatar_url: Optional[str] = None
    role: UserRole
    provider: OAuthProvider
    created_at: datetime
    last_login: Optional[datetime] = None
class AuthorizeRequest(BaseModel):
    """Authorization request payload.

    ``response_type`` defaults to ``"code"`` and ``scope`` to the
    space-separated string ``"mcp:read mcp:write"``; ``state`` is optional.
    """

    response_type: str = "code"
    client_id: str
    redirect_uri: str
    scope: str = "mcp:read mcp:write"
    state: Optional[str] = None
    provider: OAuthProvider
# OAuth2 provider configuration
class OAuthProviderConfig(BaseModel):
    """Credentials and endpoint URLs for one OAuth2 provider."""

    client_id: str
    client_secret: str
    authorize_url: str
    token_url: str
    userinfo_url: str
    scopes: List[str]

    @property
    def is_configured(self) -> bool:
        """True when both ``client_id`` and ``client_secret`` are non-empty."""
        has_id = bool(self.client_id)
        return has_id and bool(self.client_secret)
# Default per-provider endpoints and scopes. Each entry holds only the URLs
# and scopes; client_id / client_secret are not included here.
OAUTH_PROVIDERS_CONFIG: Dict[OAuthProvider, Dict[str, Any]] = {
    OAuthProvider.GOOGLE: {
        "authorize_url": "https://accounts.google.com/o/oauth2/v2/auth",
        "token_url": "https://oauth2.googleapis.com/token",
        "userinfo_url": "https://www.googleapis.com/oauth2/v2/userinfo",
        "scopes": ["openid", "email", "profile"],
    },
    OAuthProvider.GITHUB: {
        "authorize_url": "https://github.com/login/oauth/authorize",
        "token_url": "https://github.com/login/oauth/access_token",
        "userinfo_url": "https://api.github.com/user",
        "scopes": ["user:email", "read:user"],
    },
    OAuthProvider.MICROSOFT: {
        "authorize_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
        "token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
        "userinfo_url": "https://graph.microsoft.com/v1.0/me",
        "scopes": ["openid", "email", "profile"],
    },
}
def _rebuild_auth_error(cls, args, state):
    """Recreate an ``AuthError`` (or subclass) without re-running ``__init__``."""
    exc = cls.__new__(cls)
    exc.args = args
    exc.__dict__.update(state)
    return exc


class AuthError(Exception):
    """Base exception for authentication errors.

    Attributes:
        message: Human-readable description; also the exception's only
            positional argument, so ``str(exc)`` returns it.
        error_code: Short machine-readable code; defaults to
            ``"auth_error"``.
    """

    def __init__(self, message: str, error_code: str = "auth_error"):
        self.message = message
        self.error_code = error_code
        super().__init__(message)

    def __reduce__(self):
        """Support ``copy`` and ``pickle`` for this class and its subclasses.

        The default exception reduction re-invokes ``cls(*self.args)``,
        i.e. ``cls(message)``. That drops a custom ``error_code`` and
        fails outright for subclasses whose ``__init__`` takes different
        parameters. Restoring ``args`` and the instance ``__dict__``
        directly avoids calling ``__init__`` again.
        """
        return (_rebuild_auth_error, (type(self), self.args, dict(self.__dict__)))
class TokenExpiredError(AuthError):
    """Authentication error for an expired token.

    Always carries the message ``"Token has expired"`` and the error code
    ``"token_expired"``.
    """

    def __init__(self):
        super().__init__(message="Token has expired", error_code="token_expired")
class InvalidTokenError(AuthError):
    """Authentication error for an invalid token.

    Always carries the message ``"Invalid token"`` and the error code
    ``"invalid_token"``.
    """

    def __init__(self):
        super().__init__(message="Invalid token", error_code="invalid_token")
class InsufficientScopeError(AuthError):
    """Authentication error for insufficient permissions.

    The message names the scope that was required; the error code is
    always ``"insufficient_scope"``.
    """

    def __init__(self, required_scope: str):
        detail = f"Insufficient scope. Required: {required_scope}"
        super().__init__(detail, "insufficient_scope")