blob_container_list
Lists all Blob Storage containers in an Azure storage account through the Model Context Protocol (MCP). The server logs each call, which supports audit tracking.
Instructions
List all Blob Storage containers
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
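
Because the schema declares no properties, a client calls the tool with an empty arguments object. The following is a minimal sketch of such a request, assuming the standard MCP `tools/call` JSON-RPC method; the `build_tool_call` helper and the request id are hypothetical and are not part of this server.

```python
import json
from typing import Optional


def build_tool_call(request_id: int, name: str, arguments: Optional[dict] = None) -> str:
    """Serialize an MCP tools/call request as a JSON-RPC 2.0 message.

    Hypothetical helper for illustration; a real client would normally
    let an MCP SDK build and send this message.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        # blob_container_list takes no arguments, so an empty object is sent.
        "params": {"name": name, "arguments": arguments or {}},
    }
    return json.dumps(payload)


request = build_tool_call(1, "blob_container_list")
print(request)
```

In practice an MCP client library would usually construct this message, so the sketch mainly shows the shape of the call.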
Implementation Reference
- mcp_server_azure/azure_server.py:192-195 (handler) Executes the blob_container_list tool: lists all containers in the Blob Storage account via BlobServiceClient and collects their names into a response dictionary.

      elif name == "blob_container_list":
          containers = blob_service_client.list_containers()
          container_names = [container.name for container in containers]
          response = {"container_names": container_names}

- Defines the input schema for the blob_container_list tool, which declares no input properties.

      Tool(
          name="blob_container_list",
          description="List all Blob Storage containers",
          inputSchema={"type": "object", "properties": {}},
      ),

- mcp_server_azure/azure_server.py:171-176 (registration) Registers the blob_container_list tool (among others) by returning the list from get_azure_tools() in response to list_tools requests.

      @server.list_tools()
      async def list_tools() -> list[Tool]:
          """List available Azure tools"""
          logger.debug("Handling list_tools request")
          return get_azure_tools()  # Use get_azure_tools

- mcp_server_azure/azure_server.py:177-238 (handler) The dispatch handler function for all blob storage tools, including the specific logic for blob_container_list.

      async def handle_blob_storage_operations(
          azure_rm: AzureResourceManager, name: str, arguments: dict
      ) -> list[TextContent]:
          """Handle Azure Blob Storage operations"""
          blob_service_client = azure_rm.get_blob_service_client()
          response = None
          if name == "blob_container_create":
              container_client = blob_service_client.create_container(
                  arguments["container_name"]
              )
              response = {
                  "container_name": container_client.container_name,
                  "created": True,
              }  # Simplify response
          elif name == "blob_container_list":
              containers = blob_service_client.list_containers()
              container_names = [container.name for container in containers]
              response = {"container_names": container_names}
          elif name == "blob_container_delete":
              blob_service_client.delete_container(arguments["container_name"])
              response = {"container_name": arguments["container_name"], "deleted": True}
          elif name == "blob_upload":
              blob_client = blob_service_client.get_blob_client(
                  container=arguments["container_name"], blob=arguments["blob_name"]
              )
              decoded_content = base64.b64decode(arguments["file_content"])
              blob_client.upload_blob(decoded_content, overwrite=True)
              response = {"blob_name": arguments["blob_name"], "uploaded": True}
          elif name == "blob_delete":
              blob_client = blob_service_client.get_blob_client(
                  container=arguments["container_name"], blob=arguments["blob_name"]
              )
              blob_client.delete_blob()
              response = {"blob_name": arguments["blob_name"], "deleted": True}
          elif name == "blob_list":
              container_client = blob_service_client.get_container_client(
                  arguments["container_name"]
              )
              blob_list = container_client.list_blobs()
              blob_names = [blob.name for blob in blob_list]
              response = {"blob_names": blob_names}
          elif name == "blob_read":
              blob_client = blob_service_client.get_blob_client(
                  container=arguments["container_name"], blob=arguments["blob_name"]
              )
              downloader = blob_client.download_blob()
              content = downloader.readall().decode("utf-8")
              return [TextContent(type="text", text=content)]
          else:
              raise ValueError(f"Unknown Blob Storage operation: {name}")
          azure_rm.log_operation(
              "blob_storage", name.replace("blob_", ""), arguments
          )  # Update service name in log
          return [
              TextContent(
                  type="text",
                  text=f"Operation Result:\n{json.dumps(response, indent=2, default=custom_json_serializer)}",
              )
          ]

- mcp_server_azure/azure_server.py:432-436 (handler) Dispatches blob_container_list (and other blob_ tools) to the specific blob storage handler from the main call_tool function.

      if name.startswith("blob_"):  # Updated prefix to blob_
          return await handle_blob_storage_operations(
              azure_rm, name, arguments
          )  # Use blob handler
      elif name.startswith("cosmosdb_"):  # Updated prefix to cosmosdb_
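
The listing branch and the result formatting described in the references above can be tried without Azure credentials. This is a minimal sketch under stated assumptions: `FakeContainer` and `FakeBlobServiceClient` are hypothetical stand-ins for the SDK objects, `list_container_names` and `format_result` are illustrative helpers rather than functions from this server, and the server's `custom_json_serializer` is omitted because container names are plain strings.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeContainer:
    """Hypothetical stand-in for the container objects the SDK yields."""
    name: str


class FakeBlobServiceClient:
    """Hypothetical stub exposing only list_containers()."""

    def __init__(self, names):
        self._names = list(names)

    def list_containers(self):
        # The real client returns a pageable iterable; an iterator is
        # enough to exercise the same list comprehension.
        return iter(FakeContainer(n) for n in self._names)


def list_container_names(blob_service_client) -> dict:
    """Mirror the blob_container_list branch: collect container names."""
    containers = blob_service_client.list_containers()
    return {"container_names": [container.name for container in containers]}


def format_result(response: dict) -> str:
    """Mirror the text payload the handler wraps in TextContent."""
    return f"Operation Result:\n{json.dumps(response, indent=2)}"


client = FakeBlobServiceClient(["logs", "backups"])
result_text = format_result(list_container_names(client))
print(result_text)
```

Swapping the stub for a real `BlobServiceClient` should leave `list_container_names` unchanged, since it relies only on `list_containers()` and the `name` attribute of each item.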