# server.py (12.8 kB)
"""MCP server implementation for ALA API."""
from __future__ import annotations
import asyncio
import json
from typing import Any
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .client import ALAClient
from .config import settings
from .logging_config import setup_logging
# Configure logging at import time with the level taken from settings; the
# returned logger is the one used by the functions in this module.
logger = setup_logging(level=settings.log_level)
# MCP server instance, named from settings. The tool handlers in this module
# register themselves on it through the @app.list_tools() / @app.call_tool()
# decorators.
app = Server(settings.server_name)
# Module-wide ALAClient instance; _execute_request sends requests through it.
ala_client = ALAClient()
def _prop(kind: str, description: str, **extra: Any) -> dict[str, Any]:
    """Build a schema property: ``type``, ``description``, then any extras."""
    return {"type": kind, "description": description, **extra}


def _enum_prop(choices: list[str], description: str, **extra: Any) -> dict[str, Any]:
    """Build a string property restricted to *choices*."""
    return {"type": "string", "enum": choices, "description": description, **extra}


def _string_list_prop(description: str) -> dict[str, Any]:
    """Build a property holding an array of strings."""
    return {"type": "array", "items": {"type": "string"}, "description": description}


def _object_schema(properties: dict[str, Any], required: list[str]) -> dict[str, Any]:
    """Wrap *properties* in an object schema with the given required keys."""
    return {"type": "object", "properties": properties, "required": required}


@app.list_tools()
async def list_tools() -> list[Tool]:
    """
    Describe every tool this server exposes.

    Each tool carries its name, a human-readable description and the JSON
    schema of the arguments it accepts. A fresh list is built on every call.

    Returns:
        The tool definitions, in a fixed order.
    """
    logger.debug("Listing available tools")

    search_occurrences = Tool(
        name="search_occurrences",
        description="Search for species occurrences in the Atlas of Living Australia. Access to 152+ million biodiversity records with filtering, faceting, and spatial search capabilities.",
        inputSchema=_object_schema(
            {
                "query": _prop(
                    "string",
                    "Search query (e.g., 'Eucalyptus', 'Koala', or '*:*' for all records)",
                ),
                "filters": _string_list_prop(
                    "Filter queries (e.g., ['state:Victoria', 'year:[2020 TO 2024]'])"
                ),
                "pageSize": _prop(
                    "integer",
                    "Number of results to return (default: 10, max: 5000)",
                    default=10,
                ),
                "start": _prop("integer", "Pagination offset (default: 0)", default=0),
                "facets": _prop(
                    "string",
                    "Comma-separated facet fields (e.g., 'basis_of_record,year')",
                ),
                "lat": _prop(
                    "number", "Latitude for spatial search (use with lon and radius)"
                ),
                "lon": _prop(
                    "number", "Longitude for spatial search (use with lat and radius)"
                ),
                "radius": _prop(
                    "number", "Search radius in kilometers (use with lat and lon)"
                ),
                "wkt": _prop("string", "Well-Known Text polygon for spatial search"),
            },
            ["query"],
        ),
    )

    get_occurrence = Tool(
        name="get_occurrence",
        description="Retrieve detailed information about a specific occurrence record by its UUID.",
        inputSchema=_object_schema(
            {
                "uuid": _prop(
                    "string", "The unique identifier (UUID) of the occurrence record"
                ),
            },
            ["uuid"],
        ),
    )

    download_occurrences = Tool(
        name="download_occurrences",
        description="Request an asynchronous download of occurrence data as CSV/TSV. Results are sent via email. Suitable for large datasets.",
        inputSchema=_object_schema(
            {
                "query": _prop(
                    "string", "Search query (e.g., 'taxon_name:Eucalyptus')"
                ),
                "email": _prop(
                    "string", "Email address to receive download notification"
                ),
                "reasonTypeId": _prop(
                    "string",
                    "Reason for download (e.g., '10' for testing, '4' for scientific research, '7' for ecological research)",
                ),
                "fields": _prop(
                    "string",
                    "Comma-separated list of fields to include (e.g., 'uuid,scientificName,decimalLatitude,decimalLongitude,eventDate')",
                ),
                "filters": _string_list_prop("Filter queries (e.g., ['state:Victoria'])"),
                "fileType": _enum_prop(
                    ["csv", "tsv"], "File format (default: csv)", default="csv"
                ),
                "mintDoi": _prop(
                    "boolean",
                    "Generate a DOI for the dataset (default: false)",
                    default=False,
                ),
            },
            ["query", "email", "reasonTypeId", "fields"],
        ),
    )

    count_taxa = Tool(
        name="count_taxa",
        description="Get occurrence counts for a list of taxa. Useful for generating reports on species occurrence frequency.",
        inputSchema=_object_schema(
            {
                "query": _prop("string", "Search query"),
                "filters": _string_list_prop("Filter queries"),
            },
            ["query"],
        ),
    )

    create_query_id = Tool(
        name="create_query_id",
        description="Create a cached query ID for complex searches. Returns a short identifier that can be used to reference the full query parameters.",
        inputSchema=_object_schema(
            {
                "query": _prop("string", "Search query"),
                "filters": _string_list_prop("Filter queries"),
                "params": _prop("object", "Additional query parameters"),
            },
            ["query"],
        ),
    )

    get_query = Tool(
        name="get_query",
        description="Retrieve stored query parameters by query ID.",
        inputSchema=_object_schema(
            {"queryId": _prop("string", "The query ID to retrieve")},
            ["queryId"],
        ),
    )

    api_request = Tool(
        name="api_request",
        description="Advanced: Make custom HTTP requests to any ALA API endpoint. Use this for endpoints not covered by specific tools.",
        inputSchema=_object_schema(
            {
                "method": _enum_prop(["GET", "POST", "PUT", "DELETE"], "HTTP method"),
                "endpoint": _prop(
                    "string", "API endpoint path (e.g., '/occurrences/search')"
                ),
                "params": _prop("object", "Query parameters"),
                "data": _prop("object", "Request body data"),
                "headers": _prop("object", "Additional headers"),
            },
            ["method", "endpoint"],
        ),
    )

    return [
        search_occurrences,
        get_occurrence,
        download_occurrences,
        count_taxa,
        create_query_id,
        get_query,
        api_request,
    ]
