# MCP KYC Server Architecture

## Executive Summary

This document outlines the comprehensive architecture for a Model Context Protocol (MCP) server that integrates external KYC (Know Your Customer) APIs with advanced metadata management and Auto Tool Registry functionality. The server will be implemented in Python and deployed via Docker on AWS EC2.

---

## 1. Project Structure

```
kyc-mcp-server/
├── src/
│   ├── __init__.py
│   ├── main.py                    # MCP server entry point
│   ├── server/
│   │   ├── __init__.py
│   │   ├── mcp_server.py          # Core MCP server implementation
│   │   ├── handlers.py            # Request/response handlers
│   │   └── middleware.py          # Authentication, logging middleware
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── base_tool.py           # Abstract base tool class
│   │   ├── pan_verification.py    # PAN verification tool
│   │   ├── pan_aadhaar_link.py    # PAN-Aadhaar link tool
│   │   ├── aadhaar_okyc.py        # Aadhaar offline eKYC tool
│   │   ├── bank_verification.py   # Bank account verification tools
│   │   ├── digilocker.py          # DigiLocker integration tools
│   │   └── mca_search.py          # MCA company/director search tools
│   ├── registry/
│   │   ├── __init__.py
│   │   ├── tool_registry.py       # Auto tool discovery & registration
│   │   ├── metadata_manager.py    # Metadata schema management
│   │   └── schema_generator.py    # Dynamic schema generation
│   ├── clients/
│   │   ├── __init__.py
│   │   ├── base_client.py         # Base HTTP client
│   │   ├── kyc_api_client.py      # KYC API wrapper
│   │   └── retry_handler.py       # Retry logic with exponential backoff
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── jwt_manager.py         # JWT token management
│   │   ├── api_key_manager.py     # API key validation
│   │   └── consent_validator.py   # User consent validation
│   ├── cache/
│   │   ├── __init__.py
│   │   ├── redis_cache.py         # Redis caching layer
│   │   └── cache_strategy.py      # Cache invalidation strategies
│   ├── models/
│   │   ├── __init__.py
│   │   ├── requests.py            # Request data models
│   │   ├── responses.py           # Response data models
│   │   └── schemas.py             # Pydantic schemas
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── logger.py              # Structured logging
│   │   ├── validators.py          # Input validation utilities
│   │   ├── error_handler.py       # Error handling & mapping
│   │   └── rate_limiter.py        # Rate limiting logic
│   └── config/
│       ├── __init__.py
│       ├── settings.py            # Configuration management
│       └── metadata.json          # Tool metadata definitions
├── tests/
│   ├── __init__.py
│   ├── unit/
│   │   ├── test_tools.py
│   │   ├── test_registry.py
│   │   └── test_clients.py
│   ├── integration/
│   │   ├── test_api_integration.py
│   │   └── test_mcp_server.py
│   └── fixtures/
│       └── sample_responses.json
├── docker/
│   ├── Dockerfile
│   ├── Dockerfile.dev
│   └── docker-compose.yml
├── deployment/
│   ├── ec2-setup.sh
│   ├── nginx.conf
│   └── systemd/
│       └── kyc-mcp-server.service
├── docs/
│   ├── API.md
│   ├── DEPLOYMENT.md
│   └── DEVELOPMENT.md
├── scripts/
│   ├── generate_metadata.py
│   ├── test_apis.py
│   └── migrate_tools.py
├── .env.example
├── .gitignore
├── requirements.txt
├── requirements-dev.txt
├── pyproject.toml
├── README.md
└── LICENSE
```

---

## 2. Advanced Metadata Schema

### 2.1 Metadata Structure (`config/metadata.json`)

```json
{
  "version": "1.0.0",
  "server": {
    "name": "kyc-mcp-server",
    "description": "MCP server for KYC API integration",
    "protocol_version": "2024-11-05"
  },
  "tools": [
    {
      "name": "verify_pan",
      "version": "1.0.0",
      "description": "Verify PAN card details with name and date of birth matching",
      "category": "kyc",
      "subcategory": "pan",
      "api_endpoint": "/kyc/pan/verify",
      "http_method": "POST",
      "authentication": {
        "type": "jwt_and_api_key",
        "headers": ["Authorization", "x-api-key"]
      },
      "rate_limit": {
        "requests_per_minute": 60,
        "requests_per_hour": 1000
      },
      "cache": {
        "enabled": true,
        "ttl_seconds": 3600,
        "cache_key_fields": ["pan", "name_as_per_pan", "date_of_birth"]
      },
      "input_schema": {
        "type": "object",
        "required": ["pan", "name_as_per_pan", "date_of_birth", "consent", "reason"],
        "properties": {
          "pan": {
            "type": "string",
            "pattern": "^[A-Z]{5}[0-9]{4}[A-Z]$",
            "description": "10-character PAN number",
            "example": "XXXPX1234A"
          },
          "name_as_per_pan": {
            "type": "string",
            "minLength": 1,
            "maxLength": 100,
            "description": "Full name as per PAN card"
          },
          "date_of_birth": {
            "type": "string",
            "pattern": "^\\d{2}/\\d{2}/\\d{4}$",
            "description": "Date of birth in DD/MM/YYYY format"
          },
          "consent": {
            "type": "string",
            "enum": ["Y", "y"],
            "description": "User consent (must be 'Y' or 'y')"
          },
          "reason": {
            "type": "string",
            "minLength": 1,
            "description": "Reason for verification"
          }
        }
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "pan": {"type": "string"},
          "category": {
            "type": "string",
            "enum": ["individual", "company", "trust", "association_of_persons", "body_of_individuals", "firm"]
          },
          "status": {
            "type": "string",
            "enum": ["valid", "invalid"]
          },
          "remarks": {"type": ["string", "null"]},
          "name_as_per_pan_match": {"type": "boolean"},
          "date_of_birth_match": {"type": "boolean"},
          "aadhaar_seeding_status": {
            "type": "string",
            "enum": ["y", "n", "na"]
          }
        }
      },
      "error_mappings": [
        {
          "api_code": 422,
          "api_message": "Invalid Pan pattern",
          "mcp_error": "INVALID_INPUT",
          "user_message": "The PAN format is invalid. Please provide a valid 10-character PAN."
        },
        {
          "api_code": 422,
          "api_message": "Invalid parameter: Consent is required",
          "mcp_error": "CONSENT_REQUIRED",
          "user_message": "User consent is required for PAN verification."
        },
        {
          "api_code": 503,
          "api_message": "Source Unavailable",
          "mcp_error": "SERVICE_UNAVAILABLE",
          "user_message": "The verification service is temporarily unavailable. Please try again later."
        }
      ],
      "retry_strategy": {
        "max_attempts": 3,
        "backoff_multiplier": 2,
        "retry_on_status_codes": [500, 502, 503, 504],
        "timeout_seconds": 30
      }
    },
    {
      "name": "check_pan_aadhaar_link",
      "version": "1.0.0",
      "description": "Check if PAN and Aadhaar are linked",
      "category": "kyc",
      "subcategory": "pan_aadhaar",
      "api_endpoint": "/kyc/pan-aadhaar/status",
      "http_method": "POST",
      "authentication": {
        "type": "jwt_and_api_key",
        "headers": ["Authorization", "x-api-key"]
      },
      "rate_limit": {
        "requests_per_minute": 60,
        "requests_per_hour": 1000
      },
      "cache": {
        "enabled": true,
        "ttl_seconds": 7200,
        "cache_key_fields": ["pan", "aadhaar_number"]
      },
      "input_schema": {
        "type": "object",
        "required": ["pan", "aadhaar_number", "consent", "reason"],
        "properties": {
          "pan": {
            "type": "string",
            "pattern": "^[A-Z]{3}P[A-Z][0-9]{4}[A-Z]$",
            "description": "Individual PAN number (4th character must be 'P')"
          },
          "aadhaar_number": {
            "type": "string",
            "pattern": "^[0-9]{12}$",
            "description": "12-digit Aadhaar number"
          },
          "consent": {
            "type": "string",
            "enum": ["Y", "y"],
            "description": "User consent"
          },
          "reason": {
            "type": "string",
            "minLength": 1,
            "description": "Reason for checking link status"
          }
        }
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "aadhaar_seeding_status": {
            "type": "string",
            "enum": ["y", "n"]
          },
          "message": {"type": "string"}
        }
      },
      "error_mappings": [
        {
          "api_code": 422,
          "api_message": "Individuals PAN is required",
          "mcp_error": "INVALID_PAN_TYPE",
          "user_message": "Only individual PAN cards can be linked with Aadhaar."
        }
      ],
      "retry_strategy": {
        "max_attempts": 3,
        "backoff_multiplier": 2,
        "retry_on_status_codes": [500, 502, 503, 504],
        "timeout_seconds": 30
      }
    }
  ],
  "global_settings": {
    "base_url": "${KYC_API_BASE_URL}",
    "default_timeout": 30,
    "max_retries": 3,
    "log_level": "INFO",
    "enable_metrics": true
  }
}
```

---

## 3.
Auto Tool Registry System

### 3.1 Architecture Overview

```mermaid
graph TB
    A[Tool Discovery] --> B[Schema Generator]
    B --> C[Tool Registry]
    C --> D[MCP Server]
    E[Metadata Manager] --> B
    F[API Specifications] --> B

    subgraph "Auto Registration Flow"
        G[Scan tools/ directory] --> H[Load tool modules]
        H --> I[Extract tool metadata]
        I --> J[Generate MCP schemas]
        J --> K[Register with MCP server]
    end
```

### 3.2 Tool Registry Implementation Pattern

