Skip to main content
Glama
handlers.py4.82 kB
"""HTTP API request handlers.""" import json from typing import Dict, Any, Optional from datetime import datetime class HTTPError(Exception): """Base exception for HTTP errors.""" def __init__(self, status_code: int, message: str): self.status_code = status_code self.message = message super().__init__(message) class ValidationError(HTTPError): """Raised when request validation fails.""" def __init__(self, message: str): super().__init__(400, message) class NotFoundError(HTTPError): """Raised when requested resource is not found.""" def __init__(self, message: str = "Resource not found"): super().__init__(404, message) class BaseHandler: """Base class for HTTP request handlers.""" def __init__(self): self.logger = None def validate_request(self, data: Dict[str, Any], required_fields: list) -> None: """Validate incoming request data.""" for field in required_fields: if field not in data: raise ValidationError(f"Missing required field: {field}") if not data[field]: raise ValidationError(f"Empty value for field: {field}") def create_response(self, data: Any, status_code: int = 200) -> Dict[str, Any]: """Create standardized API response.""" return { 'status': 'success' if status_code < 400 else 'error', 'status_code': status_code, 'data': data, 'timestamp': datetime.now().isoformat() } def handle_error(self, error: Exception) -> Dict[str, Any]: """Handle and format errors for API response.""" if isinstance(error, HTTPError): return self.create_response( {'message': error.message}, error.status_code ) else: return self.create_response( {'message': 'Internal server error'}, 500 ) class UserHandler(BaseHandler): """Handler for user-related API endpoints.""" def __init__(self, user_service): super().__init__() self.user_service = user_service def create_user(self, request_data: Dict[str, Any]) -> Dict[str, Any]: """Create a new user account.""" try: self.validate_request(request_data, ['username', 'email', 'password']) user = self.user_service.create_user( username=request_data['username'], email=request_data['email'], password=request_data['password'] ) return self.create_response({ 'user_id': user.id, 'username': user.username, 'email': user.email }, 201) except Exception as e: return self.handle_error(e) def get_user(self, user_id: int) -> Dict[str, Any]: """Get user by ID.""" try: user = self.user_service.get_user(user_id) if not user: raise NotFoundError(f"User {user_id} not found") return self.create_response({ 'user_id': user.id, 'username': user.username, 'email': user.email, 'created_at': user.created_at.isoformat(), 'is_active': user.is_active }) except Exception as e: return self.handle_error(e) def update_user(self, user_id: int, request_data: Dict[str, Any]) -> Dict[str, Any]: """Update user information.""" try: user = self.user_service.get_user(user_id) if not user: raise NotFoundError(f"User {user_id} not found") updated_user = self.user_service.update_user(user_id, request_data) return self.create_response({ 'user_id': updated_user.id, 'username': updated_user.username, 'email': updated_user.email }) except Exception as e: return self.handle_error(e) def parse_json_body(request_body: str) -> Dict[str, Any]: """Parse JSON request body.""" try: return json.loads(request_body) except json.JSONDecodeError: raise ValidationError("Invalid JSON in request body") def extract_query_params(query_string: str) -> Dict[str, str]: """Extract query parameters from URL.""" params = {} if not query_string: return params for param in query_string.split('&'): if '=' in param: key, value = param.split('=', 1) params[key] = value return params

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FarhanAliRaza/claude-context-local'

If you have feedback or need assistance with the MCP directory API, please join our Discord server