Skip to main content
Glama

Sumanshu Arora

development.md43.2 kB
# Template Development Guide **Complete development workflow for creating, testing, and maintaining high-quality MCP server templates with professional development practices.** ## Development Workflow ### Setting Up Development Environment #### Prerequisites ```bash # Install development dependencies pip install -r requirements-dev.txt # Development tools include: # - pytest (testing framework) # - black (code formatting) # - flake8 (linting) # - mypy (type checking) # - coverage (test coverage) # - pre-commit (git hooks) ``` #### Project Structure ```bash # Clone the template repository git clone https://github.com/Data-Everything/mcp-server-templates.git cd mcp-server-templates # Set up development environment python -m venv venv source venv/bin/activate # Linux/Mac # venv\Scripts\activate # Windows pip install -e . ``` #### Development Configuration ```bash # Configure pre-commit hooks pre-commit install # Set up environment variables cp .env.example .env # Edit .env with your configuration # Run development setup make setup-dev ``` ### Template Development Lifecycle #### 1. Planning Phase **Define Requirements** ```markdown # Template Requirements Document - **Template ID**: my-new-template - **Purpose**: Integration with XYZ API - **Target Users**: Data analysts, developers - **Key Features**: - Data fetching tools - Transformation utilities - Export capabilities - **Configuration Needs**: - API credentials - Output formats - Rate limiting options ``` **Architecture Design** ```python # Design your tool structure tools_design = { "fetch_data": { "purpose": "Retrieve data from XYZ API", "parameters": ["endpoint", "filters", "format"], "output": "structured_data" }, "transform_data": { "purpose": "Apply transformations to data", "parameters": ["data", "transformations"], "output": "transformed_data" }, "export_data": { "purpose": "Export data in various formats", "parameters": ["data", "format", "destination"], "output": "export_result" } } ``` #### 2. Development Phase **Create Template Structure** ```bash # Use the template creator with development settings mcpt create my-new-template --dev-mode # This creates: templates/my-new-template/ ├── template.json # Template configuration ├── Dockerfile # Container definition ├── requirements.txt # Python dependencies ├── requirements-dev.txt # Development dependencies ├── src/ # Source code │ ├── __init__.py # Package initialization │ ├── server.py # Main server implementation │ ├── tools.py # Tool implementations │ ├── config.py # Configuration management │ └── utils.py # Utility functions ├── tests/ # Test suite │ ├── __init__.py # Test package │ ├── conftest.py # Test configuration │ ├── test_server.py # Server tests │ ├── test_tools.py # Tool tests │ ├── test_config.py # Configuration tests │ └── test_integration.py # Integration tests ├── config/ # Configuration examples │ ├── development.json # Development config │ ├── testing.json # Testing config │ └── production.json # Production config ├── docs/ # Template documentation │ ├── README.md # Overview │ ├── usage.md # Usage guide │ └── api.md # API reference ├── .env.example # Environment template ├── .gitignore # Git ignore rules ├── Makefile # Development commands └── docker-compose.dev.yml # Development setup ``` **Implement Core Functionality** *src/server.py - Main Server* ```python #!/usr/bin/env python3 """ My New Template - MCP Server Professional MCP server implementation with comprehensive error handling, logging, and configuration management. """ import asyncio import logging from typing import Any, Dict, List, Optional from contextlib import asynccontextmanager from fastmcp import FastMCP from fastmcp.tools import tool from .config import Config, load_config from .tools import MyNewTemplateTools from .utils import setup_logging # Initialize logging logger = logging.getLogger(__name__) # Initialize FastMCP server mcp = FastMCP("My New Template") class MyNewTemplateMCPServer: """ Professional MCP Server Implementation Features: - Comprehensive error handling - Structured logging - Configuration validation - Health monitoring - Graceful shutdown """ def __init__(self, config: Optional[Config] = None): self.config = config or load_config() self.tools = MyNewTemplateTools(self.config) self.is_healthy = False self._register_tools() self._setup_monitoring() def _register_tools(self): """Register all available tools with comprehensive error handling""" @tool("fetch_data") async def fetch_data( endpoint: str, filters: Optional[Dict[str, Any]] = None, format: str = "json" ) -> Dict[str, Any]: """ Retrieve data from XYZ API Args: endpoint: API endpoint to fetch from filters: Optional filters to apply format: Output format (json, csv, xml) Returns: Dict containing fetched data and metadata """ try: logger.info(f"Fetching data from endpoint: {endpoint}") result = await self.tools.fetch_data(endpoint, filters, format) logger.info(f"Successfully fetched {len(result.get('data', []))} records") return result except Exception as e: logger.error(f"Failed to fetch data: {e}", exc_info=True) return { "success": False, "error": str(e), "endpoint": endpoint, "filters": filters } @tool("transform_data") async def transform_data( data: List[Dict[str, Any]], transformations: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Apply transformations to data Args: data: Input data to transform transformations: List of transformation rules Returns: Dict containing transformed data """ try: logger.info(f"Applying {len(transformations)} transformations to {len(data)} records") result = await self.tools.transform_data(data, transformations) logger.info(f"Transformation completed: {result.get('records_processed', 0)} records") return result except Exception as e: logger.error(f"Data transformation failed: {e}", exc_info=True) return { "success": False, "error": str(e), "input_records": len(data), "transformations": len(transformations) } @tool("export_data") async def export_data( data: List[Dict[str, Any]], format: str = "json", destination: Optional[str] = None ) -> Dict[str, Any]: """ Export data in various formats Args: data: Data to export format: Export format (json, csv, parquet, xlsx) destination: Optional destination path/URL Returns: Dict containing export results """ try: logger.info(f"Exporting {len(data)} records as {format}") result = await self.tools.export_data(data, format, destination) logger.info(f"Export completed: {result.get('bytes_written', 0)} bytes written") return result except Exception as e: logger.error(f"Data export failed: {e}", exc_info=True) return { "success": False, "error": str(e), "records": len(data), "format": format } @tool("health_check") async def health_check() -> Dict[str, Any]: """Get server health status and diagnostics""" return await self._health_check() logger.info("Registered tools: fetch_data, transform_data, export_data, health_check") def _setup_monitoring(self): """Setup monitoring and health checks""" self.is_healthy = True logger.info("Monitoring setup completed") async def _health_check(self) -> Dict[str, Any]: """Comprehensive health check""" try: # Check external dependencies api_healthy = await self.tools.check_api_health() # Check configuration config_valid = self.config.validate() # Overall health status overall_healthy = api_healthy and config_valid and self.is_healthy return { "status": "healthy" if overall_healthy else "unhealthy", "version": "1.0.0", "timestamp": "2025-01-27T16:47:30Z", "checks": { "api_connection": api_healthy, "configuration": config_valid, "server": self.is_healthy }, "config": { "api_configured": bool(self.config.api_key), "export_formats": self.config.supported_formats, "rate_limit": self.config.rate_limit } } except Exception as e: logger.error(f"Health check failed: {e}") return { "status": "unhealthy", "error": str(e), "timestamp": "2025-01-27T16:47:30Z" } @asynccontextmanager async def lifespan_manager(): """Manage server startup and shutdown""" logger.info("MCP Server starting up...") yield logger.info("MCP Server shutting down...") async def main(): """Main server entry point with proper lifecycle management""" # Setup logging setup_logging() # Load and validate configuration config = load_config() if not config.validate(): logger.error("Invalid configuration, exiting") return # Initialize server server = MyNewTemplateMCPServer(config) logger.info("My New Template MCP Server starting...") logger.info(f"Configuration: API configured={bool(config.api_key)}") try: await mcp.run() except KeyboardInterrupt: logger.info("Received shutdown signal") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) finally: logger.info("Server shutdown complete") if __name__ == "__main__": asyncio.run(main()) ``` *src/tools.py - Tool Implementations* ```python """ Tool implementations for My New Template Professional tool implementations with comprehensive error handling, validation, and logging. """ import asyncio import logging from typing import Any, Dict, List, Optional import aiohttp import json import csv import pandas as pd from io import StringIO, BytesIO from .config import Config from .utils import validate_input, rate_limit, retry_on_failure logger = logging.getLogger(__name__) class MyNewTemplateTools: """Professional tool implementations with comprehensive features""" def __init__(self, config: Config): self.config = config self.session = None self._setup_session() async def _setup_session(self): """Setup HTTP session with proper configuration""" timeout = aiohttp.ClientTimeout(total=self.config.timeout) connector = aiohttp.TCPConnector(limit=self.config.connection_limit) self.session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={ "User-Agent": "MCP-Server-MyNewTemplate/1.0.0", "Authorization": f"Bearer {self.config.api_key}" } ) @validate_input @rate_limit @retry_on_failure(max_attempts=3) async def fetch_data( self, endpoint: str, filters: Optional[Dict[str, Any]] = None, format: str = "json" ) -> Dict[str, Any]: """ Fetch data from external API with comprehensive error handling Args: endpoint: API endpoint to fetch from filters: Optional query filters format: Response format Returns: Dict containing fetched data and metadata """ if not self.session: await self._setup_session() try: # Build request URL url = f"{self.config.api_base_url.rstrip('/')}/{endpoint.lstrip('/')}" # Prepare query parameters params = filters or {} params['format'] = format logger.debug(f"Fetching data from: {url} with params: {params}") # Make API request async with self.session.get(url, params=params) as response: response.raise_for_status() # Parse response based on format if format.lower() == 'json': data = await response.json() elif format.lower() == 'csv': text = await response.text() data = self._parse_csv(text) else: data = await response.text() # Return structured response return { "success": True, "data": data, "metadata": { "endpoint": endpoint, "filters": filters, "format": format, "status_code": response.status, "content_type": response.headers.get('content-type'), "timestamp": "2025-01-27T16:47:30Z" } } except aiohttp.ClientError as e: logger.error(f"HTTP client error: {e}") raise Exception(f"API request failed: {e}") except asyncio.TimeoutError: logger.error(f"Request timeout for endpoint: {endpoint}") raise Exception(f"Request timeout for endpoint: {endpoint}") except Exception as e: logger.error(f"Unexpected error fetching data: {e}") raise @validate_input async def transform_data( self, data: List[Dict[str, Any]], transformations: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Apply transformations to data with comprehensive validation Args: data: Input data to transform transformations: List of transformation rules Returns: Dict containing transformed data and statistics """ try: transformed_data = [] processing_stats = { "records_processed": 0, "records_skipped": 0, "transformations_applied": 0, "errors": [] } for record in data: try: transformed_record = dict(record) # Copy original # Apply each transformation for transformation in transformations: transformed_record = await self._apply_transformation( transformed_record, transformation ) processing_stats["transformations_applied"] += 1 transformed_data.append(transformed_record) processing_stats["records_processed"] += 1 except Exception as e: logger.warning(f"Skipping record due to transformation error: {e}") processing_stats["records_skipped"] += 1 processing_stats["errors"].append(str(e)) return { "success": True, "data": transformed_data, "statistics": processing_stats, "timestamp": "2025-01-27T16:47:30Z" } except Exception as e: logger.error(f"Data transformation failed: {e}") raise Exception(f"Data transformation failed: {e}") async def _apply_transformation( self, record: Dict[str, Any], transformation: Dict[str, Any] ) -> Dict[str, Any]: """Apply a single transformation to a record""" transform_type = transformation.get("type") if transform_type == "rename_field": # Rename field old_name = transformation["from"] new_name = transformation["to"] if old_name in record: record[new_name] = record.pop(old_name) elif transform_type == "add_field": # Add computed field field_name = transformation["name"] field_value = transformation["value"] record[field_name] = field_value elif transform_type == "filter_field": # Filter field based on condition field_name = transformation["field"] condition = transformation["condition"] if not self._evaluate_condition(record.get(field_name), condition): raise Exception(f"Record filtered out by condition: {condition}") elif transform_type == "format_field": # Format field value field_name = transformation["field"] format_type = transformation["format"] if field_name in record: record[field_name] = self._format_value(record[field_name], format_type) return record @validate_input async def export_data( self, data: List[Dict[str, Any]], format: str = "json", destination: Optional[str] = None ) -> Dict[str, Any]: """ Export data in various formats with comprehensive options Args: data: Data to export format: Export format (json, csv, parquet, xlsx) destination: Optional destination path/URL Returns: Dict containing export results and metadata """ try: # Export data based on format if format.lower() == "json": exported_data = self._export_json(data) elif format.lower() == "csv": exported_data = self._export_csv(data) elif format.lower() == "parquet": exported_data = self._export_parquet(data) elif format.lower() == "xlsx": exported_data = self._export_xlsx(data) else: raise ValueError(f"Unsupported export format: {format}") # Handle destination if specified if destination: await self._write_to_destination(exported_data, destination, format) return { "success": True, "format": format, "records_exported": len(data), "bytes_written": len(exported_data) if isinstance(exported_data, (str, bytes)) else 0, "destination": destination, "timestamp": "2025-01-27T16:47:30Z", "data": exported_data if not destination else None } except Exception as e: logger.error(f"Data export failed: {e}") raise Exception(f"Data export failed: {e}") def _export_json(self, data: List[Dict[str, Any]]) -> str: """Export data as JSON""" return json.dumps(data, indent=2, default=str) def _export_csv(self, data: List[Dict[str, Any]]) -> str: """Export data as CSV""" if not data: return "" output = StringIO() fieldnames = list(data[0].keys()) writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() writer.writerows(data) return output.getvalue() def _export_parquet(self, data: List[Dict[str, Any]]) -> bytes: """Export data as Parquet""" df = pd.DataFrame(data) buffer = BytesIO() df.to_parquet(buffer, index=False) return buffer.getvalue() def _export_xlsx(self, data: List[Dict[str, Any]]) -> bytes: """Export data as Excel""" df = pd.DataFrame(data) buffer = BytesIO() df.to_excel(buffer, index=False, engine='openpyxl') return buffer.getvalue() async def check_api_health(self) -> bool: """Check if external API is healthy""" try: if not self.session: await self._setup_session() health_url = f"{self.config.api_base_url}/health" async with self.session.get(health_url) as response: return response.status == 200 except Exception as e: logger.warning(f"API health check failed: {e}") return False def _parse_csv(self, csv_text: str) -> List[Dict[str, Any]]: """Parse CSV text into list of dictionaries""" reader = csv.DictReader(StringIO(csv_text)) return list(reader) def _evaluate_condition(self, value: Any, condition: Dict[str, Any]) -> bool: """Evaluate a filter condition""" operator = condition.get("operator", "eq") expected = condition.get("value") if operator == "eq": return value == expected elif operator == "ne": return value != expected elif operator == "gt": return value > expected elif operator == "lt": return value < expected elif operator == "contains": return expected in str(value) else: return True def _format_value(self, value: Any, format_type: str) -> Any: """Format a value according to specified type""" if format_type == "uppercase": return str(value).upper() elif format_type == "lowercase": return str(value).lower() elif format_type == "title": return str(value).title() elif format_type == "strip": return str(value).strip() else: return value async def close(self): """Clean up resources""" if self.session: await self.session.close() ``` #### 3. Testing Phase **Comprehensive Test Suite** *tests/test_tools.py* ```python """ Comprehensive tests for My New Template tools """ import pytest from unittest.mock import Mock, AsyncMock, patch import aiohttp from src.tools import MyNewTemplateTools from src.config import Config @pytest.fixture def mock_config(): """Create mock configuration for testing""" config = Mock(spec=Config) config.api_key = "test-key" config.api_base_url = "https://api.example.com" config.timeout = 30 config.connection_limit = 10 config.rate_limit = 100 return config @pytest.fixture async def tools(mock_config): """Create tools instance for testing""" tools = MyNewTemplateTools(mock_config) yield tools await tools.close() class TestFetchData: """Test data fetching functionality""" @pytest.mark.asyncio async def test_fetch_data_success(self, tools): """Test successful data fetching""" mock_response_data = {"items": [{"id": 1, "name": "test"}]} with patch.object(tools, 'session') as mock_session: # Setup mock response mock_response = AsyncMock() mock_response.status = 200 mock_response.json.return_value = mock_response_data mock_response.headers = {'content-type': 'application/json'} mock_response.raise_for_status.return_value = None mock_session.get.return_value.__aenter__.return_value = mock_response # Test the method result = await tools.fetch_data("test/endpoint") # Assertions assert result["success"] is True assert result["data"] == mock_response_data assert result["metadata"]["endpoint"] == "test/endpoint" assert result["metadata"]["status_code"] == 200 @pytest.mark.asyncio async def test_fetch_data_http_error(self, tools): """Test handling of HTTP errors""" with patch.object(tools, 'session') as mock_session: # Setup mock to raise HTTP error mock_response = AsyncMock() mock_response.raise_for_status.side_effect = aiohttp.ClientResponseError( request_info=Mock(), history=(), status=404, message="Not Found" ) mock_session.get.return_value.__aenter__.return_value = mock_response # Test that exception is raised with pytest.raises(Exception, match="API request failed"): await tools.fetch_data("nonexistent/endpoint") @pytest.mark.asyncio async def test_fetch_data_with_filters(self, tools): """Test data fetching with filters""" filters = {"category": "test", "limit": 10} with patch.object(tools, 'session') as mock_session: mock_response = AsyncMock() mock_response.status = 200 mock_response.json.return_value = {"items": []} mock_response.headers = {'content-type': 'application/json'} mock_response.raise_for_status.return_value = None mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools.fetch_data("test/endpoint", filters=filters) # Verify filters were passed call_args = mock_session.get.call_args assert call_args[1]['params']['category'] == 'test' assert call_args[1]['params']['limit'] == 10 class TestTransformData: """Test data transformation functionality""" @pytest.mark.asyncio async def test_transform_data_rename_field(self, tools): """Test field renaming transformation""" data = [{"old_name": "value1"}, {"old_name": "value2"}] transformations = [{ "type": "rename_field", "from": "old_name", "to": "new_name" }] result = await tools.transform_data(data, transformations) assert result["success"] is True assert len(result["data"]) == 2 assert "new_name" in result["data"][0] assert "old_name" not in result["data"][0] assert result["data"][0]["new_name"] == "value1" @pytest.mark.asyncio async def test_transform_data_add_field(self, tools): """Test field addition transformation""" data = [{"id": 1}, {"id": 2}] transformations = [{ "type": "add_field", "name": "status", "value": "active" }] result = await tools.transform_data(data, transformations) assert result["success"] is True assert result["data"][0]["status"] == "active" assert result["data"][1]["status"] == "active" @pytest.mark.asyncio async def test_transform_data_error_handling(self, tools): """Test error handling in transformations""" data = [{"field": "value1"}, {"field": "value2"}] transformations = [{ "type": "filter_field", "field": "field", "condition": {"operator": "eq", "value": "nonexistent"} }] result = await tools.transform_data(data, transformations) # All records should be skipped due to filter assert result["success"] is True assert result["statistics"]["records_skipped"] == 2 assert result["statistics"]["records_processed"] == 0 class TestExportData: """Test data export functionality""" @pytest.mark.asyncio async def test_export_json(self, tools): """Test JSON export""" data = [{"id": 1, "name": "test"}] result = await tools.export_data(data, format="json") assert result["success"] is True assert result["format"] == "json" assert result["records_exported"] == 1 assert "data" in result assert '"id": 1' in result["data"] @pytest.mark.asyncio async def test_export_csv(self, tools): """Test CSV export""" data = [{"id": 1, "name": "test"}, {"id": 2, "name": "test2"}] result = await tools.export_data(data, format="csv") assert result["success"] is True assert result["format"] == "csv" assert result["records_exported"] == 2 csv_data = result["data"] assert "id,name" in csv_data assert "1,test" in csv_data @pytest.mark.asyncio async def test_export_unsupported_format(self, tools): """Test handling of unsupported export format""" data = [{"id": 1}] with pytest.raises(Exception, match="Unsupported export format"): await tools.export_data(data, format="xml") class TestHealthCheck: """Test health check functionality""" @pytest.mark.asyncio async def test_api_health_check_success(self, tools): """Test successful API health check""" with patch.object(tools, 'session') as mock_session: mock_response = AsyncMock() mock_response.status = 200 mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools.check_api_health() assert result is True @pytest.mark.asyncio async def test_api_health_check_failure(self, tools): """Test failed API health check""" with patch.object(tools, 'session') as mock_session: mock_response = AsyncMock() mock_response.status = 500 mock_session.get.return_value.__aenter__.return_value = mock_response result = await tools.check_api_health() assert result is False ``` **Integration Tests** *tests/test_integration.py* ```python """ Integration tests for My New Template """ import pytest import asyncio from unittest.mock import patch import tempfile import json from src.server import MyNewTemplateMCPServer from src.config import Config @pytest.fixture def integration_config(): """Create integration test configuration""" return Config( api_key="integration-test-key", api_base_url="https://httpbin.org", timeout=30, rate_limit=10 ) @pytest.mark.integration class TestFullWorkflow: """Test complete workflows end-to-end""" @pytest.mark.asyncio async def test_fetch_transform_export_workflow(self, integration_config): """Test complete data pipeline workflow""" server = MyNewTemplateMCPServer(integration_config) # Mock external API calls with patch.object(server.tools, 'fetch_data') as mock_fetch, \ patch.object(server.tools, 'transform_data') as mock_transform, \ patch.object(server.tools, 'export_data') as mock_export: # Setup mock responses mock_fetch_result = { "success": True, "data": [{"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}] } mock_fetch.return_value = mock_fetch_result mock_transform_result = { "success": True, "data": [{"id": 1, "name": "TEST1"}, {"id": 2, "name": "TEST2"}], "statistics": {"records_processed": 2} } mock_transform.return_value = mock_transform_result mock_export_result = { "success": True, "format": "json", "records_exported": 2 } mock_export.return_value = mock_export_result # Execute workflow fetch_result = await server.tools.fetch_data("test/data") assert fetch_result["success"] is True transform_result = await server.tools.transform_data( fetch_result["data"], [{"type": "format_field", "field": "name", "format": "uppercase"}] ) assert transform_result["success"] is True export_result = await server.tools.export_data( transform_result["data"], format="json" ) assert export_result["success"] is True @pytest.mark.integration class TestErrorRecovery: """Test error recovery and resilience""" @pytest.mark.asyncio async def test_api_failure_recovery(self, integration_config): """Test recovery from API failures""" server = MyNewTemplateMCPServer(integration_config) # Test graceful handling of API failures with patch.object(server.tools, 'session') as mock_session: mock_session.get.side_effect = Exception("Network error") result = await server.tools.fetch_data("test/endpoint") # Should handle error gracefully assert "error" in str(result).lower() ``` #### 4. Quality Assurance **Code Quality Tools** *Makefile* ```makefile # Development commands for My New Template .PHONY: setup-dev test lint format type-check coverage clean build # Development setup setup-dev: pip install -e . pip install -r requirements-dev.txt pre-commit install # Testing test: pytest tests/ -v --tb=short test-integration: pytest tests/ -v --tb=short -m integration test-coverage: pytest tests/ --cov=src --cov-report=html --cov-report=term # Code quality lint: flake8 src/ tests/ pylint src/ format: black src/ tests/ isort src/ tests/ type-check: mypy src/ # Quality gate (run all checks) quality-gate: format lint type-check test-coverage @echo "✅ All quality checks passed!" # Build build: docker build -t my-new-template . build-dev: docker build -f Dockerfile.dev -t my-new-template:dev . # Cleanup clean: find . -type d -name __pycache__ -delete find . -type f -name "*.pyc" -delete rm -rf htmlcov/ rm -rf .coverage rm -rf dist/ rm -rf build/ # Template operations validate: mcpt validate . deploy-test: mcpt deploy . --config environment=test deploy-prod: mcpt deploy . --config environment=production ``` *pyproject.toml* ```toml [build-system] requires = ["setuptools>=61.0", "wheel"] build-backend = "setuptools.build_meta" [project] name = "my-new-template" version = "1.0.0" description = "Professional MCP server template" readme = "README.md" requires-python = ">=3.8" dependencies = [ "fastmcp>=0.9.0", "aiohttp>=3.8.0", "pandas>=1.3.0", "openpyxl>=3.0.0", "pyarrow>=5.0.0" ] [project.optional-dependencies] dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", "pytest-cov>=4.0.0", "black>=22.0.0", "flake8>=5.0.0", "pylint>=2.15.0", "mypy>=0.991", "pre-commit>=2.20.0", "isort>=5.10.0" ] [tool.black] line-length = 88 target-version = ['py38'] [tool.isort] profile = "black" line_length = 88 [tool.pylint.messages_control] disable = ["C0330", "C0326"] [tool.mypy] python_version = "3.8" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] markers = [ "integration: marks tests as integration tests (deselect with '-m \"not integration\"')", "slow: marks tests as slow (deselect with '-m \"not slow\"')" ] [tool.coverage.run] source = ["src"] omit = ["tests/*"] [tool.coverage.report] exclude_lines = [ "pragma: no cover", "def __repr__", "raise AssertionError", "raise NotImplementedError" ] ``` #### 5. Documentation Phase **Comprehensive Documentation** *README.md* ```markdown # My New Template Professional MCP server template for XYZ API integration with comprehensive data processing capabilities. ## Features - **Data Fetching**: Retrieve data from XYZ API with flexible filtering - **Data Transformation**: Apply complex transformations with validation - **Data Export**: Export in multiple formats (JSON, CSV, Parquet, Excel) - **Health Monitoring**: Comprehensive health checks and diagnostics - **Error Recovery**: Robust error handling and recovery mechanisms - **Performance**: Optimized for high-throughput data processing ## Quick Start ### Using MCP Template Platform ```bash # Deploy with basic configuration mcpt deploy my-new-template --config api_key=your-key # Deploy with custom configuration mcpt deploy my-new-template --config-file config.json ``` ### Manual Docker Deployment ```bash # Build the image docker build -t my-new-template . # Run with environment variables docker run -d \ -e CUSTOM_API_KEY=your-api-key \ -e CUSTOM_BASE_URL=https://api.example.com \ -p 8080:8080 \ my-new-template ``` ## Configuration ### Required Configuration | Option | Environment Variable | Description | |--------|---------------------|-------------| | `api_key` | `CUSTOM_API_KEY` | API authentication key | ### Optional Configuration | Option | Environment Variable | Default | Description | |--------|---------------------|---------|-------------| | `base_url` | `CUSTOM_BASE_URL` | `https://api.example.com` | API base URL | | `features` | `CUSTOM_FEATURES` | `["feature1","feature2"]` | Enabled features (comma-separated) | | `advanced.timeout` | `CUSTOM_ADVANCED__TIMEOUT` | `30` | Request timeout in seconds | | `advanced.retry_attempts` | `CUSTOM_ADVANCED__RETRY_ATTEMPTS` | `3` | Number of retry attempts | ### Configuration File Example ```json { "api_key": "your-api-key-here", "base_url": "https://api.example.com", "features": ["data_processing", "export_support"], "advanced": { "timeout": 60, "retry_attempts": 5 } } ``` ## Available Tools ### fetch_data Retrieve data from XYZ API with flexible filtering options. **Parameters:** - `endpoint` (string, required): API endpoint to fetch from - `filters` (object, optional): Query filters to apply - `format` (string, optional): Response format (json, csv, xml) **Example:** ```json { "endpoint": "users", "filters": { "category": "active", "limit": 100 }, "format": "json" } ``` ### transform_data Apply transformations to data with comprehensive validation. **Parameters:** - `data` (array, required): Input data to transform - `transformations` (array, required): List of transformation rules **Transformation Types:** - `rename_field`: Rename a field - `add_field`: Add a computed field - `filter_field`: Filter records based on conditions - `format_field`: Format field values **Example:** ```json { "data": [{"old_name": "value"}], "transformations": [ { "type": "rename_field", "from": "old_name", "to": "new_name" } ] } ``` ### export_data Export data in various formats with destination support. **Parameters:** - `data` (array, required): Data to export - `format` (string, optional): Export format (json, csv, parquet, xlsx) - `destination` (string, optional): Destination path or URL **Example:** ```json { "data": [{"id": 1, "name": "test"}], "format": "csv", "destination": "/tmp/export.csv" } ``` ### health_check Get comprehensive server health status and diagnostics. **Returns:** - Server status and version - Configuration validation - External dependency health - Performance metrics ## Integration Examples ### Claude Desktop Add to your Claude Desktop configuration: ```json { "mcpServers": { "my-new-template": { "command": "python", "args": ["-m", "mcp_template", "run", "my-new-template"], "env": { "CUSTOM_API_KEY": "your-api-key" } } } } ``` ### Continue.dev Add to your Continue configuration: ```json { "mcpServers": [ { "name": "my-new-template", "command": ["python", "-m", "mcp_template", "run", "my-new-template"], "env": { "CUSTOM_API_KEY": "your-api-key" } } ] } ``` ## Development See [Development Guide](docs/development.md) for comprehensive development documentation. ### Quick Development Setup ```bash # Clone and setup git clone <repository> cd my-new-template make setup-dev # Run tests make test # Quality checks make quality-gate # Build and deploy make build make deploy-test ``` ## Support - **Documentation**: [Template docs](docs/) - **Issues**: [GitHub Issues](https://github.com/Data-Everything/mcp-server-templates/issues) - **Community**: [Join our Discord](https://discord.gg/55Cfxe9gnr) - **Professional Support**: [Contact us](mailto:support@dataeverything.ai) ## License MIT License - see [LICENSE](LICENSE) file for details. ``` ## Deployment and Distribution ### Production Deployment ```bash # Production build with optimizations docker build --target production -t my-template:prod . # Deploy to production environment mcpt deploy my-template \ --config-file production-config.json \ --environment production \ --replicas 3 \ --monitor ``` ### Template Registry Submission ```bash # Package template for distribution mcpt package my-template \ --include-tests \ --validate \ --optimize # Submit to template registry mcpt submit my-template.tar.gz \ --category api \ --tags "data,api,integration" \ --license MIT ``` ## Best Practices Summary ### Code Quality - **Comprehensive Testing**: Unit, integration, and end-to-end tests - **Error Handling**: Graceful error handling with meaningful messages - **Type Safety**: Full type annotations with mypy validation - **Code Style**: Consistent formatting with Black and isort - **Documentation**: Comprehensive docstrings and user documentation ### Performance - **Async/Await**: Non-blocking operations for scalability - **Connection Pooling**: Efficient HTTP client management - **Rate Limiting**: Respectful API usage patterns - **Resource Management**: Proper cleanup and resource management ### Security - **Input Validation**: Comprehensive parameter validation - **Secret Management**: Secure handling of API keys and credentials - **Principle of Least Privilege**: Minimal required permissions - **Dependency Management**: Regular security updates ### Operations - **Health Monitoring**: Comprehensive health checks - **Structured Logging**: Detailed operational logging - **Metrics Collection**: Performance and usage metrics - **Graceful Degradation**: Resilient error handling --- **Next Steps:** - [Learn about testing strategies](testing.md) - [Explore deployment options](../cli/deploy.md) - [View integration examples](../examples/integrations.md) - [Contribute to the platform](../guides/contributing.md)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server