```python
# registry/tool_registry.py
import importlib
import inspect
import logging
from pathlib import Path
from typing import Dict, List, Optional, Type

# Assumes `tools` is importable as a top-level package, matching the
# `tools.<module>` names built in discover_tools().
from tools.base_tool import BaseTool

logger = logging.getLogger(__name__)


class ToolRegistry:
    """
    Auto-discovery and registration system for MCP tools.
    Scans the tools directory and automatically registers all tool classes.
    """

    def __init__(self, metadata_manager: 'MetadataManager'):
        self.metadata_manager = metadata_manager
        self.tools: Dict[str, Type['BaseTool']] = {}
        self.tool_metadata: Dict[str, dict] = {}

    def discover_tools(self, tools_dir: Path) -> List[Type['BaseTool']]:
        """
        Automatically discover all tool classes in the tools directory.

        Returns:
            List of discovered tool classes
        """
        discovered_tools = []

        for py_file in tools_dir.glob("*.py"):
            # Skip private modules such as __init__.py
            if py_file.name.startswith("_"):
                continue

            module_name = f"tools.{py_file.stem}"
            module = importlib.import_module(module_name)

            for name, obj in inspect.getmembers(module, inspect.isclass):
                if (issubclass(obj, BaseTool) and
                        obj is not BaseTool and
                        not inspect.isabstract(obj)):
                    discovered_tools.append(obj)

        return discovered_tools

    def register_tool(self, tool_class: Type['BaseTool']) -> None:
        """
        Register a tool with the registry and generate its MCP schema.
        """
        # Assumes a no-argument constructor; tools that take dependencies
        # (for example an API client) need a factory or injection here.
        tool_instance = tool_class()
        tool_name = tool_instance.get_name()

        # Get metadata from metadata.json
        metadata = self.metadata_manager.get_tool_metadata(tool_name)

        # Generate MCP schema from metadata
        mcp_schema = self._generate_mcp_schema(metadata)

        # Register tool
        self.tools[tool_name] = tool_class
        self.tool_metadata[tool_name] = {
            'metadata': metadata,
            'mcp_schema': mcp_schema,
            'instance': tool_instance
        }

    def _generate_mcp_schema(self, metadata: dict) -> dict:
        """
        Generate MCP-compatible tool schema from metadata.
        """
        return {
            'name': metadata['name'],
            'description': metadata['description'],
            'inputSchema': metadata['input_schema'],
            'outputSchema': metadata.get('output_schema', {}),
        }

    def get_tool(self, tool_name: str) -> Optional['BaseTool']:
        """Get a registered tool instance by name."""
        tool_data = self.tool_metadata.get(tool_name)
        return tool_data['instance'] if tool_data else None

    def list_tools(self) -> List[dict]:
        """List all registered tools with their MCP schemas."""
        return [
            data['mcp_schema']
            for data in self.tool_metadata.values()
        ]

    def auto_register_all(self, tools_dir: Path) -> int:
        """
        Discover and register all tools automatically.

        Returns:
            Number of tools registered
        """
        discovered = self.discover_tools(tools_dir)

        for tool_class in discovered:
            try:
                self.register_tool(tool_class)
            except Exception as e:
                logger.error(f"Failed to register {tool_class.__name__}: {e}")

        return len(self.tools)
```

### 3.3 Base Tool Interface

```python
# tools/base_tool.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from pydantic import BaseModel


class BaseTool(ABC):
    """
    Abstract base class for all MCP tools.
    All tools must inherit from this class and implement required methods.
    """

    @abstractmethod
    def get_name(self) -> str:
        """Return the unique name of this tool."""
        pass

    @abstractmethod
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the tool with given parameters.

        Args:
            params: Validated input parameters

        Returns:
            Tool execution result
        """
        pass

    def validate_input(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate input parameters against the tool's schema.
        Override for custom validation logic.
        """
        # Default implementation uses Pydantic model if defined
        return params

    def get_cache_key(self, params: Dict[str, Any]) -> Optional[str]:
        """
        Generate cache key for this tool execution.
        Return None to disable caching for this execution.
        """
        return None

    def get_cache_ttl(self) -> int:
        """
        Return the cache TTL in seconds; the server calls this when storing results.
        The default mirrors CACHE_DEFAULT_TTL; override per tool as needed.
        """
        return 3600

    def should_retry(self, error: Exception) -> bool:
        """Determine if the operation should be retried on this error."""
        return False
```

---

## 4. Core Components

### 4.1 MCP Server Implementation

```python
# server/mcp_server.py
import json
import logging
from typing import Any, Sequence

from mcp.server import Server
from mcp.types import Tool, TextContent

from registry.tool_registry import ToolRegistry

# AuthManager, CacheManager and RateLimiter are project classes whose modules
# are not shown in this document, so they are referenced by name only.


class KYCMCPServer:
    """
    Core MCP server implementation for KYC API integration.
    """

    def __init__(
        self,
        tool_registry: ToolRegistry,
        auth_manager: 'AuthManager',
        cache_manager: 'CacheManager',
        rate_limiter: 'RateLimiter'
    ):
        self.server = Server("kyc-mcp-server")
        self.tool_registry = tool_registry
        self.auth_manager = auth_manager
        self.cache_manager = cache_manager
        self.rate_limiter = rate_limiter
        self.logger = logging.getLogger(__name__)

        self._register_handlers()

    def _register_handlers(self):
        """Register MCP protocol handlers."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List all available tools."""
            tools = []
            for tool_data in self.tool_registry.tool_metadata.values():
                schema = tool_data['mcp_schema']
                tools.append(Tool(
                    name=schema['name'],
                    description=schema['description'],
                    inputSchema=schema['inputSchema']
                ))
            return tools

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
            """Execute a tool with given arguments."""
            # Rate limiting
            if not await self.rate_limiter.check_limit(name):
                raise Exception("Rate limit exceeded")

            # Get tool instance
            tool = self.tool_registry.get_tool(name)
            if not tool:
                raise ValueError(f"Unknown tool: {name}")

            # Check cache
            cache_key = tool.get_cache_key(arguments)
            if cache_key:
                cached_result = await self.cache_manager.get(cache_key)
                # Compare against None so that empty but valid results still count as hits
                if cached_result is not None:
                    return [TextContent(
                        type="text",
                        text=json.dumps(cached_result)
                    )]

            # Execute tool
            try:
                result = await tool.execute(arguments)

                # Cache result
                if cache_key:
                    await self.cache_manager.set(
                        cache_key,
                        result,
                        ttl=tool.get_cache_ttl()
                    )

                return [TextContent(
                    type="text",
                    text=json.dumps(result)
                )]
            except Exception as e:
                self.logger.error(f"Tool execution failed: {e}")
                raise

    async def run(self):
        """Start the MCP server."""
        from mcp.server.stdio import stdio_server

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
```

### 4.2 KYC API Client

```python
# clients/kyc_api_client.py
from typing import Dict, Any, Optional

import httpx
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

# ValidationError, ServiceUnavailableError and APIError are project exception
# classes; their defining module is not shown in this document.

# Mirrors retry_on_status_codes in config/metadata.json
RETRYABLE_STATUS_CODES = {500, 502, 503, 504}


def _is_retryable(exc: BaseException) -> bool:
    """Retry transport failures and transient 5xx responses, never 4xx client errors."""
    if isinstance(exc, httpx.TransportError):
        return True
    # Mapped errors keep the original HTTPStatusError as __cause__ (see post()).
    cause = exc.__cause__
    return (
        isinstance(cause, httpx.HTTPStatusError)
        and cause.response.status_code in RETRYABLE_STATUS_CODES
    )


class KYCAPIClient:
    """
    HTTP client for KYC API with retry logic and error handling.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        jwt_token: str,
        timeout: int = 30
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.jwt_token = jwt_token
        self.timeout = timeout

        self.client = httpx.AsyncClient(
            base_url=base_url,
            timeout=timeout,
            headers={
                "Authorization": jwt_token,
                "x-api-key": api_key,
                "Content-Type": "application/json"
            }
        )

    @retry(
        retry=retry_if_exception(_is_retryable),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    async def post(
        self,
        endpoint: str,
        data: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Make POST request with retry logic.
        """
        try:
            response = await self.client.post(
                endpoint,
                json=data,
                headers=headers or {}
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # Map API errors to MCP errors, keeping the HTTP error as the cause
            raise self._map_error(e) from e

    def _map_error(self, error: httpx.HTTPStatusError) -> Exception:
        """Map HTTP errors to domain-specific exceptions."""
        status_code = error.response.status_code

        if status_code == 422:
            return ValidationError(error.response.json().get('message'))
        elif status_code == 503:
            return ServiceUnavailableError("KYC service temporarily unavailable")
        else:
            return APIError(f"API error: {status_code}")

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
```

### 4.3 Authentication Manager

```python
# auth/jwt_manager.py
from datetime import datetime, timedelta, timezone

import jwt

# AuthenticationError is a project exception class; its defining module is
# not shown in this document.


class JWTManager:
    """
    Manages JWT token generation, validation, and refresh.
    """

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def generate_token(
        self,
        payload: dict,
        expires_in: int = 3600
    ) -> str:
        """Generate a new JWT token."""
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        exp = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
        # Copy the payload so the caller's dict is not mutated
        claims = {**payload, 'exp': exp}
        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def validate_token(self, token: str) -> dict:
        """Validate and decode JWT token. Raises AuthenticationError on failure."""
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
        except jwt.ExpiredSignatureError as e:
            raise AuthenticationError("Token has expired") from e
        except jwt.InvalidTokenError as e:
            raise AuthenticationError("Invalid token") from e

    def refresh_token(self, token: str) -> str:
        """Refresh an existing token."""
        payload = self.validate_token(token)
        payload.pop('exp', None)  # Remove old expiration
        return self.generate_token(payload)
```