async def _execute_request(
    method: str,
    endpoint: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
) -> list[TextContent]:
    """
    Send one request through the shared ALA client and wrap the outcome as text.

    Failures are not re-raised: any exception raised while performing the
    request, logging it or serialising its result is logged and reported to
    the caller as a JSON object with ``error`` and ``type`` keys.

    Args:
        method: HTTP method
        endpoint: API endpoint path
        params: Query parameters; a falsy value is replaced by an empty dict
        data: Request body data, passed through unchanged
        headers: Additional headers; a falsy value is replaced by an empty dict

    Returns:
        A single-item list holding the JSON-encoded result or error
    """

    def _as_text(payload: Any) -> list[TextContent]:
        # Every response, success or failure, is pretty-printed JSON text.
        return [TextContent(type="text", text=json.dumps(payload, indent=2))]

    try:
        response = await ala_client.request(
            method=method,
            endpoint=endpoint,
            params=params if params else {},
            data=data,
            headers=headers if headers else {},
        )
        logger.debug(f"Request successful: {response['status_code']}")
        return _as_text(response)
    except Exception as exc:
        failure_kind = type(exc).__name__
        logger.error(f"Request failed: {failure_kind}: {exc}")
        return _as_text({"error": str(exc), "type": failure_kind})
def _copy_optional_arguments(
    arguments: dict[str, Any],
    params: dict[str, Any],
    key_pairs: tuple[tuple[str, str], ...],
) -> dict[str, Any]:
    """Copy each optional argument that was supplied into *params* under its API name."""
    for argument_name, param_name in key_pairs:
        if argument_name in arguments:
            params[param_name] = arguments[argument_name]
    return params


@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Dispatch a tool invocation to the matching ALA API request.

    Tool arguments are translated into the query parameters of the request
    (for example ``query`` becomes ``q`` and ``filters`` becomes ``fq``), and
    the request is then delegated to ``_execute_request``.

    Args:
        name: Tool name
        arguments: Tool arguments

    Returns:
        List of text content responses

    Raises:
        ValueError: If tool name is unknown
        KeyError: If an argument the selected tool reads unconditionally is
            missing from ``arguments``
    """
    logger.info(f"Executing {name} with arguments: {arguments}")

    if name == "search_occurrences":
        search_params = _copy_optional_arguments(
            arguments,
            {
                "q": arguments["query"],
                "pageSize": arguments.get("pageSize", 10),
                "start": arguments.get("start", 0),
            },
            (
                ("filters", "fq"),
                ("facets", "facets"),
                ("lat", "lat"),
                ("lon", "lon"),
                ("radius", "radius"),
                ("wkt", "wkt"),
            ),
        )
        return await _execute_request("GET", "/occurrences/search", params=search_params)

    if name == "get_occurrence":
        record_uuid = arguments["uuid"]
        return await _execute_request("GET", f"/occurrence/{record_uuid}")

    if name == "download_occurrences":
        download_params = _copy_optional_arguments(
            arguments,
            {
                "q": arguments["query"],
                "email": arguments["email"],
                "reasonTypeId": arguments["reasonTypeId"],
                "fields": arguments["fields"],
                "fileType": arguments.get("fileType", "csv"),
            },
            (("filters", "fq"), ("mintDoi", "mintDoi")),
        )
        return await _execute_request(
            "POST", "/occurrences/offline/download", params=download_params
        )

    if name == "count_taxa":
        count_params = _copy_optional_arguments(
            arguments, {"q": arguments["query"]}, (("filters", "fq"),)
        )
        return await _execute_request("GET", "/occurrences/taxaCount", params=count_params)

    if name == "create_query_id":
        qid_params = _copy_optional_arguments(
            arguments, {"q": arguments["query"]}, (("filters", "fq"),)
        )
        # Caller-supplied extras are merged last, so they can override q / fq.
        if "params" in arguments:
            qid_params.update(arguments["params"])
        return await _execute_request("POST", "/qid", params=qid_params)

    if name == "get_query":
        stored_query_id = arguments["queryId"]
        return await _execute_request("GET", f"/qid/{stored_query_id}")

    if name == "api_request":
        http_method = arguments["method"]
        api_path = arguments["endpoint"]
        return await _execute_request(
            http_method,
            api_path,
            arguments.get("params", {}),
            arguments.get("data"),
            arguments.get("headers", {}),
        )

    logger.error(f"Unknown tool requested: {name}")
    raise ValueError(f"Unknown tool: {name}")
async def main() -> None:
    """Log the server name and base URL, then run the MCP server over stdio."""
    logger.info(f"Starting {settings.server_name}")
    logger.info(f"Base URL: {settings.base_url}")

    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)
# Script entry point: when this module is executed directly, run main() in a
# new asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())