Skip to main content
Glama

Finizi B4B MCP Server

by finizi-app
MIT License
mcp-plan.md56.7 kB
# Finizi B4B MCP Server - Implementation Plan ## 1. Project Overview ### 1.1 Purpose Build a standalone Model Context Protocol (MCP) server that enables AI agents to interact with Finizi B4B API data while maintaining proper authentication and multi-tenant authorization. ### 1.2 Key Architecture Decision: User Token Pass-Through **The MCP server acts as a transparent proxy**, forwarding the user's JWT token to the B4B API for authentication and authorization. This approach: - ✅ Preserves user identity and audit trail - ✅ Reuses existing B4B authentication and authorization logic - ✅ Enforces entity-level access control without code duplication - ✅ No need for separate service accounts or authorization server - ✅ Automatic compliance with B4B's RBAC system ### 1.3 Repository Structure ``` finizi-b4b-mcp-server/ ├── README.md ├── pyproject.toml # UV package management ├── .env.example ├── src/ │ └── finizi_b4b_mcp/ │ ├── __init__.py │ ├── server.py # Main MCP server entry point │ ├── config.py # Pydantic Settings │ ├── auth/ │ │ ├── __init__.py │ │ ├── token_handler.py # Token extraction & refresh │ │ └── jwt_utils.py # JWT decode/validate │ ├── client/ │ │ ├── __init__.py │ │ ├── api_client.py # HTTPX async client │ │ └── retry.py # Tenacity retry logic │ ├── tools/ │ │ ├── __init__.py │ │ ├── auth.py # login, logout, whoami │ │ ├── entities.py # Entity management tools │ │ ├── invoices.py # Invoice operations │ │ ├── vendors.py # Vendor management │ │ ├── products.py # Product & matching │ │ └── reports.py # Analytics & reports │ └── utils/ │ ├── __init__.py │ ├── validators.py # Input validation │ ├── formatters.py # Response formatting │ └── errors.py # Custom exceptions ├── tests/ │ ├── __init__.py │ ├── test_auth.py │ ├── test_entities.py │ ├── test_invoices.py │ └── fixtures/ │ └── mock_responses.json └── docs/ ├── API_MAPPING.md # B4B API to MCP tool mapping └── DEVELOPMENT.md # Development guide ``` --- ## 2. B4B API Documentation Retrieval ### 2.1 Accessing API Documentation Before implementing MCP tools, retrieve the complete B4B API documentation: ```bash # Method 1: Download OpenAPI JSON specification curl http://localhost:8000/api/v1/openapi.json > b4b-openapi.json # Method 2: Access interactive documentation # Swagger UI: http://localhost:8000/docs # ReDoc: http://localhost:8000/redoc # Method 3: View in browser with authentication test # Navigate to http://localhost:8000/docs and use the "Authorize" button # to test endpoints with actual JWT tokens ``` ### 2.2 API Documentation Structure (from main.py) The B4B API organizes endpoints into the following tags: | Tag | Description | Key Endpoints | |-----|-------------|---------------| | `authentication` | Login, registration, OTP | `/api/v1/auth/login`, `/api/v1/auth/register` | | `firebase-auth` | Firebase authentication | `/api/v1/auth/firebase/verify` | | `users` | User management | `/api/v1/users/me`, `/api/v1/users/{id}` | | `entities` | Entity (company) management | `/api/v1/entities/`, `/api/v1/entities/{id}` | | `invoices` | Invoice CRUD, XML/PDF import | `/api/v1/entities/{entity_id}/invoices` | | `vendors` | Vendor management | `/api/v1/entities/{entity_id}/vendors` | | `products` | Product matching | `/api/v1/products/` | | `email` | Email integration & OAuth | `/api/v1/email/oauth/authorize` | | `admin` | Super admin operations | `/api/v1/admin/users`, `/api/v1/admin/invoices` | | `queue` | Background job management | `/api/v1/jobs/{job_id}` | ### 2.3 Instructions for Coding Agent **Step-by-step process for implementing MCP tools:** 1. **Download OpenAPI Specification** ```bash cd finizi-b4b-mcp-server curl http://localhost:8000/api/v1/openapi.json -o docs/b4b-openapi.json ``` 2. **Review Endpoint Documentation** - Open http://localhost:8000/docs in browser - For each tag (entities, invoices, vendors, etc.): - Review available endpoints - Check request parameters (path, query, body) - Review response schemas - Note authentication requirements (all require JWT Bearer token) 3. **Map API Endpoints to MCP Tools** - Create `docs/API_MAPPING.md` with table: ```markdown | MCP Tool | B4B API Endpoint | Method | Description | |----------|------------------|--------|-------------| | `list_entities` | `/api/v1/entities/` | GET | List user's entities | | `create_entity` | `/api/v1/entities/` | POST | Create new entity | | `get_entity` | `/api/v1/entities/{id}` | GET | Get entity details | ``` 4. **Use Response Schemas for Validation** - Extract Pydantic schemas from OpenAPI spec - Reuse for MCP tool response validation - Example: `EntityResponse`, `InvoiceResponse`, `VendorResponse` 5. **Test Each Endpoint Before Implementation** ```bash # Login to get token curl -X POST http://localhost:8000/api/v1/auth/login \ -H "Content-Type: application/json" \ -d '{"phone": "+84909495665", "password": "Admin123@"}' # Use token to test endpoint curl -X GET http://localhost:8000/api/v1/entities/ \ -H "Authorization: Bearer YOUR_TOKEN_HERE" ``` 6. **Implement MCP Tool with Token Pass-Through** ```python @mcp.tool() async def list_entities(ctx: Context) -> dict: """List entities user has access to""" user_token = await extract_user_token(ctx) response = await api_client.get( "/api/v1/entities/", headers={"Authorization": f"Bearer {user_token}"} ) return response.json() ``` ### 2.4 Key B4B API Characteristics From reviewing the B4B codebase, note these important characteristics: - **Authentication**: All endpoints require JWT Bearer token (except `/auth/login`, `/auth/register`) - **Token Lifetime**: 120 minutes (2 hours) - implement auto-refresh logic - **Multi-Tenant**: Most endpoints require `entity_id` parameter - **Authorization**: B4B API enforces via `UserEntityRelationship` table - **Super Admin**: `is_super_admin=True` users bypass entity access checks - **Pagination**: Use `page` and `per_page` parameters (max `per_page=100`) - **Error Responses**: Standard FastAPI format with `detail` field - **Date Filters**: Many endpoints support `date_from` and `date_to` query params - **Status Enums**: Invoice status is INTEGER (0=DRAFT, 1=ACTIVE, 2=CANCELLED, 3=ARCHIVED) ### 2.5 OpenAPI Spec Parsing Script Create a helper script to parse OpenAPI spec and generate MCP tool templates: ```python # tools/generate_mcp_tools.py import json from pathlib import Path def parse_openapi_spec(spec_path: str) -> dict: """Parse OpenAPI spec and extract endpoint information""" with open(spec_path) as f: spec = json.load(f) tools = [] for path, methods in spec['paths'].items(): for method, details in methods.items(): if method.lower() in ['get', 'post', 'put', 'delete', 'patch']: tools.append({ 'path': path, 'method': method.upper(), 'operation_id': details.get('operationId'), 'summary': details.get('summary'), 'description': details.get('description'), 'parameters': details.get('parameters', []), 'request_body': details.get('requestBody'), 'responses': details.get('responses'), 'tags': details.get('tags', []), }) return tools def generate_mcp_tool_template(tool: dict) -> str: """Generate MCP tool template from OpenAPI endpoint""" # Extract parameters params = [] for param in tool.get('parameters', []): params.append(f"{param['name']}: {param['schema']['type']}") # Generate function signature func_name = tool['operation_id'].replace('-', '_') params_str = ', '.join(params) if params else '' template = f''' @mcp.tool() async def {func_name}({params_str}, ctx: Context) -> dict: """ {tool['summary']} {tool['description']} B4B API: {tool['method']} {tool['path']} """ user_token = await extract_user_token(ctx) response = await api_client.{tool['method'].lower()}( "{tool['path']}", headers={{"Authorization": f"Bearer {{user_token}}"}} ) return response.json() ''' return template if __name__ == "__main__": spec_path = "docs/b4b-openapi.json" tools = parse_openapi_spec(spec_path) # Generate templates for entities tag for tool in tools: if 'entities' in tool.get('tags', []): print(generate_mcp_tool_template(tool)) ``` --- ## 3. Authentication & Authorization Architecture ### 3.1 Authentication Flow ``` ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ AI Agent │ │ MCP Server │ │ B4B API │ └──────┬──────┘ └──────┬───────┘ └──────┬──────┘ │ │ │ │ 1. mcp.call("login") │ │ ├──────────────────────>│ │ │ │ 2. POST /auth/login │ │ ├───────────────────────>│ │ │ │ │ │ 3. {access_token, ...} │ │ │<───────────────────────┤ │ │ │ │ │ 4. Store in session │ │ │ metadata │ │ │ │ │ 5. {success: true} │ │ │<──────────────────────┤ │ │ │ │ │ 6. mcp.call( │ │ │ "list_invoices", │ │ │ entity_id="...") │ │ ├──────────────────────>│ │ │ │ 7. Extract user token │ │ │ from session │ │ │ │ │ │ 8. GET /invoices │ │ │ Authorization: │ │ │ Bearer {token} │ │ ├───────────────────────>│ │ │ │ │ │ 9. Check JWT validity │ │ │ & user permissions │ │ │ │ │ │ 10. Invoice data │ │ │<───────────────────────┤ │ │ │ │ 11. Formatted result │ │ │<──────────────────────┤ │ ``` ### 3.2 Key Implementation: Token Storage & Extraction **Login Tool (stores user token in session):** ```python @mcp.tool() async def login(phone: str, password: str, ctx: Context) -> dict: """ Login to B4B API and establish authenticated session. This tool must be called first before using any other tools. The JWT token is stored in the session and automatically used for all subsequent API calls. Args: phone: User's phone number (e.g., "+84909495665") password: User's password ctx: MCP context (automatically provided) Returns: Success message with user info Example: await login("+84909495665", "Admin123@") """ logger.info(f"Login attempt for phone: {phone}") try: # Call B4B login endpoint response = await api_client.post( "/api/v1/auth/login", json={"phone": phone, "password": password} ) response.raise_for_status() data = response.json() # Store tokens in session metadata ctx.session.metadata['user_token'] = data["access_token"] ctx.session.metadata['refresh_token'] = data.get("refresh_token") ctx.session.metadata['user_email'] = data.get("email") ctx.session.metadata['user_id'] = data.get("id") ctx.session.metadata['is_super_admin'] = data.get("is_super_admin", False) logger.info(f"Login successful for: {data.get('email')}") return { "success": True, "message": f"Successfully logged in as {data.get('email')}", "user_id": data.get("id"), "is_super_admin": data.get("is_super_admin", False) } except httpx.HTTPStatusError as e: logger.error(f"Login failed: {e.response.status_code} - {e.response.text}") return { "success": False, "error": f"Login failed: {e.response.json().get('detail', 'Invalid credentials')}" } except Exception as e: logger.error(f"Login error: {str(e)}") return {"success": False, "error": str(e)} ``` **Token Extraction Helper (used by all tools):** ```python async def extract_user_token(ctx: Context) -> str: """ Extract and validate user's JWT token from session metadata. This is called by every MCP tool to get the user's authentication token before making API requests to B4B. Args: ctx: MCP context containing session metadata Returns: JWT access token string Raises: ValueError: If user is not authenticated Note: Token refresh logic should be added here in the future to handle token expiry (currently tokens expire after 120 minutes) """ token = ctx.session.metadata.get('user_token') if not token: raise ValueError( "Not authenticated. Please call the 'login' tool first with your " "phone number and password to establish an authenticated session." ) # TODO: Add token refresh logic # if await is_token_expired(token): # token = await refresh_token(ctx) return token ``` **Example Tool with Token Pass-Through:** ```python @mcp.tool() async def list_invoices( entity_id: str, page: int = 1, per_page: int = 20, date_from: Optional[str] = None, date_to: Optional[str] = None, ctx: Context = None ) -> dict: """ List invoices for a specific entity. The user must have access to the specified entity. Authorization is enforced by the B4B API based on the user's JWT token. Args: entity_id: UUID of the entity to list invoices for page: Page number (default: 1) per_page: Items per page, max 100 (default: 20) date_from: Optional start date (ISO format: YYYY-MM-DD) date_to: Optional end date (ISO format: YYYY-MM-DD) ctx: MCP context (automatically provided) Returns: { "items": [...], "total": 100, "page": 1, "per_page": 20, "pages": 5 } Example: await list_invoices("123e4567-e89b-12d3-a456-426614174000", page=1) """ # 1. Extract user's JWT token from session user_token = await extract_user_token(ctx) # 2. Build query parameters params = {"page": page, "per_page": min(per_page, 100)} if date_from: params["date_from"] = date_from if date_to: params["date_to"] = date_to try: # 3. Call B4B API with user's token (pass-through) response = await api_client.get( f"/api/v1/entities/{entity_id}/invoices", headers={"Authorization": f"Bearer {user_token}"}, params=params ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: # 4. Handle authorization errors with user-friendly messages if e.response.status_code == 403: return { "error": f"You don't have access to entity {entity_id}. " "Please verify the entity ID or contact an administrator." } elif e.response.status_code == 404: return {"error": f"Entity {entity_id} not found."} else: return {"error": f"API error: {e.response.json().get('detail', str(e))}"} except Exception as e: logger.error(f"Error listing invoices: {str(e)}") return {"error": str(e)} ``` ### 3.3 Authorization Model **How Multi-Tenant Authorization Works:** 1. **User logs in** → MCP stores JWT token in session 2. **User calls MCP tool** with `entity_id` parameter 3. **MCP extracts token** from session and forwards to B4B API 4. **B4B API validates JWT** and extracts `user_id` from token 5. **B4B API checks** `UserEntityRelationship` table: ```sql SELECT * FROM user_entity_relationships WHERE user_id = {from_jwt} AND entity_id = {from_request} AND is_active = true ``` 6. **B4B API returns**: - ✅ 200 OK → User has access - ❌ 403 Forbidden → User doesn't have access - ❌ 404 Not Found → Entity doesn't exist **Defense in Depth:** - MCP server: No authorization logic (just passes token) - B4B API: Enforces all authorization rules - Database: Stores relationship data - JWT: Cryptographically signed, cannot be forged **Special Case - Super Admin:** ```python # B4B API automatically handles super admin if current_user.is_super_admin: # Bypass entity relationship check # Super admin can access ANY entity entity = db.query(Entity).filter(Entity.id == entity_id).first() return entity ``` ### 3.4 Session Management Tools ```python @mcp.tool() async def logout(ctx: Context) -> dict: """ Logout and clear authentication session. This removes the stored JWT token from the session. You will need to call 'login' again to use other tools. """ ctx.session.metadata.clear() return {"success": True, "message": "Successfully logged out"} @mcp.tool() async def whoami(ctx: Context) -> dict: """ Get information about the currently authenticated user. Returns user details from the session metadata. """ try: user_token = await extract_user_token(ctx) # Call B4B API to get current user info response = await api_client.get( "/api/v1/users/me", headers={"Authorization": f"Bearer {user_token}"} ) return response.json() except ValueError as e: return {"error": str(e)} ``` --- ## 4. MCP Tool Categories & Implementation ### 4.1 Authentication Tools (Priority: P0) ```python # src/finizi_b4b_mcp/tools/auth.py @mcp.tool() async def login(phone: str, password: str, ctx: Context) -> dict: """Login to B4B and store user token""" # Implementation shown in section 3.2 @mcp.tool() async def logout(ctx: Context) -> dict: """Logout and clear session""" # Implementation shown in section 3.4 @mcp.tool() async def whoami(ctx: Context) -> dict: """Get current user info""" # Implementation shown in section 3.4 ``` ### 4.2 Entity Management Tools (Priority: P0) ```python # src/finizi_b4b_mcp/tools/entities.py @mcp.tool() async def list_entities( page: int = 1, per_page: int = 20, search: Optional[str] = None, ctx: Context = None ) -> dict: """ List entities the user has access to. Args: page: Page number per_page: Items per page (max 100) search: Optional search query for entity name or tax ID Returns: Paginated list of entities with basic information """ user_token = await extract_user_token(ctx) params = {"page": page, "per_page": min(per_page, 100)} if search: params["search"] = search response = await api_client.get( "/api/v1/entities/", headers={"Authorization": f"Bearer {user_token}"}, params=params ) return response.json() @mcp.tool() async def get_entity(entity_id: str, ctx: Context) -> dict: """ Get detailed information about a specific entity. Args: entity_id: UUID of the entity Returns: Complete entity details including tax information, address, etc. """ user_token = await extract_user_token(ctx) response = await api_client.get( f"/api/v1/entities/{entity_id}", headers={"Authorization": f"Bearer {user_token}"} ) return response.json() @mcp.tool() async def create_entity( name: str, tax_id: str, entity_type: str, address: Optional[str] = None, phone: Optional[str] = None, email: Optional[str] = None, ctx: Context = None ) -> dict: """ Create a new entity. Args: name: Entity name (company name or individual name) tax_id: Tax identification number (10 or 13 digits) entity_type: "company" or "individual" address: Optional business address phone: Optional phone number email: Optional email address Returns: Created entity details including generated ID """ user_token = await extract_user_token(ctx) payload = { "name": name, "tax_id": tax_id, "entity_type": entity_type, } if address: payload["address"] = address if phone: payload["phone"] = phone if email: payload["email"] = email response = await api_client.post( "/api/v1/entities/", headers={"Authorization": f"Bearer {user_token}"}, json=payload ) return response.json() @mcp.tool() async def update_entity( entity_id: str, name: Optional[str] = None, address: Optional[str] = None, phone: Optional[str] = None, email: Optional[str] = None, ctx: Context = None ) -> dict: """ Update entity information. Args: entity_id: UUID of entity to update name: Optional new name address: Optional new address phone: Optional new phone email: Optional new email Returns: Updated entity details """ user_token = await extract_user_token(ctx) # Build payload with only provided fields payload = {} if name is not None: payload["name"] = name if address is not None: payload["address"] = address if phone is not None: payload["phone"] = phone if email is not None: payload["email"] = email response = await api_client.put( f"/api/v1/entities/{entity_id}", headers={"Authorization": f"Bearer {user_token}"}, json=payload ) return response.json() ``` ### 4.3 Invoice Management Tools (Priority: P1) ```python # src/finizi_b4b_mcp/tools/invoices.py @mcp.tool() async def list_invoices( entity_id: str, page: int = 1, per_page: int = 20, date_from: Optional[str] = None, date_to: Optional[str] = None, status: Optional[int] = None, search: Optional[str] = None, ctx: Context = None ) -> dict: """ List invoices for an entity with filtering options. Args: entity_id: UUID of the entity page: Page number (default: 1) per_page: Items per page, max 100 (default: 20) date_from: Start date filter (ISO format: YYYY-MM-DD) date_to: End date filter (ISO format: YYYY-MM-DD) status: Invoice status (0=DRAFT, 1=ACTIVE, 2=CANCELLED, 3=ARCHIVED) search: Search in invoice number or vendor name Returns: Paginated list of invoices with vendor info """ # Implementation shown in section 3.2 @mcp.tool() async def get_invoice(entity_id: str, invoice_id: str, ctx: Context) -> dict: """ Get detailed information about a specific invoice. Args: entity_id: UUID of the entity invoice_id: UUID of the invoice Returns: Complete invoice details including line items """ user_token = await extract_user_token(ctx) response = await api_client.get( f"/api/v1/entities/{entity_id}/invoices/{invoice_id}", headers={"Authorization": f"Bearer {user_token}"} ) return response.json() @mcp.tool() async def import_invoice_xml( entity_id: str, xml_content: str, ctx: Context = None ) -> dict: """ Import invoice from Vietnamese e-invoice XML format. Args: entity_id: UUID of the entity xml_content: XML content as string Returns: Imported invoice details with parsing status """ user_token = await extract_user_token(ctx) response = await api_client.post( f"/api/v1/entities/{entity_id}/invoices/upload-xml", headers={"Authorization": f"Bearer {user_token}"}, json={"xml_content": xml_content} ) return response.json() @mcp.tool() async def get_invoice_statistics( entity_id: str, year: Optional[int] = None, month: Optional[int] = None, ctx: Context = None ) -> dict: """ Get invoice statistics and analytics. Args: entity_id: UUID of the entity year: Optional year filter month: Optional month filter (1-12) Returns: Statistics including total invoices, amounts, VAT, etc. """ user_token = await extract_user_token(ctx) params = {} if year: params["year"] = year if month: params["month"] = month response = await api_client.get( f"/api/v1/entities/{entity_id}/invoices/stats", headers={"Authorization": f"Bearer {user_token}"}, params=params ) return response.json() ``` ### 4.4 Vendor Management Tools (Priority: P2) ```python # src/finizi_b4b_mcp/tools/vendors.py @mcp.tool() async def list_vendors( entity_id: str, page: int = 1, per_page: int = 20, search: Optional[str] = None, ctx: Context = None ) -> dict: """List vendors for an entity""" user_token = await extract_user_token(ctx) params = {"page": page, "per_page": min(per_page, 100)} if search: params["search"] = search response = await api_client.get( f"/api/v1/entities/{entity_id}/vendors", headers={"Authorization": f"Bearer {user_token}"}, params=params ) return response.json() @mcp.tool() async def get_vendor( entity_id: str, vendor_id: str, ctx: Context = None ) -> dict: """Get vendor details including transaction history""" user_token = await extract_user_token(ctx) response = await api_client.get( f"/api/v1/entities/{entity_id}/vendors/{vendor_id}", headers={"Authorization": f"Bearer {user_token}"} ) return response.json() ``` ### 4.5 Product Management Tools (Priority: P2) ```python # src/finizi_b4b_mcp/tools/products.py @mcp.tool() async def list_products( entity_id: str, page: int = 1, per_page: int = 20, category: Optional[str] = None, search: Optional[str] = None, ctx: Context = None ) -> dict: """List products for an entity""" user_token = await extract_user_token(ctx) params = {"page": page, "per_page": min(per_page, 100)} if category: params["category"] = category if search: params["search"] = search response = await api_client.get( f"/api/v1/entities/{entity_id}/products", headers={"Authorization": f"Bearer {user_token}"}, params=params ) return response.json() @mcp.tool() async def search_similar_products( entity_id: str, product_name: str, top_k: int = 5, ctx: Context = None ) -> dict: """ Search for similar products using semantic search. Uses vector embeddings to find products with similar names/descriptions. Args: entity_id: UUID of the entity product_name: Product name to search for top_k: Number of similar products to return (default: 5) Returns: List of similar products with similarity scores """ user_token = await extract_user_token(ctx) response = await api_client.post( f"/api/v1/entities/{entity_id}/products/search-similar", headers={"Authorization": f"Bearer {user_token}"}, json={"product_name": product_name, "top_k": top_k} ) return response.json() ``` --- ## 5. Configuration & Environment ### 5.1 Configuration Class (Pydantic Settings) ```python # src/finizi_b4b_mcp/config.py from pydantic_settings import BaseSettings from pydantic import Field from typing import Optional class MCPSettings(BaseSettings): """MCP Server configuration from environment variables""" # B4B API Configuration b4b_api_base_url: str = Field( default="http://localhost:8000", description="Base URL for B4B API" ) b4b_api_version: str = Field( default="v1", description="API version to use" ) # Timeouts api_timeout: int = Field( default=30, description="HTTP request timeout in seconds" ) api_connect_timeout: int = Field( default=10, description="HTTP connection timeout in seconds" ) # Retry Configuration max_retries: int = Field( default=3, description="Maximum number of retry attempts" ) retry_backoff: float = Field( default=1.0, description="Exponential backoff multiplier" ) # Token Refresh (future) token_refresh_threshold_minutes: int = Field( default=15, description="Refresh token when this many minutes remain" ) # Logging log_level: str = Field( default="INFO", description="Logging level (DEBUG, INFO, WARNING, ERROR)" ) # Rate Limiting (future) rate_limit_requests: int = Field( default=100, description="Maximum requests per minute" ) @property def api_base_url(self) -> str: """Full API base URL with version""" return f"{self.b4b_api_base_url}/api/{self.b4b_api_version}" class Config: env_file = ".env" env_file_encoding = "utf-8" case_sensitive = False settings = MCPSettings() ``` ### 5.2 Environment Variables (.env.example) ```bash # B4B API Configuration B4B_API_BASE_URL=http://localhost:8000 B4B_API_VERSION=v1 # Timeouts (in seconds) API_TIMEOUT=30 API_CONNECT_TIMEOUT=10 # Retry Configuration MAX_RETRIES=3 RETRY_BACKOFF=1.0 # Token Management TOKEN_REFRESH_THRESHOLD_MINUTES=15 # Logging LOG_LEVEL=INFO # Rate Limiting RATE_LIMIT_REQUESTS=100 ``` --- ## 6. HTTP Client Implementation ### 6.1 Async API Client with Retry Logic ```python # src/finizi_b4b_mcp/client/api_client.py import httpx from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type ) from ..config import settings import structlog logger = structlog.get_logger(__name__) class B4BAPIClient: """Async HTTP client for B4B API with retry logic""" def __init__(self): self.base_url = settings.api_base_url self.timeout = httpx.Timeout( timeout=settings.api_timeout, connect=settings.api_connect_timeout ) # Create persistent client (connection pooling) self.client = httpx.AsyncClient( base_url=self.base_url, timeout=self.timeout, follow_redirects=True, headers={ "User-Agent": "Finizi-B4B-MCP/1.0", "Content-Type": "application/json" } ) logger.info(f"Initialized B4B API client for {self.base_url}") @retry( stop=stop_after_attempt(lambda: settings.max_retries), wait=wait_exponential(multiplier=lambda: settings.retry_backoff), retry=retry_if_exception_type(( httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError )), reraise=True ) async def get(self, path: str, headers: dict = None, params: dict = None) -> httpx.Response: """GET request with retry logic""" logger.debug(f"GET {path}", params=params) response = await self.client.get(path, headers=headers, params=params) logger.debug(f"Response: {response.status_code}") return response @retry( stop=stop_after_attempt(lambda: settings.max_retries), wait=wait_exponential(multiplier=lambda: settings.retry_backoff), retry=retry_if_exception_type(( httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError )), reraise=True ) async def post(self, path: str, headers: dict = None, json: dict = None) -> httpx.Response: """POST request with retry logic""" logger.debug(f"POST {path}", json_keys=list(json.keys()) if json else None) response = await self.client.post(path, headers=headers, json=json) logger.debug(f"Response: {response.status_code}") return response @retry( stop=stop_after_attempt(lambda: settings.max_retries), wait=wait_exponential(multiplier=lambda: settings.retry_backoff), retry=retry_if_exception_type(( httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError )), reraise=True ) async def put(self, path: str, headers: dict = None, json: dict = None) -> httpx.Response: """PUT request with retry logic""" logger.debug(f"PUT {path}") response = await self.client.put(path, headers=headers, json=json) logger.debug(f"Response: {response.status_code}") return response @retry( stop=stop_after_attempt(lambda: settings.max_retries), wait=wait_exponential(multiplier=lambda: settings.retry_backoff), retry=retry_if_exception_type(( httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError )), reraise=True ) async def delete(self, path: str, headers: dict = None) -> httpx.Response: """DELETE request with retry logic""" logger.debug(f"DELETE {path}") response = await self.client.delete(path, headers=headers) logger.debug(f"Response: {response.status_code}") return response async def close(self): """Close HTTP client connection""" await self.client.aclose() logger.info("Closed B4B API client") # Singleton instance _api_client = None def get_api_client() -> B4BAPIClient: """Get singleton API client instance""" global _api_client if _api_client is None: _api_client = B4BAPIClient() return _api_client ``` --- ## 7. Main MCP Server Entry Point ### 7.1 Server Initialization with Lifespan Management ```python # src/finizi_b4b_mcp/server.py import mcp.server.fastmcp as fastmcp from contextlib import asynccontextmanager import structlog from .config import settings from .client.api_client import get_api_client from .tools import auth, entities, invoices, vendors, products # Configure structured logging structlog.configure( processors=[ structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.dev.ConsoleRenderer() ] ) logger = structlog.get_logger(__name__) # Create MCP server instance mcp = fastmcp.FastMCP("Finizi B4B API") @asynccontextmanager async def lifespan(): """ Lifespan context manager for MCP server. Handles startup and shutdown: - Startup: Initialize HTTP client, validate B4B API connection - Shutdown: Close HTTP client, cleanup resources """ # Startup logger.info("Starting Finizi B4B MCP Server") api_client = get_api_client() # Health check B4B API try: response = await api_client.get("/") logger.info(f"B4B API health check: {response.status_code}") except Exception as e: logger.error(f"B4B API health check failed: {str(e)}") yield # Shutdown logger.info("Shutting down Finizi B4B MCP Server") await api_client.close() # Register lifespan handler mcp.lifespan(lifespan) # Import and register all tools # Note: Tools are registered via decorators in their respective modules from .tools import ( auth, # login, logout, whoami entities, # list_entities, get_entity, create_entity, update_entity invoices, # list_invoices, get_invoice, import_invoice_xml, get_invoice_statistics vendors, # list_vendors, get_vendor products # list_products, search_similar_products ) if __name__ == "__main__": # Run MCP server mcp.run() ``` --- ## 8. Implementation Roadmap ### 8.1 Phase 1: Foundation (Week 1) **Goal: Basic MCP server with authentication** Tasks: - [ ] Set up repository structure - [ ] Implement configuration management (Pydantic Settings) - [ ] Build HTTP client with retry logic (HTTPX + Tenacity) - [ ] Implement token extraction helper - [ ] Create authentication tools (login, logout, whoami) - [ ] Set up structured logging (structlog) - [ ] Write unit tests for auth flow Deliverables: - Working MCP server that can login to B4B API - Token stored in session metadata - Basic error handling ### 8.2 Phase 2: Core Tools (Week 2) **Goal: Entity and invoice management** Tasks: - [ ] Implement entity management tools (list, get, create, update) - [ ] Implement invoice management tools (list, get, statistics) - [ ] Add input validation helpers - [ ] Add response formatting utilities - [ ] Write integration tests with mock B4B API responses - [ ] Document API mapping (API_MAPPING.md) Deliverables: - Entity CRUD operations working - Invoice listing and retrieval working - Proper error handling for 403/404 errors ### 8.3 Phase 3: Advanced Features (Week 3) **Goal: Complete tool coverage** Tasks: - [ ] Implement vendor management tools - [ ] Implement product management tools (with semantic search) - [ ] Add invoice import tool (XML support) - [ ] Implement pagination helpers - [ ] Add comprehensive error handling - [ ] Performance testing and optimization Deliverables: - All major B4B API endpoints covered - XML invoice import working - Semantic product search working ### 8.4 Phase 4: Production Readiness (Week 4) **Goal: Monitoring, security, performance** Tasks: - [ ] Implement token auto-refresh logic - [ ] Add rate limiting (token bucket algorithm) - [ ] Implement comprehensive logging - [ ] Add metrics and monitoring hooks - [ ] Security audit (token storage, input validation) - [ ] Load testing with multiple concurrent sessions - [ ] Documentation (README, API docs, examples) Deliverables: - Production-ready MCP server - Complete documentation - Security best practices implemented ### 8.5 Phase 5: Polish & Launch (Week 5) **Goal: Launch and iterate** Tasks: - [ ] Create example usage scripts - [ ] Write integration guide for AI agents - [ ] Performance optimization based on profiling - [ ] Bug fixes from testing - [ ] Deploy to production environment - [ ] Monitor initial usage and gather feedback Deliverables: - Deployed MCP server - Usage examples - Monitoring dashboard --- ## 9. Testing Strategy ### 9.1 Unit Tests ```python # tests/test_auth.py import pytest from unittest.mock import Mock, AsyncMock, patch import httpx @pytest.mark.asyncio async def test_login_success(): """Test successful login stores token in session""" from finizi_b4b_mcp.tools.auth import login # Mock Context ctx = Mock() ctx.session.metadata = {} # Mock API response mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "access_token": "fake_token_123", "refresh_token": "refresh_token_456", "email": "test@example.com", "id": "user_id_789", "is_super_admin": False } with patch('finizi_b4b_mcp.client.api_client.get_api_client') as mock_client: mock_client.return_value.post = AsyncMock(return_value=mock_response) result = await login("+84909495665", "Admin123@", ctx) # Assert success assert result["success"] is True assert "Successfully logged in" in result["message"] # Assert token stored assert ctx.session.metadata['user_token'] == "fake_token_123" assert ctx.session.metadata['refresh_token'] == "refresh_token_456" assert ctx.session.metadata['user_email'] == "test@example.com" @pytest.mark.asyncio async def test_login_invalid_credentials(): """Test login with invalid credentials returns error""" from finizi_b4b_mcp.tools.auth import login ctx = Mock() ctx.session.metadata = {} # Mock 401 response mock_response = Mock() mock_response.status_code = 401 mock_response.json.return_value = {"detail": "Invalid credentials"} mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( "401", request=Mock(), response=mock_response ) with patch('finizi_b4b_mcp.client.api_client.get_api_client') as mock_client: mock_client.return_value.post = AsyncMock(return_value=mock_response) result = await login("+84909495665", "WrongPassword", ctx) # Assert failure assert result["success"] is False assert "Invalid credentials" in result["error"] @pytest.mark.asyncio async def test_extract_token_not_authenticated(): """Test token extraction fails when not logged in""" from finizi_b4b_mcp.auth.token_handler import extract_user_token ctx = Mock() ctx.session.metadata = {} # No token with pytest.raises(ValueError, match="Not authenticated"): await extract_user_token(ctx) ``` ### 9.2 Integration Tests ```python # tests/test_entities.py import pytest from unittest.mock import Mock, AsyncMock, patch @pytest.mark.asyncio async def test_list_entities_with_token(): """Test listing entities with valid token""" from finizi_b4b_mcp.tools.entities import list_entities # Mock authenticated context ctx = Mock() ctx.session.metadata = {"user_token": "valid_token_123"} # Mock API response mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "items": [ {"id": "entity_1", "name": "Company A"}, {"id": "entity_2", "name": "Company B"} ], "total": 2, "page": 1, "per_page": 20 } with patch('finizi_b4b_mcp.client.api_client.get_api_client') as mock_client: mock_client.return_value.get = AsyncMock(return_value=mock_response) result = await list_entities(ctx=ctx) # Assert correct API call mock_client.return_value.get.assert_called_once() call_args = mock_client.return_value.get.call_args assert call_args.kwargs["headers"]["Authorization"] == "Bearer valid_token_123" # Assert response assert result["total"] == 2 assert len(result["items"]) == 2 @pytest.mark.asyncio async def test_list_invoices_no_access(): """Test listing invoices for entity user doesn't have access to""" from finizi_b4b_mcp.tools.invoices import list_invoices ctx = Mock() ctx.session.metadata = {"user_token": "valid_token_123"} # Mock 403 Forbidden response mock_response = Mock() mock_response.status_code = 403 mock_response.json.return_value = {"detail": "You don't have access to this entity"} mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( "403", request=Mock(), response=mock_response ) with patch('finizi_b4b_mcp.client.api_client.get_api_client') as mock_client: mock_client.return_value.get = AsyncMock(return_value=mock_response) result = await list_invoices("unauthorized_entity_id", ctx=ctx) # Assert proper error message assert "error" in result assert "don't have access" in result["error"] ``` ### 9.3 Test Fixtures ```python # tests/fixtures/mock_responses.json { "login_success": { "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "refresh_token": "refresh_token_456", "token_type": "bearer", "expires_in": 7200, "user": { "id": "550e8400-e29b-41d4-a716-446655440000", "email": "test@example.com", "phone": "+84909495665", "is_super_admin": false, "role": "user" } }, "entities_list": { "items": [ { "id": "entity_uuid_1", "name": "CÔNG TY TNHH ABC", "tax_id": "0123456789", "entity_type": "company", "address": "123 Nguyen Hue, District 1, HCMC" } ], "total": 1, "page": 1, "per_page": 20, "pages": 1 } } ``` --- ## 10. Security Considerations ### 10.1 Token Security - ✅ **In-Memory Storage**: Tokens stored in MCP session metadata (not persisted to disk) - ✅ **No Logs**: Never log JWT tokens or passwords - ✅ **HTTPS Only**: Require HTTPS for production B4B API - ✅ **Token Expiry**: Implement auto-refresh before 120-minute expiry - ⚠️ **Session Cleanup**: Clear tokens on logout and server shutdown ### 10.2 Input Validation ```python # src/finizi_b4b_mcp/utils/validators.py import uuid from typing import Optional def validate_uuid(value: str, field_name: str = "id") -> str: """Validate UUID format""" try: uuid.UUID(value) return value except ValueError: raise ValueError(f"Invalid {field_name}: must be a valid UUID") def validate_page_params(page: int, per_page: int) -> tuple[int, int]: """Validate pagination parameters""" if page < 1: raise ValueError("page must be >= 1") if per_page < 1 or per_page > 100: raise ValueError("per_page must be between 1 and 100") return page, min(per_page, 100) def validate_phone(phone: str) -> str: """Validate Vietnamese phone number format""" import re pattern = r'^\+84[0-9]{9,10}$' if not re.match(pattern, phone): raise ValueError( "Invalid phone number. Format: +84XXXXXXXXX (9-10 digits after +84)" ) return phone ``` ### 10.3 Error Message Sanitization ```python # Never expose internal errors to users def sanitize_error(error: Exception) -> str: """Sanitize error messages for user display""" error_str = str(error) # Remove sensitive paths error_str = error_str.replace("/app/", "") # Remove internal IPs import re error_str = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '[IP]', error_str) return error_str ``` --- ## 11. Production Deployment ### 11.1 Package Configuration (pyproject.toml) ```toml [project] name = "finizi-b4b-mcp" version = "1.0.0" description = "MCP server for Finizi B4B API" authors = [{name = "Finizi Team", email = "dev@finizi.ai"}] requires-python = ">=3.11" dependencies = [ "mcp>=0.9.0", "httpx>=0.27.0", "pydantic>=2.0.0", "pydantic-settings>=2.0.0", "tenacity>=8.2.0", "structlog>=24.1.0", ] [project.optional-dependencies] dev = [ "pytest>=8.0.0", "pytest-asyncio>=0.23.0", "pytest-mock>=3.12.0", "pytest-cov>=4.1.0", "ruff>=0.1.0", ] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.ruff] line-length = 100 target-version = "py311" [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] ``` ### 11.2 Docker Deployment (Optional) ```dockerfile # Dockerfile FROM python:3.11-slim WORKDIR /app # Install UV RUN pip install uv # Copy project files COPY pyproject.toml ./ COPY src/ ./src/ # Install dependencies RUN uv sync # Run MCP server CMD ["uv", "run", "python", "-m", "finizi_b4b_mcp.server"] ``` ### 11.3 Environment Configuration ```bash # .env.production B4B_API_BASE_URL=https://api.finizi.ai B4B_API_VERSION=v1 API_TIMEOUT=60 MAX_RETRIES=3 LOG_LEVEL=INFO TOKEN_REFRESH_THRESHOLD_MINUTES=15 ``` --- ## 12. Future Enhancements ### 12.1 Token Auto-Refresh ```python async def refresh_token_if_needed(ctx: Context) -> str: """ Check token expiry and refresh if needed. JWT tokens expire after 120 minutes. This function checks the token and automatically refreshes it if less than 15 minutes remain. """ token = ctx.session.metadata.get('user_token') # Decode JWT to check expiry (without verification) import jwt decoded = jwt.decode(token, options={"verify_signature": False}) exp = decoded.get('exp') # Check if refresh needed import time time_remaining = exp - time.time() threshold = settings.token_refresh_threshold_minutes * 60 if time_remaining < threshold: logger.info("Token expiring soon, refreshing...") # Call refresh endpoint refresh_token = ctx.session.metadata.get('refresh_token') response = await api_client.post( "/api/v1/auth/refresh", json={"refresh_token": refresh_token} ) data = response.json() new_token = data["access_token"] # Update session ctx.session.metadata['user_token'] = new_token logger.info("Token refreshed successfully") return new_token return token ``` ### 12.2 Rate Limiting ```python from collections import deque import time class TokenBucket: """Token bucket rate limiter per session""" def __init__(self, capacity: int, refill_rate: float): self.capacity = capacity self.tokens = capacity self.refill_rate = refill_rate # tokens per second self.last_refill = time.time() def consume(self, tokens: int = 1) -> bool: """Try to consume tokens, return True if successful""" self._refill() if self.tokens >= tokens: self.tokens -= tokens return True return False def _refill(self): """Refill tokens based on time elapsed""" now = time.time() elapsed = now - self.last_refill refill_amount = elapsed * self.refill_rate self.tokens = min(self.capacity, self.tokens + refill_amount) self.last_refill = now ``` ### 12.3 Caching Layer ```python from functools import lru_cache import hashlib import json @lru_cache(maxsize=100) def cache_api_response(cache_key: str, ttl: int = 300): """ Cache API responses for frequently accessed data. Good candidates: - Entity details (rarely change) - Vendor lists (relatively stable) - Invoice statistics (can be stale by minutes) Not suitable for: - Invoice lists (real-time data) - Authentication (security risk) """ pass ``` --- ## 13. Monitoring & Observability ### 13.1 Structured Logging ```python import structlog logger = structlog.get_logger(__name__) # Log with context logger.info( "invoice_listed", entity_id=entity_id, user_id=user_id, result_count=len(invoices), duration_ms=duration ) # Log errors with full context logger.error( "api_request_failed", endpoint="/api/v1/entities", status_code=500, error=str(error), user_id=user_id ) ``` ### 13.2 Metrics Collection (Future) ```python # Potential metrics to track: # - Request latency by endpoint # - Error rate by status code # - Token refresh frequency # - Most used tools # - Session duration ``` --- ## 14. Success Criteria ### 14.1 Functional Requirements - ✅ User can login and token is stored - ✅ All entity operations work with proper authorization - ✅ Invoice listing respects entity access control - ✅ 403 errors return user-friendly messages - ✅ Super admin can access all entities - ✅ Token refresh works before expiry ### 14.2 Non-Functional Requirements - ⚡ API requests < 1 second median latency - 🔒 No token leakage in logs or errors - 📊 Structured logging for all operations - 🛡️ Input validation on all parameters - 🔄 Automatic retry on network errors - 📈 Support 100+ concurrent sessions ### 14.3 Quality Gates - 🧪 80%+ test coverage - 🐛 Zero critical security vulnerabilities - 📝 Complete API documentation - 🎯 All error cases handled gracefully - 🚀 Production deployment successful --- ## 15. References & Resources ### 15.1 MCP Documentation - **Official MCP Spec**: https://spec.modelcontextprotocol.io - **FastMCP Python SDK**: https://github.com/jlowin/fastmcp - **MCP Best Practices 2025**: Retrieved via Context7 ### 15.2 B4B API Documentation - **OpenAPI Spec**: http://localhost:8000/api/v1/openapi.json - **Swagger UI**: http://localhost:8000/docs - **ReDoc**: http://localhost:8000/redoc - **Source Code**: /Users/trunghuynh/development/b4b-api/ ### 15.3 Key Technologies - **HTTPX**: https://www.python-httpx.org/ - **Tenacity**: https://tenacity.readthedocs.io/ - **Pydantic**: https://docs.pydantic.dev/ - **Structlog**: https://www.structlog.org/ --- ## Appendix A: Quick Start Guide ### For Developers ```bash # 1. Clone repository git clone <repo-url> cd finizi-b4b-mcp-server # 2. Install UV curl -LsSf https://astral.sh/uv/install.sh | sh # 3. Install dependencies uv sync # 4. Configure environment cp .env.example .env # Edit .env with B4B API URL # 5. Run MCP server uv run python -m finizi_b4b_mcp.server # 6. Test with MCP client # (Use your preferred MCP client, e.g., Claude Desktop) ``` ### For AI Agents ```python # Example usage from AI agent # 1. Login first await mcp.call_tool("login", { "phone": "+84909495665", "password": "Admin123@" }) # 2. List entities user has access to entities = await mcp.call_tool("list_entities", {}) entity_id = entities["items"][0]["id"] # 3. List invoices for entity invoices = await mcp.call_tool("list_invoices", { "entity_id": entity_id, "page": 1, "per_page": 20 }) # 4. Get invoice statistics stats = await mcp.call_tool("get_invoice_statistics", { "entity_id": entity_id, "year": 2025, "month": 1 }) ``` --- ## Appendix B: API Endpoint Mapping | MCP Tool | HTTP Method | B4B API Endpoint | Description | |----------|-------------|------------------|-------------| | `login` | POST | `/api/v1/auth/login` | Authenticate user | | `logout` | N/A | N/A | Clear session (client-side) | | `whoami` | GET | `/api/v1/users/me` | Get current user info | | `list_entities` | GET | `/api/v1/entities/` | List user's entities | | `get_entity` | GET | `/api/v1/entities/{id}` | Get entity details | | `create_entity` | POST | `/api/v1/entities/` | Create new entity | | `update_entity` | PUT | `/api/v1/entities/{id}` | Update entity | | `list_invoices` | GET | `/api/v1/entities/{entity_id}/invoices` | List invoices | | `get_invoice` | GET | `/api/v1/entities/{entity_id}/invoices/{id}` | Get invoice details | | `import_invoice_xml` | POST | `/api/v1/entities/{entity_id}/invoices/upload-xml` | Import XML invoice | | `get_invoice_statistics` | GET | `/api/v1/entities/{entity_id}/invoices/stats` | Get invoice stats | | `list_vendors` | GET | `/api/v1/entities/{entity_id}/vendors` | List vendors | | `get_vendor` | GET | `/api/v1/entities/{entity_id}/vendors/{id}` | Get vendor details | | `list_products` | GET | `/api/v1/entities/{entity_id}/products` | List products | | `search_similar_products` | POST | `/api/v1/entities/{entity_id}/products/search-similar` | Semantic search | --- **Document Version**: 1.0 **Last Updated**: 2025-10-01 **Status**: Ready for Implementation

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/finizi-app/finizi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server