---

## 5.
Configuration Management

### 5.1 Settings Structure

```python
# config/settings.py
from functools import lru_cache
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """
    Application settings loaded from environment variables.
    """

    # pydantic-settings v2 style configuration (replaces the deprecated inner `class Config`)
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Server Configuration
    SERVER_NAME: str = "kyc-mcp-server"
    SERVER_VERSION: str = "1.0.0"
    LOG_LEVEL: str = "INFO"

    # KYC API Configuration
    KYC_API_BASE_URL: str
    KYC_API_KEY: str
    KYC_JWT_SECRET: str
    KYC_JWT_ALGORITHM: str = "HS256"
    KYC_JWT_EXPIRY: int = 3600

    # Redis Configuration
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    REDIS_PASSWORD: Optional[str] = None
    REDIS_SSL: bool = False

    # Cache Configuration
    CACHE_ENABLED: bool = True
    CACHE_DEFAULT_TTL: int = 3600
    CACHE_MAX_SIZE: int = 1000

    # Rate Limiting
    RATE_LIMIT_ENABLED: bool = True
    RATE_LIMIT_PER_MINUTE: int = 60
    RATE_LIMIT_PER_HOUR: int = 1000

    # Retry Configuration
    MAX_RETRIES: int = 3
    RETRY_BACKOFF_MULTIPLIER: int = 2
    REQUEST_TIMEOUT: int = 30

    # Monitoring
    ENABLE_METRICS: bool = True
    METRICS_PORT: int = 9090


@lru_cache()
def get_settings() -> Settings:
    """Get cached settings instance."""
    return Settings()
```

### 5.2 Environment Variables (`.env.example`)

```bash
# KYC API Configuration
KYC_API_BASE_URL=https://api.sandbox.co.in
KYC_API_KEY=your_api_key_here
KYC_JWT_SECRET=your_jwt_secret_here
KYC_JWT_ALGORITHM=HS256
KYC_JWT_EXPIRY=3600

# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
REDIS_SSL=false

# Cache Configuration
CACHE_ENABLED=true
CACHE_DEFAULT_TTL=3600
CACHE_MAX_SIZE=1000

# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_PER_MINUTE=60
RATE_LIMIT_PER_HOUR=1000

# Server Configuration
LOG_LEVEL=INFO
ENABLE_METRICS=true
METRICS_PORT=9090

# Retry Configuration
MAX_RETRIES=3
RETRY_BACKOFF_MULTIPLIER=2
REQUEST_TIMEOUT=30
```

---

## 6.
Technology Stack

### 6.1 Core Dependencies

```toml
# pyproject.toml
[project]
name = "kyc-mcp-server"
version = "1.0.0"
description = "MCP server for KYC API integration"
requires-python = ">=3.11"
dependencies = [
    # MCP SDK
    "mcp>=1.0.0",

    # HTTP Client (the http2 extra is required for http2=True)
    "httpx[http2]>=0.27.0",

    # Async Support (asyncio itself ships with the standard library and must not be listed)
    "aiofiles>=23.2.1",

    # Data Validation
    "pydantic>=2.5.0",
    "pydantic-settings>=2.1.0",

    # Caching
    "redis>=5.0.0",
    "hiredis>=2.3.0",

    # Retry Logic
    "tenacity>=8.2.3",

    # JWT
    "pyjwt>=2.8.0",
    "cryptography>=41.0.7",

    # Logging
    "structlog>=24.1.0",
    "python-json-logger>=2.0.7",

    # Monitoring
    "prometheus-client>=0.19.0",

    # Configuration
    "python-dotenv>=1.0.0",

    # Utilities
    "python-dateutil>=2.8.2",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4.3",
    "pytest-asyncio>=0.21.1",
    "pytest-cov>=4.1.0",
    "pytest-mock>=3.12.0",
    "black>=23.12.1",
    "ruff>=0.1.9",
    "mypy>=1.7.1",
    "pre-commit>=3.6.0",
]

[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"

[tool.black]
line-length = 88
target-version = ['py311']

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
```

### 6.2 Recommended Stack Summary

| Component | Technology | Justification |
|-----------|-----------|---------------|
| Language | Python 3.11+ | Excellent async support, rich ecosystem, MCP SDK availability |
| MCP SDK | Official MCP Python SDK | Native protocol support |
| HTTP Client | httpx | Modern async HTTP client with HTTP/2 support |
| Validation | Pydantic v2 | Fast, type-safe data validation |
| Caching | Redis | High-performance, distributed caching |
| Retry Logic | Tenacity | Flexible retry/backoff strategies |
| Logging | structlog | Structured logging for better observability |
| Monitoring | Prometheus | Industry-standard metrics collection |
| Testing | pytest + pytest-asyncio | Comprehensive async testing support |

---

## 7. Docker Deployment Strategy

### 7.1 Multi-Stage Dockerfile

```dockerfile
# docker/Dockerfile
# Stage 1: Builder
FROM python:3.11-slim as builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    make \
    libffi-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir --user -r requirements.txt

# Stage 2: Run the MCP server."""
        await self.server.run()

    async def shutdown(self):
        """Graceful shutdown."""
        logger.info("shutting_down_application")
        if self.cache:
            await self.cache.close()
        if self.api_client:
            await self.api_client.close()
        logger.info("application_shutdown_complete")


async def main():
    """Main entry point."""
    app = Application()

    # Setup signal handlers for graceful shutdown
    loop = asyncio.get_event_loop()

    def signal_handler(sig):
        logger.info("received_signal", signal=sig)
        asyncio.create_task(app.shutdown())
        loop.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda s=sig: signal_handler(s))

    try:
        await app.initialize()
        await app.run()
    except Exception as e:
        logger.error("application_error", error=str(e), exc_info=True)
        raise
    finally:
        await app.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

---

## 13. Complete Tool Examples

### 13.1 PAN-Aadhaar Link Tool

```python
# tools/pan_aadhaar_link.py
import hashlib
from typing import Dict, Any

from .base_tool import BaseTool
from clients.kyc_api_client import KYCAPIClient
from models.requests import PANAadhaarLinkRequest
from models.responses import PANAadhaarLinkResponse
from utils.metrics import track_tool_metrics
from cache.cache_strategy import cached


class PANAadhaarLinkTool(BaseTool):
    """
    Tool for checking PAN-Aadhaar link status.
    """

    def __init__(self, api_client: KYCAPIClient):
        self.api_client = api_client

    def get_name(self) -> str:
        return "check_pan_aadhaar_link"

    # Note: cache_manager and metadata_manager must exist as module-level
    # objects at import time; they are not defined in this snippet.
    @track_tool_metrics
    @cached(cache_manager, metadata_manager)
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check if PAN and Aadhaar are linked.

        Args:
            params: Dictionary containing:
                - pan: Individual PAN number
                - aadhaar_number: 12-digit Aadhaar number
                - consent: User consent
                - reason: Reason for checking

        Returns:
            Link status with descriptive message
        """
        # Validate input
        request = PANAadhaarLinkRequest(**params)

        # Prepare API request
        api_payload = {
            "@entity": "in.co.sandbox.kyc.pan_aadhaar.status",
            "pan": request.pan,
            "aadhaar_number": request.aadhaar_number,
            "consent": request.consent,
            "reason": request.reason
        }

        # Call API
        response = await self.api_client.post(
            endpoint="/kyc/pan-aadhaar/status",
            data=api_payload
        )

        # Parse response
        result = PANAadhaarLinkResponse(**response['data'])

        return {
            "linked": result.aadhaar_seeding_status == "y",
            "status": result.aadhaar_seeding_status,
            "message": result.message,
            "checked_at": response['timestamp']
        }

    def get_cache_key(self, params: Dict[str, Any]) -> str:
        """Generate cache key from PAN and Aadhaar."""
        # Hash the identifiers so raw PAN/Aadhaar values never appear in cache keys
        key_data = f"{params['pan']}|{params['aadhaar_number']}"
        return f"pan_aadhaar:{hashlib.sha256(key_data.encode()).hexdigest()}"
```

### 13.2 Aadhaar Offline eKYC Tool

```python
# tools/aadhaar_okyc.py
from typing import Dict, Any

from .base_tool import BaseTool
from clients.kyc_api_client import KYCAPIClient
from utils.metrics import track_tool_metrics


class AadhaarOKYCTool(BaseTool):
    """
    Tool for Aadhaar offline eKYC verification (OTP-based).
    Implements two-step process: generate OTP, then verify.
    """

    def __init__(self, api_client: KYCAPIClient):
        self.api_client = api_client

    def get_name(self) -> str:
        return "aadhaar_okyc_generate_otp"

    @track_tool_metrics
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate OTP for Aadhaar verification.

        Args:
            params: Dictionary containing:
                - aadhaar_number: 12-digit Aadhaar number
                - consent: User consent
                - reason: Reason for verification

        Returns:
            Reference ID for OTP verification
        """
        api_payload = {
            "@entity": "in.co.sandbox.kyc.aadhaar.okyc.otp.request",
            "aadhaar_number": params['aadhaar_number'],
            "consent": params['consent'],
            "reason": params['reason']
        }

        response = await self.api_client.post(
            endpoint="/kyc/aadhaar/okyc/otp",
            data=api_payload
        )

        return {
            "reference_id": response['data']['reference_id'],
            "message": response['data']['message'],
            "transaction_id": response['transaction_id']
        }


