Skip to main content
Glama

STAC MCP Server

by BnJam
CONTEXT.md16 kB
# FastMCP Context Usage Guide for STAC MCP **Status**: Technical Guide **Last Updated**: 2025-10-18 **Purpose**: How to use the FastMCP `Context` object in STAC MCP tools, resources, and prompts --- ## Overview This guide describes how STAC MCP uses (or will use) the FastMCP `Context` object and how to keep logging and orchestration consistent across STAC tools, resources, and shared modules. --- ## 1. Where Context Lives ### Current Architecture (MCP 1.0) STAC MCP currently uses the `mcp` package directly without FastMCP's `Context` object. Tools receive parameters directly: ```python async def execute_tool(tool_name: str, arguments: dict): """Current tool execution pattern.""" if tool_name == "search_items": return await handle_search_items(arguments) ``` ### Future FastMCP Architecture When migrating to FastMCP (issues #69, #78), `Context` will be injected: - **Entry points**: `Context` will be injected into MCP tools, resources, and prompts in `stac_mcp/tools/`, `stac_mcp/resources/` (future), and `stac_mcp/prompts/` (future) - **Shared helpers**: Modules under `stac_mcp/tools/client.py` and future shared utilities will accept `ctx: Context | None` but remain silent unless the caller explicitly opts in This keeps shared STAC logic reusable outside MCP and avoids duplicate logging. --- ## 2. Allowed Operations When using `Context` in STAC MCP, limit usage to: ### Logging ```python await ctx.info("Searching STAC catalog for items") await ctx.debug(f"Using bbox: {bbox}, datetime: {datetime}") await ctx.warning("Collection not found, trying fallback") await ctx.error("STAC API connection failed") ``` ### Progress Reporting ```python # For long-running operations like size estimation await ctx.report_progress(processed_items, total_items, "Estimating data size...") ``` ### Resource Access ```python # Read STAC catalog metadata from resources catalog_info = await ctx.read_resource("catalog://root") collections = await ctx.read_resource("catalog://collections") ``` ### Request Metadata ```python # Access request identifiers for correlation request_id = ctx.request_id client_id = ctx.client_id ``` ### State Management ```python # Manage per-request scratch state (e.g., search cache) ctx.set_state("last_search_params", search_params) last_params = ctx.get_state("last_search_params") ``` ### LLM Sampling (Advanced) ```python # Optional: When a tool needs on-the-fly assistance collection_advice = await ctx.sample( "Which STAC collection is best for vegetation monitoring?" ) ``` --- ## 3. Prohibited Operations Avoid the following to prevent `Context` from becoming a god object: ❌ **Don't persist state across requests** ```python # BAD: Global state mutation ctx.set_state("all_searches", global_search_list) ``` ❌ **Don't embed business logic in Context** ```python # BAD: Context-dependent decisions if ctx.is_admin(): # Wrong approach return sensitive_data ``` ❌ **Don't transport large datasets** ```python # BAD: Large binary data in context ctx.set_state("full_stac_items", huge_item_list) ``` ❌ **Don't perform blocking I/O** ```python # BAD: Blocking operations in context wrappers def helper(ctx): time.sleep(10) # Blocks event loop ``` --- ## 4. Logging Patterns for STAC ### Log at the Boundary Entry points announce start/end, key milestones, and recoverable errors: ```python @mcp.tool() async def search_items( collections: list[str], bbox: list[float] | None = None, datetime: str | None = None, limit: int = 10, ctx: Context | None = None ) -> dict: """Search STAC catalog for items.""" if ctx: await ctx.info(f"[search_items] Starting search in {collections}") try: results = await stac_client.search( collections=collections, bbox=bbox, datetime=datetime, limit=limit ) if ctx: await ctx.info(f"[search_items] Found {len(results)} items") return format_results(results) except Exception as e: if ctx: await ctx.error(f"[search_items] Search failed: {str(e)}") raise ``` ### No Duplicate Logging Shared helpers remain silent by default: ```python # stac_mcp/tools/client.py class STACClient: async def search( self, collections: list[str], bbox: list[float] | None = None, ctx: Context | None = None ): """Shared STAC search logic - silent by default.""" # Only log if ctx is provided AND caller opts in if ctx: await ctx.debug(f"[STACClient] Executing search") # Core logic without logging return await self._execute_search(collections, bbox) ``` ### Consistency with Prefixes Prefix messages so log consumers can filter quickly: ```python await ctx.info("[search_items] Starting search") await ctx.info("[estimate_data_size] Processing 50 items") await ctx.debug("[STACClient] Making API request") await ctx.warning("[get_collection] Collection not found in cache") ``` ### Signal Over Noise Prefer progress updates over repetitive info messages: ```python # GOOD: Progress updates in loops for i, item in enumerate(items): if i % 10 == 0 and ctx: await ctx.report_progress(i, len(items), f"Processing item {i}/{len(items)}") process_item(item) # BAD: Info message in tight loop for item in items: await ctx.info(f"Processing {item.id}") # Too noisy ``` --- ## 5. Passing Context to STAC Helpers When helpers need logging or progress reporting, pass `ctx` explicitly: ### Example: Size Estimation with Progress ```python # Entry point (tool) @mcp.tool() async def estimate_data_size( collections: list[str], bbox: list[float] | None = None, limit: int = 50, ctx: Context | None = None ) -> dict: """Estimate data size with progress tracking.""" if ctx: await ctx.info("[estimate_data_size] Starting") # Pass ctx to helper items = await search_items_helper(collections, bbox, limit, ctx=ctx) # Helper uses ctx for progress size_estimate = await calculate_size_with_progress(items, ctx=ctx) return { "estimated_bytes": size_estimate, "item_count": len(items) } # Helper function async def calculate_size_with_progress( items: list, ctx: Context | None = None ) -> int: """Calculate size with optional progress reporting.""" total_size = 0 for i, item in enumerate(items): if ctx and i % 10 == 0: await ctx.report_progress(i, len(items), "Calculating size...") total_size += get_item_size(item) return total_size ``` ### Example: STAC Search with Logging ```python # Entry point (tool) @mcp.tool() async def search_collections( query: str | None = None, ctx: Context | None = None ) -> dict: """Search STAC collections.""" if ctx: await ctx.info(f"[search_collections] Query: {query}") # Call helper with ctx collections = await stac_client.get_collections(query=query, ctx=ctx) return format_collections(collections) # Helper in STACClient async def get_collections( self, query: str | None = None, ctx: Context | None = None ): """Get collections with optional logging.""" if ctx: await ctx.debug(f"[STACClient] Fetching collections: {query}") # Core logic response = await self._http_get("/collections") return response.get("collections", []) ``` --- ## 6. STAC-Specific Context Patterns ### Pattern 1: Multi-Step STAC Workflow ```python @mcp.tool() async def comprehensive_stac_search( data_type: str, region: str, time_period: str, ctx: Context | None = None ) -> dict: """Multi-step STAC search with context tracking.""" # Step 1: Read collection info from resource if ctx: await ctx.info("[comprehensive_search] Step 1: Reading collections") collections_resource = await ctx.read_resource("catalog://collections") # Step 2: Select appropriate collection if ctx: await ctx.info("[comprehensive_search] Step 2: Selecting collection") selected_collection = choose_collection(data_type, collections_resource) # Step 3: Execute search with progress if ctx: await ctx.info(f"[comprehensive_search] Step 3: Searching {selected_collection}") items = await search_items( collections=[selected_collection], bbox=parse_region(region), datetime=parse_time_period(time_period), ctx=ctx ) # Step 4: Validate results if ctx: await ctx.info(f"[comprehensive_search] Step 4: Validating {len(items)} items") return {"items": items, "collection": selected_collection} ``` ### Pattern 2: Retry with Logging ```python @mcp.tool() async def resilient_stac_search( collections: list[str], bbox: list[float], ctx: Context | None = None ) -> dict: """STAC search with retry and logging.""" max_retries = 3 for attempt in range(max_retries): try: if ctx: await ctx.info(f"[resilient_search] Attempt {attempt + 1}/{max_retries}") results = await stac_client.search( collections=collections, bbox=bbox, ctx=ctx ) if ctx: await ctx.info(f"[resilient_search] Success on attempt {attempt + 1}") return {"items": results} except Exception as e: if ctx: await ctx.warning( f"[resilient_search] Attempt {attempt + 1} failed: {str(e)}" ) if attempt == max_retries - 1: if ctx: await ctx.error("[resilient_search] All attempts failed") raise await asyncio.sleep(2 ** attempt) # Exponential backoff ``` ### Pattern 3: Size Estimation with Progress ```python @mcp.tool() async def estimate_data_size( collections: list[str], bbox: list[float] | None = None, limit: int = 50, ctx: Context | None = None ) -> dict: """Estimate size with detailed progress.""" # Phase 1: Search if ctx: await ctx.info("[estimate_size] Phase 1/3: Searching for items") items = await search_items(collections, bbox, limit=limit, ctx=ctx) # Phase 2: Load metadata if ctx: await ctx.info(f"[estimate_size] Phase 2/3: Loading metadata for {len(items)} items") total_size = 0 for i, item in enumerate(items): if ctx and i % 10 == 0: await ctx.report_progress(i, len(items), "Calculating size...") total_size += calculate_item_size(item) # Phase 3: Format results if ctx: await ctx.info("[estimate_size] Phase 3/3: Formatting results") return { "estimated_bytes": total_size, "item_count": len(items), "average_bytes_per_item": total_size // len(items) if items else 0 } ``` --- ## 7. Review Checklist Before merging changes that use Context: - [ ] Logging occurs only at entry points or deliberately opted-in helpers - [ ] All `ctx.` usage appears in modules cleared for orchestration responsibilities - [ ] Shared modules accept `ctx: Context | None` but do not require it - [ ] No business logic or configuration lives inside context helper functions - [ ] Progress updates used for long-running operations (>5 seconds) - [ ] Error logging includes actionable information - [ ] Log messages are prefixed with component name `[component]` - [ ] No duplicate logging between entry points and helpers - [ ] Context state used only for per-request scratch data - [ ] No blocking I/O inside context operations --- ## 8. Migration Guide: Current → FastMCP ### Current State (v1.2.0) ```python # Current: No context, direct execution async def handle_search_items(arguments: dict): """Execute STAC search without context.""" results = await stac_client.search( collections=arguments["collections"], bbox=arguments.get("bbox"), datetime=arguments.get("datetime") ) return format_results(results) ``` ### Future with FastMCP ```python # Future: With Context support @mcp.tool() async def search_items( collections: list[str], bbox: list[float] | None = None, datetime: str | None = None, ctx: Context | None = None ) -> dict: """Search STAC catalog for items.""" if ctx: await ctx.info(f"[search_items] Searching {collections}") results = await stac_client.search( collections=collections, bbox=bbox, datetime=datetime, ctx=ctx ) if ctx: await ctx.info(f"[search_items] Found {len(results)} items") return format_results(results) ``` --- ## 9. Examples by Tool Category ### STAC Search Tools ```python @mcp.tool() async def search_items( collections: list[str], bbox: list[float] | None = None, ctx: Context | None = None ) -> dict: if ctx: await ctx.info(f"[search_items] Searching {', '.join(collections)}") # Implementation... ``` ### PySTAC CRUDL Tools ```python @mcp.tool() async def pystac_create_collection( collection_id: str, title: str, output_path: str, ctx: Context | None = None ) -> dict: if ctx: await ctx.info(f"[pystac_create_collection] Creating {collection_id}") collection = create_collection(collection_id, title) collection.save_object(output_path) if ctx: await ctx.info(f"[pystac_create_collection] Saved to {output_path}") return {"id": collection_id, "path": output_path} ``` ### Data Analysis Tools ```python @mcp.tool() async def estimate_data_size( collections: list[str], limit: int = 50, ctx: Context | None = None ) -> dict: if ctx: await ctx.info("[estimate_data_size] Starting estimation") items = await search_items(collections, limit=limit, ctx=ctx) size = 0 for i, item in enumerate(items): if ctx and i % 10 == 0: await ctx.report_progress(i, len(items)) size += calculate_size(item) return {"estimated_bytes": size} ``` --- ## 10. Environment Variable Integration Context can access environment-based configuration: ```python @mcp.tool() async def configure_catalog( catalog_url: str | None = None, ctx: Context | None = None ) -> dict: """Configure STAC catalog with logging.""" # Use env var if not provided url = catalog_url or os.getenv("STAC_CATALOG_URL") if ctx: await ctx.info(f"[configure_catalog] Using catalog: {url}") # Configure client stac_client.configure(url) if ctx: await ctx.info("[configure_catalog] Configuration complete") return {"catalog_url": url, "status": "configured"} ``` --- ## Conclusion This Context usage policy: - Keeps MCP logs meaningful with boundary-based logging - Maintains clean separation between control flow and domain logic - Preserves the ability to reuse STAC shared modules outside MCP - Enables progress tracking for long-running STAC operations - Supports future FastMCP migration without major refactoring By following these guidelines, STAC MCP will have consistent, helpful logging and orchestration when integrated with FastMCP. --- ## References - **FastMCP Documentation**: [Context Guide](https://github.com/jlowin/fastmcp) - **GUIDELINES.md**: Overall FastMCP usage in STAC MCP - **Issue #69**: FastMCP integration - **Issue #78**: FastMCP doc/architecture implementation --- **Last Updated**: 2025-10-18 **Related Docs**: [GUIDELINES.md](./GUIDELINES.md), [DECORATORS.md](./DECORATORS.md), [PROMPTS.md](./PROMPTS.md), [RESOURCES.md](./RESOURCES.md)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BnJam/stac-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server