Skip to main content
Glama
server.py21.7 kB
#!/usr/bin/env python3 """ National Dairy Farm MCP Server A Model Context Protocol server for the National Dairy FARM Program API. Provides access to dairy farm management data, evaluations, and analytics. """ import os import json import logging import asyncio from typing import Dict, List, Optional, Any, Sequence from datetime import datetime, timedelta import httpx from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, Field import uvicorn # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Security security = HTTPBearer() class DairyFarmMCPServer: """MCP Server for National Dairy Farm API.""" def __init__( self, base_url: str = "https://eval.nationaldairyfarm.com/dfdm/api", client_id: Optional[str] = None, client_secret: Optional[str] = None ): self.base_url = base_url.rstrip("/") self.client_id = client_id or os.getenv("DAIRY_FARM_CLIENT_ID") self.client_secret = client_secret or os.getenv("DAIRY_FARM_CLIENT_SECRET") self.access_token: Optional[str] = None self.token_expires_at: Optional[datetime] = None # HTTP client for API requests self.client = httpx.AsyncClient(timeout=30.0) # Supported API endpoints and operations self.endpoints = { # OAuth & Authentication "oauth_token": { "method": "POST", "path": "/oauth/token", "description": "Obtain OAuth access token", "parameters": ["grant_type", "client_id", "client_secret", "scope"] }, # Co-ops Management "list_coops": { "method": "GET", "path": "/coops", "description": "List all cooperatives", "parameters": ["page", "size", "sort"] }, "get_coop": { "method": "GET", "path": "/coops/{coop_id}", "description": "Get specific cooperative details", "parameters": ["coop_id"] }, "create_coop": { "method": "POST", "path": "/coops", "description": "Create new cooperative", "parameters": ["name", "description", "contact_info"] }, "update_coop": { "method": "PUT", "path": "/coops/{coop_id}", "description": "Update cooperative information", "parameters": ["coop_id", "name", "description", "contact_info"] }, # Farms Management "list_farms": { "method": "GET", "path": "/farms", "description": "List all farms", "parameters": ["page", "size", "sort", "coop_id", "search"] }, "get_farm": { "method": "GET", "path": "/farms/{farm_id}", "description": "Get specific farm details", "parameters": ["farm_id"] }, "create_farm": { "method": "POST", "path": "/farms", "description": "Create new farm", "parameters": ["name", "location", "coop_id", "contact_info"] }, "update_farm": { "method": "PUT", "path": "/farms/{farm_id}", "description": "Update farm information", "parameters": ["farm_id", "name", "location", "contact_info"] }, "delete_farm": { "method": "DELETE", "path": "/farms/{farm_id}", "description": "Delete farm", "parameters": ["farm_id"] }, # Facilities Management "list_facilities": { "method": "GET", "path": "/facilities", "description": "List farm facilities", "parameters": ["page", "size", "farm_id", "facility_type"] }, "get_facility": { "method": "GET", "path": "/facilities/{facility_id}", "description": "Get facility details", "parameters": ["facility_id"] }, "create_facility": { "method": "POST", "path": "/facilities", "description": "Create new facility", "parameters": ["farm_id", "name", "type", "capacity", "location"] }, "update_facility": { "method": "PUT", "path": "/facilities/{facility_id}", "description": "Update facility", "parameters": ["facility_id", "name", "type", "capacity", "location"] }, # Users Management "list_users": { "method": "GET", "path": "/users", "description": "List system users", "parameters": ["page", "size", "role", "coop_id"] }, "get_user": { "method": "GET", "path": "/users/{user_id}", "description": "Get user details", "parameters": ["user_id"] }, "create_user": { "method": "POST", "path": "/users", "description": "Create new user", "parameters": ["username", "email", "role", "coop_id", "permissions"] }, "update_user": { "method": "PUT", "path": "/users/{user_id}", "description": "Update user information", "parameters": ["user_id", "username", "email", "role", "permissions"] }, # Evaluations Management "list_evaluations": { "method": "GET", "path": "/evaluations", "description": "List farm evaluations", "parameters": ["page", "size", "farm_id", "status", "evaluator_id", "date_from", "date_to"] }, "get_evaluation": { "method": "GET", "path": "/evaluations/{evaluation_id}", "description": "Get evaluation details", "parameters": ["evaluation_id"] }, "create_evaluation": { "method": "POST", "path": "/evaluations", "description": "Create new evaluation", "parameters": ["farm_id", "evaluator_id", "evaluation_type", "scheduled_date", "notes"] }, "update_evaluation": { "method": "PUT", "path": "/evaluations/{evaluation_id}", "description": "Update evaluation", "parameters": ["evaluation_id", "status", "results", "notes", "completion_date"] }, "submit_evaluation": { "method": "POST", "path": "/evaluations/{evaluation_id}/submit", "description": "Submit completed evaluation", "parameters": ["evaluation_id", "results", "certifications"] }, # Life Cycle Analysis (LCA) "list_lca_reports": { "method": "GET", "path": "/lca/reports", "description": "List LCA reports", "parameters": ["page", "size", "farm_id", "report_type", "year"] }, "get_lca_report": { "method": "GET", "path": "/lca/reports/{report_id}", "description": "Get LCA report details", "parameters": ["report_id"] }, "create_lca_report": { "method": "POST", "path": "/lca/reports", "description": "Create new LCA report", "parameters": ["farm_id", "report_type", "year", "data", "methodology"] }, "update_lca_report": { "method": "PUT", "path": "/lca/reports/{report_id}", "description": "Update LCA report", "parameters": ["report_id", "data", "methodology", "status"] }, # Attachments Management "list_attachments": { "method": "GET", "path": "/attachments", "description": "List document attachments", "parameters": ["page", "size", "entity_type", "entity_id", "file_type"] }, "get_attachment": { "method": "GET", "path": "/attachments/{attachment_id}", "description": "Get attachment details", "parameters": ["attachment_id"] }, "upload_attachment": { "method": "POST", "path": "/attachments", "description": "Upload new attachment", "parameters": ["entity_type", "entity_id", "file", "description", "tags"] }, "delete_attachment": { "method": "DELETE", "path": "/attachments/{attachment_id}", "description": "Delete attachment", "parameters": ["attachment_id"] }, # Search Operations "search_farms": { "method": "GET", "path": "/search/farms", "description": "Search farms using Solr", "parameters": ["q", "filters", "facets", "page", "size"] }, "search_evaluations": { "method": "GET", "path": "/search/evaluations", "description": "Search evaluations", "parameters": ["q", "filters", "facets", "page", "size"] }, # Analytics & Reporting "get_farm_analytics": { "method": "GET", "path": "/analytics/farms/{farm_id}", "description": "Get farm performance analytics", "parameters": ["farm_id", "metrics", "period", "aggregation"] }, "get_coop_analytics": { "method": "GET", "path": "/analytics/coops/{coop_id}", "description": "Get cooperative analytics", "parameters": ["coop_id", "metrics", "period", "aggregation"] }, "get_evaluation_trends": { "method": "GET", "path": "/analytics/evaluation-trends", "description": "Get evaluation trend analysis", "parameters": ["coop_id", "farm_id", "period", "metrics"] } } async def authenticate(self) -> bool: """Authenticate with the Dairy Farm API using OAuth2 client credentials.""" if not self.client_id or not self.client_secret: logger.error("Client ID and secret required for authentication") return False # Check if token is still valid if (self.access_token and self.token_expires_at and datetime.now() < self.token_expires_at - timedelta(minutes=5)): return True try: auth_data = { "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret, "scope": "read write" } response = await self.client.post( f"{self.base_url}/oauth/token", data=auth_data, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if response.status_code == 200: token_data = response.json() self.access_token = token_data.get("access_token") expires_in = token_data.get("expires_in", 3600) self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) logger.info("Successfully authenticated with Dairy Farm API") return True else: logger.error(f"Authentication failed: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Authentication error: {e}") return False async def make_request( self, method: str, path: str, params: Optional[Dict] = None, data: Optional[Dict] = None, headers: Optional[Dict] = None ) -> Dict[str, Any]: """Make authenticated request to the Dairy Farm API.""" # Ensure authentication if not await self.authenticate(): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Failed to authenticate with Dairy Farm API" ) # Prepare headers request_headers = { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", "Accept": "application/json", "x-accept-version": "3.2" # API version } if headers: request_headers.update(headers) # Make request url = f"{self.base_url}{path}" try: response = await self.client.request( method=method, url=url, params=params, json=data, headers=request_headers ) if response.status_code in [200, 201, 202, 204]: if response.content: return response.json() else: return {"status": "success", "message": f"{method} {path} completed"} else: error_detail = f"API request failed: {response.status_code}" try: error_data = response.json() error_detail += f" - {error_data}" except: error_detail += f" - {response.text}" logger.error(error_detail) raise HTTPException( status_code=response.status_code, detail=error_detail ) except httpx.RequestError as e: logger.error(f"Request error: {e}") raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Failed to connect to Dairy Farm API: {e}" ) def get_available_operations(self) -> List[Dict[str, Any]]: """Get list of all available MCP operations.""" operations = [] for op_name, op_config in self.endpoints.items(): operations.append({ "name": op_name, "method": op_config["method"], "path": op_config["path"], "description": op_config["description"], "parameters": op_config["parameters"] }) return operations async def execute_operation( self, operation_name: str, parameters: Dict[str, Any] ) -> Dict[str, Any]: """Execute a specific MCP operation.""" if operation_name not in self.endpoints: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unknown operation: {operation_name}" ) endpoint = self.endpoints[operation_name] method = endpoint["method"] path_template = endpoint["path"] # Extract path parameters path_params = {} query_params = {} body_data = None # Process parameters for key, value in parameters.items(): if f"{{{key}}}" in path_template: path_params[key] = value elif method in ["POST", "PUT", "PATCH"] and key not in ["page", "size", "sort"]: if body_data is None: body_data = {} body_data[key] = value else: query_params[key] = value # Build final path final_path = path_template for param_name, param_value in path_params.items(): final_path = final_path.replace(f"{{{param_name}}}", str(param_value)) # Make the request return await self.make_request( method=method, path=final_path, params=query_params if query_params else None, data=body_data ) async def close(self): """Close the HTTP client.""" await self.client.aclose() # Global server instance dairy_farm_server = DairyFarmMCPServer() # FastAPI app app = FastAPI( title="National Dairy Farm MCP Server", description="Model Context Protocol server for the National Dairy FARM Program API", version="1.0.0" ) # Request/Response Models class MCPOperation(BaseModel): operation: str = Field(..., description="Name of the operation to execute") parameters: Dict[str, Any] = Field(default_factory=dict, description="Operation parameters") class MCPResponse(BaseModel): success: bool operation: str result: Optional[Dict[str, Any]] = None error: Optional[str] = None class OperationInfo(BaseModel): name: str method: str path: str description: str parameters: List[str] # Dependency for authentication (optional for this MCP server) async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): """Validate MCP client authentication.""" # For this MCP server, we'll accept any bearer token # In production, you might want to validate against your own user system return {"token": credentials.credentials} @app.get("/") async def root(): """MCP Server information.""" return { "name": "National Dairy Farm MCP Server", "version": "1.0.0", "description": "Model Context Protocol server for National Dairy FARM Program API", "operations_count": len(dairy_farm_server.endpoints), "base_url": dairy_farm_server.base_url } @app.get("/operations", response_model=List[OperationInfo]) async def list_operations(): """List all available MCP operations.""" return dairy_farm_server.get_available_operations() @app.post("/execute", response_model=MCPResponse) async def execute_operation( request: MCPOperation, current_user: Dict = Depends(get_current_user) ): """Execute an MCP operation.""" try: result = await dairy_farm_server.execute_operation( operation_name=request.operation, parameters=request.parameters ) return MCPResponse( success=True, operation=request.operation, result=result ) except HTTPException as e: return MCPResponse( success=False, operation=request.operation, error=f"HTTP {e.status_code}: {e.detail}" ) except Exception as e: logger.error(f"Operation execution failed: {e}") return MCPResponse( success=False, operation=request.operation, error=str(e) ) @app.get("/health") async def health_check(): """Health check endpoint.""" try: # Test authentication auth_status = await dairy_farm_server.authenticate() return { "status": "healthy" if auth_status else "unhealthy", "dairy_farm_api_connected": auth_status, "timestamp": datetime.now().isoformat() } except Exception as e: return { "status": "unhealthy", "error": str(e), "timestamp": datetime.now().isoformat() } @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown.""" await dairy_farm_server.close() def main(): """Run the MCP server.""" import argparse parser = argparse.ArgumentParser(description="National Dairy Farm MCP Server") parser.add_argument("--host", default="localhost", help="Host to bind to") parser.add_argument("--port", type=int, default=8001, help="Port to bind to") parser.add_argument("--dairy-farm-url", default="https://eval.nationaldairyfarm.com/dfdm/api", help="Dairy Farm API base URL") parser.add_argument("--client-id", help="Dairy Farm API client ID") parser.add_argument("--client-secret", help="Dairy Farm API client secret") args = parser.parse_args() # Configure the global server instance global dairy_farm_server dairy_farm_server = DairyFarmMCPServer( base_url=args.dairy_farm_url, client_id=args.client_id, client_secret=args.client_secret ) print(f"🐄 Starting National Dairy Farm MCP Server on {args.host}:{args.port}") print(f"🔗 Dairy Farm API: {args.dairy_farm_url}") print(f"📋 Available operations: {len(dairy_farm_server.endpoints)}") uvicorn.run(app, host=args.host, port=args.port) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DPoitrast/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server