Skip to main content
Glama

IaC Memory MCP Server

by AgentWong
resources.py (32.5 kB)
"""Resource handlers for the IaC Memory MCP Server. This module contains the MCP server handlers for resource operations. Database operations are handled in db/resources.py. """ import logging import os from typing import Any, Dict, Optional from urllib.parse import urlparse import mcp.types as types from mcp.server.lowlevel.server import RequestContext, Server from mcp.shared.exceptions import McpError from pydantic import GetJsonSchemaHandler from pydantic.json_schema import JsonSchemaValue from pydantic_core import CoreSchema, Url from pydantic_core.core_schema import ValidationInfo from .context import BaseContext as TestContext from .db.connection import DatabaseError from .db.resources import ( get_ansible_collections, get_ansible_modules, get_entities, get_entity_relationships, get_terraform_providers, get_terraform_resources, ) from .db.terraform import get_provider_resources, get_resource_info from .shared import get_db # Configure module logger to inherit from root logger = logging.getLogger("iac_memory.resources") class ResourceUrl: """Custom URL type for resource URIs that supports template variables.""" @classmethod def validate(cls, value: Any, handler: Optional[ValidationInfo] = None) -> str: """Validate and normalize resource URLs.""" if isinstance(value, str): # Parse URL components try: parsed = urlparse(value) except Exception as e: raise ValueError(f"Invalid URL format: {str(e)}") # Validate scheme if parsed.scheme not in ["terraform", "ansible", "iac", "resources"]: raise ValueError( f"Invalid scheme '{parsed.scheme}'. Must be one of: terraform, ansible, iac, resources" ) # For template URIs, preserve the exact string without any parsing if "{" in value and "}" in value: # Just validate the scheme scheme = value.split("://", 1)[0] if scheme not in ["terraform", "ansible", "iac", "resources"]: raise ValueError( f"Invalid scheme '{scheme}'. 
Must be one of: terraform, ansible, iac, resources" ) return value # For concrete URIs, validate path components path = parsed.path.strip("/") if not path: raise ValueError("Path cannot be empty") parts = path.split("/") if len(parts) < 2: raise ValueError( "Invalid resource path. Must have at least 2 components." ) # For hierarchical URIs (resources://terraform/providers/aws), # we only validate basic path structure if parsed.scheme == "resources": if not parts: raise ValueError("Path cannot be empty for hierarchical URIs") if len(parts) < 2: raise ValueError("Path must include at least two components") return value elif isinstance(value, (Url, ResourceUrl)): return str(value) else: raise ValueError(f"Invalid URL type: {type(value)}") def __init__(self, url: str): """Initialize with a URL string.""" self._url = url if "{" in url and "}" in url else self.validate(url, None) self._is_template = "{" in url and "}" in url def __str__(self) -> str: """Return the raw URL string.""" return self._url def __repr__(self) -> str: """Return a string representation.""" return f"ResourceUrl({self._url!r})" def __eq__(self, other): """Compare URLs, preserving template variables.""" if isinstance(other, str): return str(self) == other return str(self) == str(other) @classmethod def __get_pydantic_core_schema__( cls, source_type: Any, handler: GetJsonSchemaHandler ) -> CoreSchema: """Get Pydantic core schema for URL validation.""" return CoreSchema(type="str", validator=cls.validate) @classmethod def __get_pydantic_json_schema__( cls, core_schema: CoreSchema, handler: GetJsonSchemaHandler ) -> JsonSchemaValue: """Get JSON schema for URL validation.""" return { "type": "string", "format": "uri", "pattern": "^(terraform|ansible|iac|resources)://.*$", } # Define resource templates for common access patterns RESOURCE_TEMPLATES = [ { "uriTemplate": "resources://terraform/providers/{provider_name}", "name": "Terraform Provider", "description": "Access Terraform provider information and 
resources", "mimeType": "application/json", }, { "uriTemplate": "resources://terraform/resources/{provider_name}/{resource_type}", "name": "Terraform Resource", "description": "Access specific Terraform resource type information", "mimeType": "application/json", }, { "uriTemplate": "resources://ansible/collections/{collection_name}", "name": "Ansible Collection", "description": "Access Ansible collection information and modules", "mimeType": "application/json", }, { "uriTemplate": "resources://ansible/modules/{collection_name}/{module_name}", "name": "Ansible Module", "description": "Access specific Ansible module information", "mimeType": "application/json", }, { "uriTemplate": "resources://entities/{entity_id}", "name": "Entity", "description": "Access entity information including observations and relationships", "mimeType": "application/json", }, { "uriTemplate": "resources://entities/{entity_id}/relationships", "name": "Entity Relationships", "description": "Access entity relationship information", "mimeType": "application/json", }, ] def extract_template_variables(uri: str, template: str) -> Optional[Dict[str, str]]: """Extract variables from a URI based on a template pattern. Args: uri: The actual URI to parse template: The template pattern containing {variable} placeholders Returns: Dictionary of variable names to values, or None if URI doesn't match template """ # Convert template into regex pattern pattern_parts = template.split("/") uri_parts = uri.split("/") if len(pattern_parts) != len(uri_parts): return None variables = {} for template_part, uri_part in zip(pattern_parts, uri_parts): if template_part.startswith("{") and template_part.endswith("}"): var_name = template_part[1:-1] variables[var_name] = uri_part elif template_part != uri_part: return None return variables async def handle_list_resources( ctx: RequestContext = None, page: int = 1, per_page: int = 50 ) -> list[types.Resource]: """List available resources and resource templates. 
Lists both concrete resources from the database and available resource templates that can be used to construct valid resource URIs. Args: ctx: Request context page: Page number (1-based) per_page: Number of items per page """ if ctx is None: try: from mcp.server.lowlevel.server import request_ctx ctx = request_ctx.get() except LookupError: ctx = TestContext(operation_name="list_resources") logger.info("Starting list_resources operation") # Get database instance db = get_db() logger.info("Listing available resources", extra={"database_path": db.db_path}) # Validate database state if not db.is_initialized(): msg = "Database not initialized" logger.error(msg) raise McpError( types.ErrorData( code=types.INTERNAL_ERROR, message=msg, data={"operation": "list_resources", "database_path": db.db_path}, ) ) try: # Get all available resources result = [] # Debug log database connection logger.debug(f"Using database at: {db.db_path}") # Get Terraform providers logger.debug("Fetching Terraform providers...") terraform_providers = [dict(p) for p in get_terraform_providers(db)] logger.debug(f"Found {len(terraform_providers)} Terraform providers") for p in terraform_providers: result.append( types.Resource( uri=str( ResourceUrl(f"resources://terraform/providers/{p['name']}") ), name=f"Provider: {p['name']}", description=( f"Terraform Provider v{p['version']} with {p['resource_count']} resources. 
" f"Updated: {p['updated_at']}" ), mimeType="application/json", ) ) # Get Terraform resources logger.debug("Fetching Terraform resources...") terraform_resources = [dict(r) for r in get_terraform_resources(db)] logger.debug( f"Found {len(terraform_resources)} Terraform resources: {terraform_resources}" ) for r in terraform_resources: result.append( types.Resource( uri=str( ResourceUrl( f"resources://terraform/resources/{r['provider_name']}/{r['resource_type']}" ) ), name=r["name"], description=( f"Terraform Resource v{r['version']} from provider {r['provider_name']} " f"v{r['provider_version']} ({r.get('relationship_type', 'MANAGED')})" ), mimeType="application/json", ) ) # Get Ansible collections logger.debug("Fetching Ansible collections...") ansible_collections = [dict(c) for c in get_ansible_collections(db)] logger.debug(f"Found {len(ansible_collections)} Ansible collections") for c in ansible_collections: result.append( types.Resource( uri=str( ResourceUrl(f"resources://ansible/collections/{c['name']}") ), name=f"Collection: {c['name']}", description=( f"Ansible Collection v{c['version']} with {c['module_count']} modules. 
" f"Updated: {c['updated_at']}" ), mimeType="application/json", ) ) # Get Ansible modules logger.debug("Fetching Ansible resources...") ansible_modules = get_ansible_modules(db) logger.debug(f"Found {len(ansible_modules)} Ansible modules: {ansible_modules}") for m in ansible_modules: result.append( types.Resource( uri=str( ResourceUrl( f"resources://ansible/modules/{m['collection_name']}/{m['module_name']}" ) ), name=m["module_name"], description=f"Ansible Module v{m['module_version']} from collection {m['collection_name']} v{m['collection_version']}", mimeType="application/json", ) ) # Get entities and relationships logger.debug("Fetching entities...") entities = [dict(e) for e in get_entities(db)] logger.debug(f"Found {len(entities)} entities") for e in entities: result.append( types.Resource( uri=str(ResourceUrl(f"iac://entity/{e['id']}")), name=f"Entity: {e['name']}", description=( f"Type: {e['type']}, Relationships: {e['relationship_count']}, " f"Created: {e['created_at']}, Updated: {e['updated_at']}" ), mimeType="application/json", ) ) # Add concrete resources first concrete_resources = result.copy() result.clear() # Add template resources with raw (unencoded) URIs logger.debug("Adding resource templates...") for template in RESOURCE_TEMPLATES: # Create template URI without URL encoding the variables uri_str = template["uriTemplate"] try: # Create template resource using model_construct to bypass validation template_resource = types.Resource.model_construct( uri=uri_str, # Use raw string for template URIs name=template["name"], description=template["description"], mimeType=template["mimeType"], ) result.append(template_resource) logger.debug(f"Added template with URI: {uri_str}") except Exception as e: logger.error(f"Failed to add template {uri_str}: {str(e)}") # Log the full error for debugging logger.debug(f"Template error details: {str(e)}", exc_info=True) # Add concrete resources after templates result.extend(concrete_resources) logger.debug( f"Added 
{len(RESOURCE_TEMPLATES)} resource templates and {len(concrete_resources)} concrete resources" ) # Log all URIs for debugging logger.debug("All URIs in response:") for r in result: logger.debug(f" {str(r.uri)}") # Apply pagination total_resources = len(result) start_idx = (page - 1) * per_page end_idx = start_idx + per_page logger.info(f"Found {total_resources} total resources (including templates)") logger.info(f"Returning page {page} with {per_page} items per page") # Debug database state logger.debug("Database state:") logger.debug(f"Database path: {db.db_path}") logger.debug(f"Database initialized: {db.is_initialized()}") # Return paginated results return result[start_idx:end_idx] except DatabaseError as e: raise McpError( types.ErrorData( code=types.INTERNAL_ERROR, message="Failed to list resources", data={"error": str(e), "operation": "list_resources"}, ) ) async def handle_read_resource( uri: ResourceUrl, ctx: RequestContext = None ) -> str | bytes: """Read a specific resource's content by its URI.""" if ctx is None: ctx = TestContext() # Get database instance early to ensure correct context db = ctx.db if hasattr(ctx, "db") else get_db() # Use context's DB if available logger.info("Starting resource read operation") logger.info(f"Reading resource: {uri}") # Extract and validate resource URI uri_str = str(uri) scheme, path = uri_str.split("://", 1) # Support both legacy and hierarchical URI schemes if scheme not in ["terraform", "ansible", "iac", "resources"]: raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Resource not found: Unsupported URI scheme '{scheme}'", data={ "uri": uri_str, "supported_schemes": ["terraform", "ansible", "iac", "resources"], "example_hierarchical": "resources://terraform/providers/aws", }, ) ) # Check if URI matches any templates first if scheme == "resources": for template in RESOURCE_TEMPLATES: variables = extract_template_variables(uri_str, template["uriTemplate"]) if variables: logger.debug( f"URI matches 
template {template['uriTemplate']} with variables {variables}" ) if "terraform/providers" in template["uriTemplate"]: provider_name = variables.get("provider_name") resources = get_provider_resources(db, provider_name) if resources: provider = resources[0] return ( f"Provider: {provider['provider_name']}\n" f"Version: {provider['provider_version']}\n" f"Resources: {len(resources)}\n" f"Documentation: {provider['doc_url']}\n" f"Relationships:\n" + "\n".join( f"- {r['resource_type']} ({r['relationship_type']})" for r in resources ) ) elif "terraform/resources" in template["uriTemplate"]: provider_name = variables.get("provider_name") resource_type = variables.get("resource_type") resources = [ r for r in get_terraform_resources(db) if r["provider_name"] == provider_name and r["resource_type"] == resource_type ] if resources: resource = resources[0] return ( f"Resource: {resource['name']}\n" f"Provider: {resource['provider_name']}\n" f"Type: {resource['resource_type']}\n" f"Version: {resource['version']}\n" f"Documentation: {resource['doc_url']}\n" f"Schema:\n{resource['schema']}" ) elif "/entities/" in template["uriTemplate"]: entity_id = variables.get("entity_id") if entity_id: if "relationships" in template["uriTemplate"]: relationships = get_entity_relationships(db, entity_id) return ( "\n".join( f"{r['source_id']} -> {r['target_id']}: {r['relationship_type']}" for r in relationships ) if relationships else "No relationships found" ) else: from .db.entities import get_entity_with_observation result = get_entity_with_observation(db, entity_id) if result: response = ( f"Entity: {result['name']}\n" f"Type: {result['type']}\n" f"Created: {result['created_at']}\n" f"Updated: {result['updated_at']}" ) if result.get("observation"): response += ( f"\nObservation: {result['observation']}" ) return response # Handle hierarchical URIs (new format) if scheme == "resources": parts = path.strip("/").split("/") if len(parts) < 2: raise McpError( types.ErrorData( 
code=types.METHOD_NOT_FOUND, message="Invalid resource path: Must have at least 2 components", data={ "uri": uri_str, "example": "resources://terraform/providers/aws", }, ) ) # Validate resource type resource_type = parts[0] # terraform, ansible, or entities if resource_type not in ["terraform", "ansible", "entities"]: raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Invalid resource path: Unsupported resource type '{resource_type}'", data={ "uri": uri_str, "supported_types": ["terraform", "ansible", "entities"], "example": "resources://terraform/providers/aws", }, ) ) # Validate category category = parts[1] # providers, resources, collections, modules valid_categories = { "terraform": ["providers", "resources"], "ansible": ["collections", "modules"], "entities": ["relationships"], } if category not in valid_categories.get(resource_type, []): raise ValueError("Invalid resource path") # Validate name component name = parts[2] if len(parts) > 2 else None if not name: raise ValueError("Invalid resource path") # Map hierarchical URI to legacy format if resource_type == "terraform" and category == "providers": scheme = "terraform" path = f"provider/{name}" if name else "provider" elif resource_type == "terraform" and category == "resources": scheme = "terraform" path = "/".join(parts[2:]) if len(parts) > 2 else "" elif resource_type == "ansible" and category == "collections": scheme = "ansible" path = f"collection/{name}" if name else "collection" elif resource_type == "ansible" and category == "modules": scheme = "ansible" path = "/".join(parts[2:]) if len(parts) > 2 else "" elif resource_type == "entities": scheme = "iac" path = f"entity/{name}" if name else "entity" try: logger.info("Parsing resource URI") # Parse path based on scheme parts = path.strip("/").split("/") if scheme == "terraform": if len(parts) != 2: raise ValueError( "Invalid Terraform resource path. 
Expected format: provider/resource_type" ) provider_name = parts[0] resource_type = parts[1] # Debug logging to inspect database state all_resources = get_terraform_resources(db) logger.debug( f"All terraform resources in DB: {[dict(r) for r in all_resources]}" ) resources = get_provider_resources(db, provider_name) if not resources: raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Resource not found: Provider '{provider_name}' does not exist", data={"uri": uri_str}, ) ) matching_resources = [ r for r in resources if r["resource_type"] == resource_type ] if not matching_resources: raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Resource not found: Resource type '{resource_type}' not found for provider '{provider_name}'", data={"uri": uri_str}, ) ) resource = matching_resources[0] return ( f"Resource: {resource['name']}\n" f"Provider: {resource['provider_name']}\n" f"Type: {resource['resource_type']}\n" f"Version: {resource['version']}\n" f"Documentation: {resource['doc_url']}\n" f"Relationship: {resource.get('relationship_type', 'MANAGED')}\n" f"Schema:\n{resource['schema']}" ) elif scheme == "ansible": if len(parts) != 2: raise ValueError( "Invalid Ansible resource path. 
Expected format: collection_name/module_name" ) collection_name, module_name = parts # Get module info using the collection and module names from .db.ansible import get_module_by_name module = get_module_by_name(db, collection_name, module_name) if not module: raise ValueError( f"Module {module_name} not found in collection {collection_name}" ) return ( f"Module: {module['name']}\n" f"Collection: {module['collection_name']}\n" f"Type: {module['type']}\n" f"Version: {module['version']}\n" f"Documentation: {module['doc_url']}\n" f"Schema:\n{module['schema']}" ) elif scheme == "iac": if len(parts) != 2 or parts[0] != "entity": raise ValueError("Invalid IaC resource path") name = parts[1] logger.info(f"Resolved resource name: {name}") if not name: raise McpError( types.ErrorData( code=types.INVALID_REQUEST, message="Resource name cannot be empty", data={"uri": uri_str}, ) ) except Exception as e: error_msg = f"Failed to parse resource URI: {str(e)}" logger.error(error_msg, extra={"uri": uri_str}) # Determine if this is a "not found" error if "not found" in str(e).lower() or "no resources found" in str(e).lower(): raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=error_msg, data={ "uri": uri_str, "error": str(e), "expected_format": "[terraform|ansible|iac]://<path>", }, ) ) else: raise McpError( types.ErrorData( code=types.INVALID_REQUEST, message=error_msg, data={ "uri": uri_str, "error": str(e), "expected_format": "[terraform|ansible|iac]://<path>", }, ) ) if not db.is_initialized(): logger.info("Database not initialized, resetting") if os.environ.get("MCP_TEST_MODE"): logger.info("Test mode detected - resetting database to clean state") db.reset_database() try: # Handle Terraform resources if scheme == "terraform": if len(parts) < 2: raise ValueError("Invalid Terraform resource path") if parts[0] == "provider": resources = get_provider_resources(db, parts[1]) if resources: provider = resources[0] # All resources have provider info return ( 
f"Provider: {provider['provider_name']}\n" f"Version: {provider['provider_version']}\n" f"Resources: {len(resources)}\n" f"Documentation: {provider['doc_url']}\n" f"Relationships:\n" + "\n".join( f"- {r['resource_type']} ({r['relationship_type']})" for r in resources ) ) elif parts[0] == "resource": resource = get_resource_info(db, parts[1]) if resource: return ( f"Resource: {resource['name']}\n" f"Provider: {resource['provider_name']}\n" f"Type: {resource['resource_type']}\n" f"Version: {resource['version']}\n" f"Documentation: {resource['doc_url']}\n" f"Relationship: {resource['relationship_type']}\n" f"Schema:\n{resource['schema']}" ) elif scheme == "ansible": from .db.ansible import get_collection_modules, get_module_info if parts[0] == "collection": modules = get_collection_modules(db, parts[1]) if modules: collection = modules[0] # All modules have collection info return ( f"Collection: {collection['collection_name']}\n" f"Version: {collection['collection_version']}\n" f"Modules: {len(modules)}\n" f"Documentation: {collection['doc_url']}" ) else: # module module = get_module_info(db, parts[1]) if module: return ( f"Module: {module['name']}\n" f"Collection: {module['collection_name']}\n" f"Type: {module['type']}\n" f"Version: {module['version']}\n" f"Documentation: {module['doc_url']}\n" f"Schema:\n{module['schema']}" ) elif scheme == "iac": from .db.entities import get_entity_with_observation result = get_entity_with_observation(db, parts[1]) if result: response = ( f"Entity: {result['name']}\n" f"Type: {result['type']}\n" f"Created: {result['created_at']}\n" f"Updated: {result['updated_at']}" ) if result.get("observation"): response += f"\nObservation: {result['observation']}" return response # If we get here, resource wasn't found logger.info(f"Resource not found: {uri_str}") raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Resource not found: {uri_str}", data={"uri": uri_str}, ) ) except McpError: raise except Exception as e: if "not 
found" in str(e).lower() or "no such" in str(e).lower(): raise McpError( types.ErrorData( code=types.METHOD_NOT_FOUND, message=f"Resource not found: {name}", data={ "uri": uri_str, "resource_name": name, }, ) ) else: raise McpError( types.ErrorData( code=types.INTERNAL_ERROR, message="Failed to read resource", data={ "uri": uri_str, "error": str(e), "operation": "read_resource", "resource_name": name, }, ) ) def register_resources(server: Server, db: Any) -> None: """Register all resource handlers with the server.""" @server.list_resources() async def list_resources(ctx: RequestContext = None): return await handle_list_resources(ctx) @server.read_resource() async def read_resource(uri: ResourceUrl, ctx: RequestContext = None): return await handle_read_resource(uri, ctx) @server.list_resource_templates() async def list_resource_templates(): """List available resource templates.""" return [types.ResourceTemplate(**template) for template in RESOURCE_TEMPLATES]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AgentWong/iac-memory-mcp-server-project'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.