import logging
from datetime import datetime, timedelta, timezone
from functools import wraps

import jwt
from flask import request, jsonify, g

from app.core.config import config
from app.core.errors import JSONRPCErrorCode

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def verify_jwt_token(token: str) -> dict:
    """Verify a JWT and return its payload.

    Args:
        token: The raw token string (without the ``Bearer `` prefix).

    Returns:
        The decoded claims. For the literal development token ``"devtoken"``
        a fixed payload ``{"sub": "dev-user", "role": "admin"}`` is returned
        without any signature check.

    Raises:
        Exception: With the message ``"Token has expired"`` when PyJWT
            reports an expired signature, or ``"Invalid token"`` for any
            other PyJWT validation failure. The original PyJWT error is
            attached as ``__cause__``.
    """
    # Development shortcut: this literal token bypasses JWT validation.
    # NOTE(review): as written this is accepted in every environment and
    # grants the "admin" role -- confirm it is disabled or gated in production.
    if token == "devtoken":
        return {"sub": "dev-user", "role": "admin"}

    try:
        # Restricting `algorithms` to the configured one prevents a token
        # from choosing its own verification algorithm.
        return jwt.decode(
            token,
            config.JWT_SECRET,
            algorithms=[config.JWT_ALGORITHM],
        )
    except jwt.ExpiredSignatureError as exc:
        # Handled first so an expired token gets its own message instead of
        # the generic one below.
        raise Exception("Token has expired") from exc
    except jwt.InvalidTokenError as exc:
        raise Exception("Invalid token") from exc
def _auth_error_response(message):
    """Build the JSON-RPC 2.0 error response (body, 401) used for auth failures."""
    body = {
        "jsonrpc": "2.0",
        "error": {
            "code": JSONRPCErrorCode.SERVER_ERROR.value,
            "message": message,
        },
        "id": None,
    }
    return jsonify(body), 401


def require_auth(f):
    """Decorator that requires a valid ``Authorization: Bearer <token>`` header.

    On success the decoded token payload is stored on ``g.current_user`` and
    the wrapped function is called with its original arguments. On failure a
    JSON-RPC 2.0 error body is returned with HTTP status 401 and the wrapped
    function is not called.

    Exceptions raised by the wrapped function itself are NOT treated as
    authentication failures; they propagate to the caller unchanged.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return _auth_error_response("Authorization header required")
        if not auth_header.startswith('Bearer '):
            return _auth_error_response("Bearer token required")

        token = auth_header[len('Bearer '):]

        # Only token verification is guarded. Keeping the view call outside
        # this try block stops unrelated view errors from being reported as
        # a 401 "Authentication failed".
        try:
            payload = verify_jwt_token(token)
        except Exception as e:
            # verify_jwt_token signals every failure with a plain Exception,
            # so the broad catch is required here.
            logger.warning("Authentication failed: %s", e)
            return _auth_error_response("Authentication failed")

        g.current_user = payload
        return f(*args, **kwargs)
    return decorated_function
def create_jwt_token(user_data: dict) -> str:
    """Create a signed JWT for a user.

    Args:
        user_data: Mapping describing the user. ``username`` becomes the
            ``sub`` claim (default ``"anonymous"``) and ``role`` becomes the
            ``role`` claim (default ``"user"``).

    Returns:
        The encoded token, signed with ``config.JWT_SECRET`` using
        ``config.JWT_ALGORITHM``. The ``exp`` claim is set 24 hours after
        the ``iat`` claim.
    """
    # The jwt package does not expose datetime/timedelta, so the timestamps
    # come from the standard library. One timezone-aware "now" is shared so
    # that iat and exp are exactly 24 hours apart.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_data.get("username", "anonymous"),
        "role": user_data.get("role", "user"),
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=24),
    }
    token = jwt.encode(payload, config.JWT_SECRET, algorithm=config.JWT_ALGORITHM)
    # PyJWT 1.x returns bytes while 2.x returns str; normalise to the
    # annotated return type.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token