class AadhaarOKYCVerifyTool(BaseTool):
    """Tool for verifying Aadhaar OTP and retrieving eKYC data."""

    def __init__(self, api_client: KYCAPIClient):
        self.api_client = api_client

    def get_name(self) -> str:
        return "aadhaar_okyc_verify_otp"

    @track_tool_metrics
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Verify OTP and retrieve Aadhaar eKYC data.

        Args:
            params: Dictionary containing:
                - reference_id: Reference ID from OTP generation
                - otp: 6-digit OTP

        Returns:
            Complete Aadhaar eKYC data
        """
        api_payload = {
            "@entity": "in.co.sandbox.kyc.aadhaar.okyc.request",
            "reference_id": str(params['reference_id']),
            "otp": params['otp']
        }

        response = await self.api_client.post(
            endpoint="/kyc/aadhaar/okyc/otp/verify",
            data=api_payload
        )

        data = response['data']

        # Handle different response scenarios
        if 'message' in data and data['message'] in ['Invalid OTP', 'OTP Expired', 'Invalid Reference Id']:
            return {
                "status": "failed",
                "message": data['message']
            }

        # Successful verification
        return {
            "status": "success",
            "reference_id": data['reference_id'],
            "aadhaar_status": data['status'],
            "name": data['name'],
            "date_of_birth": data['date_of_birth'],
            "gender": data['gender'],
            "address": data['address'],
            "care_of": data.get('care_of'),
            "photo": data.get('photo'),  # Base64 encoded
            "email_hash": data.get('email_hash'),
            "mobile_hash": data.get('mobile_hash'),
            "share_code": data.get('share_code')
        }
```

---

## 14. System Architecture Diagram

```mermaid
graph TB
    subgraph "Client Layer"
        A[MCP Client]
    end

    subgraph "MCP Server Container"
        B[MCP Protocol Handler]
        C[Tool Registry]
        D[Metadata Manager]
        E[Rate Limiter]
        F[Cache Manager]
        G[Auth Manager]
    end

    subgraph "Tool Layer"
        H[PAN Verification Tool]
        I[PAN-Aadhaar Link Tool]
        J[Aadhaar OKYC Tool]
        K[Bank Verification Tool]
        L[DigiLocker Tool]
        M[MCA Search Tool]
    end

    subgraph "Integration Layer"
        N[KYC API Client]
        O[Retry Handler]
        P[Error Mapper]
    end

    subgraph "External Services"
        Q[KYC API]
        R[Redis Cache]
        S[Prometheus]
    end

    A -->|MCP Protocol| B
    B --> C
    C --> D
    B --> E
    B --> F
    B --> G
    C --> H
    C --> I
    C --> J
    C --> K
    C --> L
    C --> M
    H --> N
    I --> N
    J --> N
    K --> N
    L --> N
    M --> N
    N --> O
    N --> P
    O --> Q
    F --> R
    B --> S
```

---

## 15. Data Flow Diagram

```mermaid
sequenceDiagram
    participant Client as MCP Client
    participant Server as MCP Server
    participant Registry as Tool Registry
    participant Cache as Redis Cache
    participant RateLimit as Rate Limiter
    participant Tool as Tool Instance
    participant API as KYC API

    Client->>Server: call_tool(verify_pan, params)
    Server->>RateLimit: check_limit(verify_pan)
    RateLimit-->>Server: allowed
    Server->>Registry: get_tool(verify_pan)
    Registry-->>Server: tool_instance
    Server->>Cache: get(cache_key)
    Cache-->>Server: null (cache miss)
    Server->>Tool: execute(params)
    Tool->>Tool: validate_input(params)
    Tool->>API: POST /kyc/pan/verify
    API-->>Tool: verification_result
    Tool-->>Server: formatted_result
    Server->>Cache: set(cache_key, result, ttl)
    Cache-->>Server: ok
    Server-->>Client: result
```

---

## 16.
Deployment Architecture on EC2

```mermaid
graph TB
    subgraph "AWS Cloud"
        subgraph "VPC"
            subgraph "Public Subnet"
                A[Application Load Balancer]
                B[NAT Gateway]
            end

            subgraph "Private Subnet"
                C[EC2 Instance]
                D[Docker: MCP Server]
                E[Docker: Redis]
                F[Docker: Prometheus]
            end
        end

        G[CloudWatch Logs]
        H[CloudWatch Metrics]
        I[S3 Backup]
    end

    J[Internet] --> A
    A --> C
    C --> D
    C --> E
    C --> F
    D --> B
    B --> J
    D --> G
    D --> H
    E --> I

    style D fill:#4CAF50
    style E fill:#2196F3
    style F fill:#FF9800
```

### 16.1 EC2 Instance Specifications

**Recommended Instance Type:** `t3.medium` or `t3.large`

- **vCPUs:** 2
- **Memory:** 4-8 GB
- **Network:** Up to 5 Gbps
- **Storage:** 30-50 GB EBS (gp3)

**Security Group Rules:**

- Inbound: Port 443 (HTTPS) from ALB
- Inbound: Port 9090 (Metrics) from monitoring subnet
- Outbound: Port 443 (HTTPS) to KYC API
- Outbound: Port 6379 (Redis) internal only

---

## 17. Security Considerations

### 17.1 Security Layers

```python
# server/middleware.py
from typing import Callable
from functools import wraps

# AuthenticationError and RateLimitError are project exception classes;
# their defining module is not shown in this document.


class SecurityMiddleware:
    """Security middleware for request validation and protection."""

    def __init__(self, auth_manager, rate_limiter):
        self.auth_manager = auth_manager
        self.rate_limiter = rate_limiter

    def require_auth(self, func: Callable):
        """Decorator to require authentication."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract and validate JWT token
            token = kwargs.get('token')
            if not token:
                raise AuthenticationError("Missing authentication token")

            payload = self.auth_manager.validate_token(token)
            kwargs['user_context'] = payload

            return await func(*args, **kwargs)
        return wrapper

    def rate_limit(self, func: Callable):
        """Decorator to enforce rate limiting."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            tool_name = kwargs.get('tool_name')

            if not await self.rate_limiter.check_limit(tool_name):
                retry_after = self.rate_limiter.get_retry_after(tool_name)
                raise RateLimitError(
                    f"Rate limit exceeded. Retry after {retry_after} seconds",
                    details={'retry_after': retry_after}
                )

            return await func(*args, **kwargs)
        return wrapper

    @staticmethod
    def sanitize_input(data: dict) -> dict:
        """Normalize string inputs by trimming surrounding whitespace."""
        sanitized = {}
        for key, value in data.items():
            if isinstance(value, str):
                # Only whitespace is trimmed here; format and injection checks
                # rely on the per-tool input schema patterns.
                sanitized[key] = value.strip()
            else:
                sanitized[key] = value
        return sanitized
```

### 17.2 Security Best Practices

1. **Secrets Management:**
   - Store API keys and JWT secrets in AWS Secrets Manager
   - Rotate credentials regularly
   - Never commit secrets to version control

2. **Network Security:**
   - Use VPC with private subnets
   - Implement security groups with least privilege
   - Enable VPC Flow Logs

3. **Data Protection:**
   - Encrypt data in transit (TLS 1.3)
   - Encrypt Redis data at rest
   - Implement PII data masking in logs

4. **Access Control:**
   - Use IAM roles for EC2 instance
   - Implement JWT-based authentication
   - Validate all input parameters

---

## 18.
Monitoring & Alerting

### 18.1 Key Metrics to Monitor

```yaml
# Prometheus metrics configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kyc-mcp-server'
    static_configs:
      - targets: ['kyc-mcp-server:9090']
    metrics_path: '/metrics'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - 'alerts.yml'
```

### 18.2 Alert Rules

```yaml
# alerts.yml
groups:
  - name: kyc_mcp_alerts
    interval: 30s
    rules:
      - alert: HighErrorRate
        expr: rate(mcp_tool_requests_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"

      - alert: RateLimitExceeded
        expr: rate(mcp_rate_limit_exceeded_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Rate limit frequently exceeded"

      - alert: CacheMissRateHigh
        expr: |
          rate(mcp_cache_misses_total[5m]) /
          (rate(mcp_cache_hits_total[5m]) + rate(mcp_cache_misses_total[5m])) > 0.8
        for: 10m
        labels:
          severity: info
        annotations:
          summary: "Cache miss rate is high"

      - alert: APIResponseTimeSlow
        expr: histogram_quantile(0.95, rate(mcp_tool_request_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "API response time is slow"
          description: "95th percentile response time is {{ $value }}s"
```

---

## 19.
Testing Strategy ### 19.1 Unit Test Example ```python # tests/unit/test_pan_verification.py import pytest from unittest.mock import AsyncMock, MagicMock from tools.pan_verification import PANVerificationTool from models.responses import PANVerificationResponse @pytest.fixture def mock_api_client(): """Create mock API client.""" client = AsyncMock() return client @pytest.fixture def pan_tool(mock_api_client): """Create PAN verification tool with mock client.""" return PANVerificationTool(mock_api_client) @pytest.mark.asyncio async def test_pan_verification_success(pan_tool, mock_api_client): """Test successful PAN verification.""" # Arrange params = { "pan": "XXXPX1234A", "name_as_per_pan": "John Doe", "date_of_birth": "01/01/1990", "consent": "Y", "reason": "Testing" } mock_response = { "data": { "pan": "XXXPX1234A", "category": "individual", "status": "valid", "remarks": None, "name_as_per_pan_match": True, "date_of_birth_match": True, "aadhaar_seeding_status": "y" }, "timestamp": 1234567890 } mock_api_client.post.return_value = mock_response # Act result = await pan_tool.execute(params) # Assert assert result['pan'] == "XXXPX1234A" assert result['status'] == "valid" assert result['name_match'] is True assert result['dob_match'] is True mock_api_client.post.assert_called_once() @pytest.mark.asyncio async def test_pan_verification_invalid_format(pan_tool): """Test PAN verification with invalid format.""" params = { "pan": "INVALID", "name_as_per_pan": "John Doe", "date_of_birth": "01/01/1990", "consent": "Y", "reason": "Testing" } with pytest.raises(ValidationError): await pan_tool.execute(params) ``` ### 19.2 Integration Test Example ```python # tests/integration/test_api_integration.py import pytest from clients.kyc_api_client import KYCAPIClient from config.settings import get_settings @pytest.fixture async def api_client(): """Create real API client for integration tests.""" settings = get_settings() client = KYCAPIClient( base_url=settings.KYC_API_BASE_URL, 
        api_key=settings.KYC_API_KEY,
        jwt_token="test_token",
        timeout=30
    )
    yield client
    await client.close()


@pytest.mark.integration
@pytest.mark.asyncio
async def test_pan_verification_api(api_client):
    """Test actual PAN verification API call."""
    payload = {
        "@entity": "in.co.sandbox.kyc.pan_verification.request",
        "pan": "XXXPX1234A",
        "name_as_per_pan": "John Doe",
        "date_of_birth": "01/01/1990",
        "consent": "Y",
        "reason": "Integration Testing"
    }
    response = await api_client.post("/kyc/pan/verify", payload)

    assert response['code'] == 200
    assert 'data' in response
    assert response['data']['pan'] == "XXXPX1234A"
```

---

## 20. Performance Optimization

### 20.1 Connection Pooling

```python
# clients/base_client.py
import httpx
from typing import Optional


class BaseHTTPClient:
    """Base HTTP client with connection pooling."""

    def __init__(
        self,
        base_url: str,
        timeout: int = 30,
        max_connections: int = 100,
        max_keepalive_connections: int = 20
    ):
        limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections
        )
        self.client = httpx.AsyncClient(
            base_url=base_url,
            timeout=timeout,
            limits=limits,
            http2=True  # Enable HTTP/2 for better performance
        )

    async def close(self):
        """Close client and release connections."""
        await self.client.aclose()
```

### 20.2 Batch Processing Support

```python
# tools/batch_processor.py
from typing import List, Dict, Any
import asyncio


class BatchProcessor:
    """
    Process multiple tool requests in parallel with concurrency control.
    """

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_batch(
        self,
        tool: 'BaseTool',
        requests: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Process multiple requests in parallel.

        Args:
            tool: Tool instance to execute
            requests: List of request parameters

        Returns:
            List of results in same order as requests
        """
        async def process_one(params):
            async with self.semaphore:
                try:
                    return await tool.execute(params)
                except Exception as e:
                    return {"error": str(e), "params": params}

        tasks = [process_one(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
```

---

## 21. API Catalog

### 21.1 Available Tools Summary

| Tool Name | API Endpoint | Method | Description | Cache TTL |
|-----------|-------------|--------|-------------|-----------|
| `verify_pan` | `/kyc/pan/verify` | POST | Verify PAN with name/DOB matching | 1 hour |
| `check_pan_aadhaar_link` | `/kyc/pan-aadhaar/status` | POST | Check PAN-Aadhaar link status | 2 hours |
| `aadhaar_okyc_generate_otp` | `/kyc/aadhaar/okyc/otp` | POST | Generate Aadhaar OTP | No cache |
| `aadhaar_okyc_verify_otp` | `/kyc/aadhaar/okyc/otp/verify` | POST | Verify Aadhaar OTP & get eKYC | No cache |
| `verify_ifsc` | `/bank/:ifsc` | GET | Verify IFSC code | 24 hours |
| `verify_bank_account_penny_drop` | `/bank/:ifsc/accounts/:account/verify` | GET | Penny drop verification | 1 hour |
| `verify_bank_account_penniless` | `/bank/:ifsc/accounts/:account/penniless-verify` | GET | Penniless verification | 1 hour |
| `digilocker_init_session` | `/kyc/digilocker/sessions/init` | POST | Initialize DigiLocker session | No cache |
| `digilocker_get_status` | `/kyc/digilocker/sessions/:id/status` | GET | Get session status | No cache |
| `digilocker_fetch_document` | `/kyc/digilocker/sessions/:id/documents/:type` | GET | Fetch document | No cache |
| `mca_company_search` | `/mca/company/master-data/search` | POST | Search company by CIN/LLPIN | 6 hours |
| `mca_director_search` | `/mca/director/master-data/search` | POST | Search director by DIN | 6 hours |

---

## 22. Scalability Considerations

### 22.1 Horizontal Scaling

```yaml
# docker/docker-compose.scale.yml
version: '3.8'

services:
  kyc-mcp-server:
    build:
      context: ..
      dockerfile: docker/Dockerfile
    deploy:
      replicas: 3  # Run 3 instances
      resources:
        limits:
          cpus: '1'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 1G
    environment:
      - KYC_API_BASE_URL=${KYC_API_BASE_URL}
      - REDIS_HOST=redis
    depends_on:
      - redis
    networks:
      - kyc-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - kyc-mcp-server
    networks:
      - kyc-network

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - kyc-network

networks:
  kyc-network:
    driver: bridge

volumes:
  redis-data:
```

### 22.2 Load Balancing Configuration

```nginx
# deployment/nginx.conf
upstream mcp_servers {
    least_conn;
    server kyc-mcp-server-1:9090;
    server kyc-mcp-server-2:9090;
    server kyc-mcp-server-3:9090;
}

server {
    listen 80;
    server_name _;

    location / {
        proxy_pass http://mcp_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Fail over to the next upstream on errors and timeouts
        proxy_next_upstream error timeout http_500 http_502 http_503;
    }

    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }

    location /metrics {
        proxy_pass http://mcp_servers;
        allow 10.0.0.0/8;  # Internal network only
        deny all;
    }
}
```

---

## 23. Development Workflow

### 23.1 Local Development Setup

```bash
# Setup virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -e ".[dev]"

# Setup pre-commit hooks
pre-commit install

# Copy environment template
cp .env.example .env

# Start local Redis
docker run -d -p 6379:6379 redis:7-alpine

# Run tests
pytest tests/ -v --cov=src

# Run server locally
python -m src.main
```

### 23.2 CI/CD Pipeline (GitHub Actions Example)

```yaml
# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -e ".[dev]"

      - name: Run linters
        run: |
          black --check src/
          ruff check src/
          mypy src/

      - name: Run tests
        run: |
          pytest tests/ -v --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3

      - name: Build Docker image
        run: |
          docker build -f docker/Dockerfile -t kyc-mcp-server:latest .

      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker push kyc-mcp-server:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to EC2
        uses: appleboy/ssh-action@master
        with:
          host: ${{ secrets.EC2_HOST }}
          username: ${{ secrets.EC2_USER }}
          key: ${{ secrets.EC2_SSH_KEY }}
          script: |
            cd /opt/kyc-mcp-server
            git pull
            docker-compose -f docker/docker-compose.yml pull
            docker-compose -f docker/docker-compose.yml up -d
```

---

## 24. Extensibility & Future Enhancements

### 24.1 Adding New Tools

To add a new KYC API tool:

1. **Create tool class** in `src/tools/new_tool.py`:

   ```python
   from .base_tool import BaseTool


   class NewTool(BaseTool):
       def get_name(self) -> str:
           return "new_tool_name"

       async def execute(self, params):
           # Implementation
           pass
   ```

2. **Add metadata** to `config/metadata.json`:

   ```json
   {
     "name": "new_tool_name",
     "description": "Tool description",
     "api_endpoint": "/api/endpoint",
     "input_schema": { ... },
     "output_schema": { ... }
   }
   ```

3. **Restart the server** so the Auto Tool Registry can discover the new tool
4. **Tool auto-registers** on startup and is immediately available

### 24.2 Plugin Architecture

```python
# registry/plugin_loader.py
from typing import List, Type
import importlib.util
from pathlib import Path

# BaseTool is needed at runtime for the issubclass() check below
from tools.base_tool import BaseTool


class PluginLoader:
    """
    Load external tool plugins from plugins directory.
    Enables third-party tool extensions.
    """

    def __init__(self, plugins_dir: Path):
        self.plugins_dir = plugins_dir

    def load_plugins(self) -> List[Type['BaseTool']]:
        """Load all plugin tools."""
        plugins = []
        if not self.plugins_dir.exists():
            return plugins

        for plugin_file in self.plugins_dir.glob("*.py"):
            # Skip private modules such as __init__.py
            if plugin_file.name.startswith("_"):
                continue

            spec = importlib.util.spec_from_file_location(
                f"plugins.{plugin_file.stem}",
                plugin_file
            )
            if spec and spec.loader:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)

                # Find tool classes in module
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if (isinstance(attr, type)
                            and issubclass(attr, BaseTool)
                            and attr is not BaseTool):
                        plugins.append(attr)

        return plugins
```

### 24.3 Webhook Support

```python
# server/webhooks.py
from typing import Dict, Any, Callable
import hmac
import hashlib


class WebhookManager:
    """
    Manage webhooks for async API responses.
    Useful for long-running operations like DigiLocker sessions.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.handlers: Dict[str, Callable] = {}

    def register_handler(self, event_type: str, handler: Callable):
        """Register webhook event handler."""
        self.handlers[event_type] = handler

    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook signature."""
        expected = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)

    async def handle_webhook(
        self,
        event_type: str,
        payload: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Process incoming webhook."""
        handler = self.handlers.get(event_type)
        if not handler:
            raise ValueError(f"No handler for event type: {event_type}")
        return await handler(payload)
```

---

## 25. Documentation & API Reference

### 25.1 Tool Documentation Template

Each tool should include comprehensive documentation:

```python
"""
Tool: verify_pan
Category: KYC > PAN Verification
Version: 1.0.0

Description:
    Verifies PAN card details by matching the provided name and date of birth
    against government records. Returns validation status, category, and
    Aadhaar seeding status.

Input Parameters:
    - pan (string, required): 10-character PAN number
      Format: XXXXX9999X (5 letters, 4 digits, 1 letter)
      Example: "ABCDE1234F"
    - name_as_per_pan (string, required): Full name as per PAN card
      Max length: 100 characters
      Example: "John Ronald Doe"
    - date_of_birth (string, required): Date of birth
      Format: DD/MM/YYYY
      Example: "15/08/1990"
    - consent (string, required): User consent
      Allowed values: "Y", "y"
    - reason (string, required): Reason for verification
      Min length: 1 character
      Example: "Customer onboarding"

Output:
    {
        "pan": "ABCDE1234F",
        "category": "individual",
        "status": "valid",
        "remarks": null,
        "name_match": true,
        "dob_match": true,
        "aadhaar_seeding_status": "y",
        "verified_at": 1234567890
    }

Error Codes:
    - INVALID_INPUT: Invalid PAN format or missing required fields
    - CONSENT_REQUIRED: User consent not provided
    - SERVICE_UNAVAILABLE: KYC service temporarily unavailable
    - RATE_LIMIT_EXCEEDED: Too many requests

Rate Limits:
    - 60 requests per minute
    - 1000 requests per hour

Caching:
    - Enabled: Yes
    - TTL: 3600 seconds (1 hour)
    - Cache key: SHA256(pan|name|dob)

Examples:
    # Successful verification
    Input: {
        "pan": "XXXPX1234A",
        "name_as_per_pan": "John Doe",
        "date_of_birth": "01/01/1990",
        "consent": "Y",
        "reason": "KYC verification"
    }
    Output: {
        "pan": "XXXPX1234A",
        "category": "individual",
        "status": "valid",
        "name_match": true,
        "dob_match": true,
        "aadhaar_seeding_status": "y"
    }
"""
```

---

## 26. Implementation Roadmap

### Phase 1: Foundation (Week 1-2)

- [x] Design architecture
- [ ] Set up project structure
- [ ] Implement base classes (BaseTool, BaseClient)
- [ ] Set up configuration management
- [ ] Implement metadata manager
- [ ] Set up logging and error handling

### Phase 2: Core Features (Week 3-4)

- [ ] Implement Auto Tool Registry
- [ ] Implement KYC API client with retry logic
- [ ] Implement authentication manager
- [ ] Implement Redis caching layer
- [ ] Implement rate limiter
- [ ] Set up unit tests

### Phase 3: Tool Implementation (Week 5-6)

- [ ] Implement PAN verification tool
- [ ] Implement PAN-Aadhaar link tool
- [ ] Implement Aadhaar OKYC tools
- [ ] Implement bank verification tools
- [ ] Implement DigiLocker tools
- [ ] Implement MCA search tools

### Phase 4: Integration & Testing (Week 7)

- [ ] Integration testing with real APIs
- [ ] Performance testing
- [ ] Security audit
- [ ] Load testing
- [ ] Documentation completion

### Phase 5: Deployment (Week 8)

- [ ] Docker image optimization
- [ ] EC2 instance setup
- [ ] Deploy to staging environment
- [ ] Production deployment
- [ ] Monitoring setup
- [ ] Alert configuration

---

## 27. Key Design Decisions

### 27.1 Why Python?

1. **MCP SDK Support:** Official Python SDK available
2. **Async/Await:** Native async support for high concurrency
3. **Rich Ecosystem:** Excellent libraries for HTTP, validation, caching
4. **Type Safety:** Pydantic provides runtime type checking
5. **Developer Productivity:** Fast development and iteration

### 27.2 Why Redis for Caching?

1. **Performance:** In-memory storage with microsecond latency
2. **TTL Support:** Built-in expiration for cache entries
3. **Persistence:** Optional data persistence
4. **Scalability:** Easy to scale horizontally
5. **Pattern Matching:** Efficient cache invalidation

### 27.3 Why Auto Tool Registry?

1. **Maintainability:** Add tools without modifying core server code
2. **Scalability:** Easy to add new APIs as business grows
3. **Consistency:** Enforces standard tool interface
4. **Discovery:** Automatic tool detection and registration
5. **Versioning:** Support multiple tool versions simultaneously

---

## 28. API Response Patterns

### 28.1 Standard Response Format

All tools return responses in this format:

```json
{
  "success": true,
  "data": {
    // Tool-specific response data
  },
  "metadata": {
    "tool_name": "verify_pan",
    "execution_time_ms": 245,
    "cached": false,
    "api_transaction_id": "uuid-here",
    "timestamp": 1234567890
  }
}
```

### 28.2 Error Response Format

```json
{
  "success": false,
  "error": {
    "code": "INVALID_INPUT",
    "message": "The PAN format is invalid",
    "details": {
      "field": "pan",
      "provided_value": "INVALID",
      "expected_format": "XXXXX9999X"
    }
  },
  "metadata": {
    "tool_name": "verify_pan",
    "timestamp": 1234567890
  }
}
```

---

## 29. Configuration Examples

### 29.1 Development Configuration

```bash
# .env.development
KYC_API_BASE_URL=https://sandbox.co.in
KYC_API_KEY=dev_api_key
KYC_JWT_SECRET=dev_secret_key

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

CACHE_ENABLED=true
CACHE_DEFAULT_TTL=300  # 5 minutes for dev

RATE_LIMIT_ENABLED=false  # Disabled for dev

LOG_LEVEL=DEBUG
ENABLE_METRICS=true
METRICS_PORT=9090
```

### 29.2 Production Configuration

```bash
# .env.production
KYC_API_BASE_URL=https://api.production.co.in
KYC_API_KEY=${AWS_SECRETS_MANAGER:kyc-api-key}
KYC_JWT_SECRET=${AWS_SECRETS_MANAGER:kyc-jwt-secret}

REDIS_HOST=redis-cluster.internal
REDIS_PORT=6379
REDIS_PASSWORD=${AWS_SECRETS_MANAGER:redis-password}
REDIS_SSL=true

CACHE_ENABLED=true
CACHE_DEFAULT_TTL=3600

RATE_LIMIT_ENABLED=true
RATE_LIMIT_PER_MINUTE=60
RATE_LIMIT_PER_HOUR=1000

LOG_LEVEL=INFO
ENABLE_METRICS=true
METRICS_PORT=9090

MAX_RETRIES=3
REQUEST_TIMEOUT=30
```

---

## 30. Disaster Recovery & Backup

### 30.1 Backup Strategy

```python
# scripts/backup_redis.py
import asyncio
import boto3
from datetime import datetime
from redis import Redis


async def backup_redis_to_s3():
    """
    Backup Redis data to S3 for disaster recovery.
    """
    redis_client = Redis(host='localhost', port=6379)
    s3_client = boto3.client('s3')

    # Record the last successful save time, then trigger Redis BGSAVE
    previous_save = redis_client.lastsave()
    redis_client.bgsave()

    # Wait for save to complete: LASTSAVE advances once the background save finishes
    while redis_client.lastsave() <= previous_save:
        await asyncio.sleep(1)

    # Upload dump.rdb to S3
    backup_name = f"redis-backup-{datetime.now().isoformat()}.rdb"
    s3_client.upload_file(
        '/var/lib/redis/dump.rdb',
        'kyc-mcp-backups',
        backup_name
    )
    print(f"Backup completed: {backup_name}")


if __name__ == "__main__":
    asyncio.run(backup_redis_to_s3())
```

### 30.2 Recovery Procedure

1. **Stop the MCP server**
2. **Download backup from S3**
3. **Restore Redis data**
4. **Restart services**
5. **Verify functionality**

---

## 31. Performance Benchmarks

### 31.1 Expected Performance Metrics

| Metric | Target | Measurement |
|--------|--------|-------------|
| Tool execution time (cached) | < 10ms | p95 |
| Tool execution time (uncached) | < 500ms | p95 |
| Cache hit rate | > 70% | Average |
| API success rate | > 99.5% | Average |
| Concurrent requests | 100+ | Sustained |
| Memory usage | < 1GB | Per instance |
| CPU usage | < 50% | Average |

### 31.2 Load Testing

```python
# tests/load/test_performance.py
import asyncio
import time
from locust import User, task, between
from locust.contrib.fasthttp import FastHttpUser


class MCPUser(FastHttpUser):
    """Load test user for MCP server."""

    wait_time = between(1, 3)

    @task(3)
    def verify_pan(self):
        """Test PAN verification endpoint."""
        self.client.post("/tools/verify_pan", json={
            "pan": "XXXPX1234A",
            "name_as_per_pan": "John Doe",
            "date_of_birth": "01/01/1990",
            "consent": "Y",
            "reason": "Load testing"
        })

    @task(2)
    def check_pan_aadhaar(self):
        """Test PAN-Aadhaar link check."""
        self.client.post("/tools/check_pan_aadhaar_link", json={
            "pan": "XXXPX1234A",
            "aadhaar_number": "123456789012",
            "consent": "Y",
            "reason": "Load testing"
        })

    @task(1)
    def verify_ifsc(self):
        """Test IFSC verification."""
        self.client.get("/tools/verify_ifsc?ifsc=SBIN0021745")
```

---

## 32. Compliance & Audit

### 32.1 Audit Logging

```python
# utils/audit_logger.py
import json
from datetime import datetime, timezone
from typing import Dict, Any
import hashlib


class AuditLogger:
    """
    Log all KYC operations for compliance and audit purposes.
    """

    def __init__(self, log_file: str = "/var/log/kyc-audit.log"):
        self.log_file = log_file

    async def log_operation(
        self,
        tool_name: str,
        user_id: str,
        input_params: Dict[str, Any],
        result: Dict[str, Any],
        success: bool
    ):
        """
        Log KYC operation with PII masking.
        """
        audit_entry = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tool": tool_name,
            "user_id": user_id,
            "input_hash": self._hash_sensitive_data(input_params),
            "success": success,
            "result_hash": self._hash_sensitive_data(result),
            "ip_address": "masked",  # Get from request context
        }
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

    @staticmethod
    def _hash_sensitive_data(data: Dict[str, Any]) -> str:
        """Hash sensitive data for audit trail."""
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data_str.encode()).hexdigest()
```

### 32.2 Data Retention Policy

```python
# utils/data_retention.py
from datetime import datetime, timedelta
import asyncio


class DataRetentionManager:
    """
    Manage data retention policies for cached data and logs.
    """

    def __init__(self, cache_manager, audit_logger):
        self.cache_manager = cache_manager
        self.audit_logger = audit_logger

    async def cleanup_expired_cache(self):
        """Remove expired cache entries (handled by Redis TTL)."""
        # Redis handles TTL automatically, but we can add custom logic
        pass

    async def archive_old_logs(self, days: int = 90):
        """Archive audit logs older than specified days."""
        cutoff_date = datetime.now() - timedelta(days=days)
        # Implementation depends on log storage strategy
        # Could move to S3 Glacier for long-term storage
        pass

    async def run_cleanup_job(self):
        """Run periodic cleanup job."""
        while True:
            await self.cleanup_expired_cache()
            await self.archive_old_logs()
            await asyncio.sleep(86400)  # Run daily
```

---

## 33. Troubleshooting Guide

### 33.1 Common Issues & Solutions

| Issue | Symptoms | Solution |
|-------|----------|----------|
| High latency | Response time > 2s | Check Redis connection, API endpoint health |
| Cache misses | Cache hit rate < 50% | Review cache key generation, increase TTL |
| Rate limit errors | 429 responses | Implement request queuing, increase limits |
| Authentication failures | 401/403 errors | Refresh JWT token, verify API key |
| Memory leaks | Increasing memory usage | Check for unclosed connections, review cache size |

### 33.2 Debug Mode

```python
# config/settings.py (add to Settings class)
class Settings(BaseSettings):
    # ... existing settings ...

    # Debug Configuration
    DEBUG_MODE: bool = False
    DEBUG_LOG_REQUESTS: bool = False
    DEBUG_LOG_RESPONSES: bool = False
    DEBUG_DISABLE_CACHE: bool = False
    DEBUG_DISABLE_RATE_LIMIT: bool = False

    def is_debug(self) -> bool:
        """Check if running in debug mode."""
        return self.DEBUG_MODE
```

---

## 34. Cost Optimization

### 34.1 Cost Factors

1. **EC2 Instance:** ~$30-60/month (t3.medium)
2. **Data Transfer:** ~$10-20/month
3. **EBS Storage:** ~$5/month (50GB)
4. **CloudWatch:** ~$5-10/month
5. **Total Estimated:** ~$50-95/month

### 34.2 Optimization Strategies

1. **Use Reserved Instances:** Save 30-50% on EC2 costs
2. **Implement Aggressive Caching:** Reduce API calls
3. **Use Spot Instances:** For non-critical workloads
4. **Optimize Docker Images:** Reduce storage and transfer costs
5. **Monitor and Right-Size:** Adjust instance type based on actual usage

---

## 35. Migration & Versioning

### 35.1 API Version Management

```python
# registry/version_manager.py
from typing import Dict, List, Optional
from packaging import version


class VersionManager:
    """
    Manage multiple versions of tools for backward compatibility.
    """

    def __init__(self):
        self.tool_versions: Dict[str, List[str]] = {}

    def register_version(self, tool_name: str, tool_version: str):
        """Register a tool version."""
        if tool_name not in self.tool_versions:
            self.tool_versions[tool_name] = []
        self.tool_versions[tool_name].append(tool_version)
        # Keep versions sorted newest first
        self.tool_versions[tool_name].sort(key=version.parse, reverse=True)

    def get_latest_version(self, tool_name: str) -> str:
        """Get the latest version of a tool."""
        versions = self.tool_versions.get(tool_name, [])
        return versions[0] if versions else "1.0.0"

    def get_tool_name_with_version(
        self,
        tool_name: str,
        requested_version: Optional[str] = None
    ) -> str:
        """
        Get versioned tool name.

        Examples:
            verify_pan -> verify_pan_v1
            verify_pan@2.0.0 -> verify_pan_v2
        """
        if requested_version:
            major_version = version.parse(requested_version).major
            return f"{tool_name}_v{major_version}"

        latest = self.get_latest_version(tool_name)
        major_version = version.parse(latest).major
        return f"{tool_name}_v{major_version}"
```

---

## 36. Summary & Next Steps

### 36.1 Architecture Highlights

- ✅ **Modular Design:** Clean separation of concerns with pluggable components
- ✅ **Auto Tool Registry:** Zero-config tool registration from metadata
- ✅ **Advanced Caching:** Redis-based caching with intelligent invalidation
- ✅ **Rate Limiting:** Per-tool rate limits with token bucket algorithm
- ✅ **Error Handling:** Comprehensive error mapping from API to MCP errors
- ✅ **Monitoring:** Prometheus metrics and structured logging
- ✅ **Security:** JWT authentication, input validation, PII protection
- ✅ **Scalability:** Horizontal scaling with load balancing support
- ✅ **Docker Ready:** Multi-stage builds for optimized images
- ✅ **Cloud Native:** Designed for EC2 deployment with AWS integration

### 36.2 Implementation Checklist

**Before Starting Implementation:**

- [ ] Review and approve architecture design
- [ ] Set up AWS account and EC2 instance
- [ ] Obtain KYC API credentials
- [ ] Set up development environment
- [ ] Create GitHub repository

**During Implementation:**

- [ ] Follow the implementation roadmap (Phase 1-5)
- [ ] Write tests for each component
- [ ] Document all APIs and tools
- [ ] Set up CI/CD pipeline
- [ ] Configure monitoring and alerts

**After Implementation:**

- [ ] Conduct security audit
- [ ] Perform load testing
- [ ] Deploy to staging environment
- [ ] User acceptance testing
- [ ] Production deployment
- [ ] Monitor and optimize

### 36.3 Success Criteria

1. **Functionality:** All 12 KYC tools working correctly
2. **Performance:** p95 response time < 500ms
3. **Reliability:** 99.9% uptime
4. **Scalability:** Handle 100+ concurrent requests
5. **Security:** Pass security audit
6. **Maintainability:** New tools can be added in < 1 hour

---

## 37. References & Resources

### 37.1 Documentation Links

- **MCP Protocol:** https://modelcontextprotocol.io/
- **MCP Python SDK:** https://github.com/modelcontextprotocol/python-sdk
- **Pydantic:** https://docs.pydantic.dev/
- **httpx:** https://www.python-httpx.org/
- **Redis:** https://redis.io/docs/
- **Prometheus:** https://prometheus.io/docs/

### 37.2 API Documentation

- **KYC API Docs:** (From Postman collection)
- **Sandbox Environment:** https://sandbox.co.in

### 37.3 Contact & Support

- **Architecture Questions:** Contact technical lead
- **API Issues:** Contact KYC API support
- **Deployment Issues:** Contact DevOps team

---

## Appendix A: Complete Metadata Schema Example

```json
{
  "version": "1.0.0",
  "server": {
    "name": "kyc-mcp-server",
    "description": "MCP server for comprehensive KYC API integration",
    "protocol_version": "2024-11-05",
    "supported_features": ["tools", "caching", "rate_limiting", "authentication"]
  },
  "tools": [
    {
      "name": "verify_pan",
      "version": "1.0.0",
      "description": "Verify PAN card details with name and date of birth matching. Returns validation status, category, and Aadhaar seeding status.",
      "category": "kyc",
      "subcategory": "pan",
      "api_endpoint": "/kyc/pan/verify",
      "http_method": "POST",
      "authentication": {
        "type": "jwt_and_api_key",
        "headers": ["Authorization", "x-api-key"]
      },
      "rate_limit": {
        "requests_per_minute": 60,
        "requests_per_hour": 1000,
        "burst_size": 10
      },
      "cache": {
        "enabled": true,
        "ttl_seconds": 3600,
        "cache_key_fields": ["pan", "name_as_per_pan", "date_of_birth"],
        "invalidate_on": ["pan_update", "user_request"]
      },
      "input_schema": {
        "type": "object",
        "required": ["pan", "name_as_per_pan", "date_of_birth", "consent", "reason"],
        "properties": {
          "pan": {
            "type": "string",
            "pattern": "^[A-Z]{5}[0-9]{4}[A-Z]$",
            "description": "10-character PAN number (5 letters, 4 digits, 1 letter)",
            "example": "XXXPX1234A"
          },
          "name_as_per_pan": {
            "type": "string",
            "minLength": 1,
            "maxLength": 100,
            "description": "Full name as per PAN card"
          },
          "date_of_birth": {
            "type": "string",
            "pattern": "^\\d{2}/\\d{2}/\\d{4}$",
            "description": "Date of birth in DD/MM/YYYY format",
            "example": "15/08/1990"
          },
          "consent": {
            "type": "string",
            "enum": ["Y", "y"],
            "description": "User consent (must be 'Y' or 'y')"
          },
          "reason": {
            "type": "string",
            "minLength": 1,
            "maxLength": 200,
            "description": "Reason for verification"
          }
        }
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "pan": {
            "type": "string",
            "description": "Verified PAN number"
          },
          "category": {
            "type": "string",
            "enum": ["individual", "company", "trust", "association_of_persons", "body_of_individuals", "firm"],
            "description": "PAN holder category"
          },
          "status": {
            "type": "string",
            "enum": ["valid", "invalid"],
            "description": "PAN validation status"
          },
          "remarks": {
            "type": ["string", "null"],
            "description": "Additional remarks (e.g., 'Holder is Deceased', 'Liquidated')"
          },
          "name_match": {
            "type": "boolean",
            "description": "Whether provided name matches PAN records"
          },
          "dob_match": {
            "type": "boolean",
            "description": "Whether provided DOB matches PAN records"
          },
          "aadhaar_seeding_status": {
            "type": "string",
            "enum": ["y", "n", "na"],
            "description": "Aadhaar seeding status (y=seeded, n=not seeded, na=not applicable)"
          },
          "verified_at": {
            "type": "integer",
            "description": "Unix timestamp of verification"
          }
        }
      },
      "error_mappings": [
        {
          "api_code": 422,
          "api_message": "Invalid Pan pattern",
          "mcp_error": "INVALID_INPUT",
          "user_message": "The PAN format is invalid. Please provide a valid 10-character PAN in format XXXXX9999X.",
          "retryable": false
        },
        {
          "api_code": 422,
          "api_message": "Invalid parameter: Consent is required",
          "mcp_error": "CONSENT_REQUIRED",
          "user_message": "User consent is required for PAN verification. Please provide consent as 'Y'.",
          "retryable": false
        },
        {
          "api_code": 503,
          "api_message": "Source Unavailable",
          "mcp_error": "SERVICE_UNAVAILABLE",
          "user_message": "The verification service is temporarily unavailable. Please try again in a few minutes.",
          "retryable": true
        }
      ],
      "retry_strategy": {
        "max_attempts": 3,
        "backoff_multiplier": 2,
        "initial_delay_seconds": 1,
        "max_delay_seconds": 10,
        "retry_on_status_codes": [500, 502, 503, 504],
        "retry_on_errors": ["SERVICE_UNAVAILABLE", "TIMEOUT"]
      },
      "monitoring": {
        "track_metrics": true,
        "alert_on_error_rate": 0.05,
        "alert_on_latency_p95": 2000
      }
    }
  ],
  "global_settings": {
    "base_url": "${KYC_API_BASE_URL}",
    "default_timeout": 30,
    "max_retries": 3,
    "log_level": "INFO",
    "enable_metrics": true,
    "enable_caching": true,
    "enable_rate_limiting": true
  }
}
```

---

## Appendix B: Complete Requirements File

```txt
# requirements.txt

# MCP SDK
mcp>=1.0.0

# HTTP Client
httpx[http2]>=0.27.0
h2>=4.1.0

# Async Support (asyncio is part of the standard library;
# the async Redis client ships with redis>=5 as redis.asyncio)
aiofiles>=23.2.1

# Data Validation & Serialization
pydantic>=2.5.0
pydantic-settings>=2.1.0
email-validator>=2.1.0

# Caching
redis>=5.0.0
hiredis>=2.3.0

# Retry & Resilience
tenacity>=8.2.3

# Authentication & Security
pyjwt>=2.8.0
cryptography>=41.0.7
python-jose>=3.3.0

# Logging
structlog>=24.1.0
python-json-logger>=2.0.7
colorama>=0.4.6

# Monitoring & Metrics
prometheus-client>=0.19.0

# Configuration
python-dotenv>=1.0.0
pyyaml>=6.0.1

# Utilities
python-dateutil>=2.8.2
pytz>=2023.3
```

```txt
# requirements-dev.txt

# Testing
pytest>=7.4.3
pytest-asyncio>=0.21.1
pytest-cov>=4.1.0
pytest-mock>=3.12.0
pytest-timeout>=2.2.0
pytest-xdist>=3.5.0
httpx-mock>=0.1.0

# Code Quality
black>=23.12.1
ruff>=0.1.9
mypy>=1.7.1
isort>=5.13.2

# Type Stubs
types-redis>=4.6.0
types-pyyaml>=6.0.12

# Development Tools
ipython>=8.19.0
ipdb>=0.13.13
pre-commit>=3.6.0

# Load Testing
locust>=2.20.0

# Documentation
mkdocs>=1.5.3
mkdocs-material>=9.5.3
```

---

## Appendix C: Quick Start Guide

### Step 1: Clone and Setup

```bash
# Clone repository
git clone <repository-url>
cd kyc-mcp-server

# Create virtual environment
python3.11 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -e ".[dev]"

# Setup environment
cp .env.example .env
# Edit .env with your credentials
```

### Step 2: Start Dependencies

```bash
# Start Redis locally
docker run -d -p 6379:6379 --name kyc-redis redis:7-alpine

# Or use docker-compose
docker-compose -f docker/docker-compose.yml up -d redis
```

### Step 3: Run Tests

```bash
# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=src --cov-report=html

# Run specific test
pytest tests/unit/test_pan_verification.py -v
```

### Step 4: Run Server

```bash
# Development mode
python -m src.main

# With debug logging
LOG_LEVEL=DEBUG python -m src.main

# Production mode
gunicorn src.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker
```

### Step 5: Test Tools

```bash
# Using MCP client
mcp-client connect stdio -- python -m src.main

# Or test individual tools
curl -X POST http://localhost:9090/tools/verify_pan \
  -H "Content-Type: application/json" \
  -d '{
    "pan": "XXXPX1234A",
    "name_as_per_pan": "John Doe",
    "date_of_birth": "01/01/1990",
    "consent": "Y",
    "reason": "Testing"
  }'
```

---

## Appendix D: Glossary

| Term | Definition |
|------|------------|
| **MCP** | Model Context Protocol - Protocol for AI model-tool integration |
| **KYC** | Know Your Customer - Identity verification process |
| **PAN** | Permanent Account Number - Indian tax identification |
| **Aadhaar** | 12-digit unique identity number for Indian residents |
| **OKYC** | Offline eKYC - Aadhaar verification without internet |
| **DIN** | Director Identification Number |
| **CIN** | Corporate Identification Number |
| **LLPIN** | Limited Liability Partnership Identification Number |
| **IFSC** | Indian Financial System Code - Bank branch identifier |
| **MCA** | Ministry of Corporate Affairs |
| **DigiLocker** | Digital document storage service by Govt of India |
| **Penny Drop** | Bank account verification by depositing small amount |
| **Penniless** | Bank account verification without deposit |
| **JWT** | JSON Web Token - Authentication token format |
| **TTL** | Time To Live - Cache expiration time |
| **p95** | 95th percentile - Performance metric |

---

## Appendix E: API Error Code Reference

### HTTP Status Codes

| Code | Meaning | Action |
|------|---------|--------|
| 200 | Success | Process response |
| 400 | Bad Request | Fix request format |
| 401 | Unauthorized | Check authentication |
| 403 | Forbidden | Check permissions |
| 404 | Not Found | Verify endpoint/resource |
| 422 | Unprocessable Entity | Fix validation errors |
| 429 | Too Many Requests | Implement backoff |
| 500 | Internal Server Error | Retry with backoff |
| 502 | Bad Gateway | Retry with backoff |
| 503 | Service Unavailable | Retry with backoff |
| 504 | Gateway Timeout | Increase timeout, retry |

### Custom Error Codes

| Code | Description | Retryable |
|------|-------------|-----------|
| 521 | Data Not Found | No |
| 523 | Invalid Session Lifecycle | No |

---

## Appendix F: Environment-Specific Configurations

### Development Environment

```yaml
# docker/docker-compose.dev.yml
version: '3.8'

services:
  kyc-mcp-server:
    build:
context: .. dockerfile: docker/Dockerfile.dev volumes: - ../src:/app/src:ro - ../config:/app/config:ro environment: - LOG_LEVEL=DEBUG - DEBUG_MODE=true - CACHE_ENABLED=true - RATE_LIMIT_ENABLED=false ports: - "9090:9090" - "5678:5678" # Debugger port command: python -m debugpy --listen 0.0.0.0:5678 -m src.main ``` ### Staging Environment ```yaml # docker/docker-compose.staging.yml version: '3.8' services: kyc-mcp-server: image: kyc-mcp-server:staging environment: - LOG_LEVEL=INFO - CACHE_ENABLED=true - RATE_LIMIT_ENABLED=true - ENABLE_METRICS=true deploy: replicas: 2 resources: limits: cpus: '1' memory: 2G ``` ### Production Environment ```yaml # docker/docker-compose.prod.yml version: '3.8' services: kyc-mcp-server: image: kyc-mcp-server:latest environment: - LOG_LEVEL=WARNING - CACHE_ENABLED=true - RATE_LIMIT_ENABLED=true - ENABLE_METRICS=true deploy: replicas: 3 resources: limits: cpus: '2' memory: 4G reservations: cpus: '1' memory: 2G restart_policy: condition: on-failure delay: 5s max_attempts: 3 ``` --- ## Conclusion This architecture provides a robust, scalable, and maintainable foundation for integrating KYC APIs via MCP. The design emphasizes: - **Modularity:** Easy to extend with new tools - **Performance:** Caching and connection pooling - **Reliability:** Retry logic and error handling - **Security:** Authentication and input validation - **Observability:** Comprehensive logging and metrics - **Scalability:** Horizontal scaling support - **Maintainability:** Clear structure and documentation The Auto Tool Registry system enables rapid addition of new APIs without modifying core server code, while the advanced metadata schema provides fine-grained control over tool behavior, caching, rate limiting, and error handling. **Ready for implementation in Code mode.** --- **Document Version:** 1.0.0 **Last Updated:** 2025-11-20 **Author:** Kilo Code (Architect Mode) **Status:** Ready for Review & Implementation
