Azure MCP Server

by mashriram
Verified
# azure_server.py
"""Azure MCP server.

Exposes Azure Blob Storage, Azure Cosmos DB (NoSQL API) and Azure App
Configuration operations as MCP tools over the stdio transport, and publishes
an in-memory audit log of the operations performed as an MCP resource.
"""

import asyncio
import base64
import io
import json
import logging
import os
from datetime import datetime, timezone
from functools import lru_cache
from typing import Any, Sequence

import mcp.server.stdio
from azure.appconfiguration import AzureAppConfigurationClient, ConfigurationSetting
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.types import EmbeddedResource, ImageContent, Resource, TextContent, Tool
from pydantic import AnyUrl

from mcp_server_azure.azure_tools import get_azure_tools
from mcp_server_azure.azure_utils import (
    get_cosmosdb_type,
)  # Assuming you choose Cosmos DB

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("azure-mcp-server")

# URI pieces of the single resource this server exposes.
AUDIT_URI_SCHEME = "audit"
AUDIT_RESOURCE_PATH = "azure-operations"


def custom_json_serializer(obj):
    """Serialize values that ``json.dumps`` cannot handle natively.

    Used as the ``default=`` hook when formatting tool results.

    Args:
        obj: The value ``json.dumps`` could not serialize.

    Returns:
        The ISO-8601 string for ``datetime`` values.

    Raises:
        TypeError: For any other type, as the ``default=`` protocol requires.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


class AzureResourceManager:
    """Create Azure service clients and keep an audit trail of operations.

    Blob Storage and App Configuration clients authenticate with a shared
    ``DefaultAzureCredential``; the Cosmos DB client authenticates with an
    account key.
    """

    # NOTE: the client factories below are memoized with ``lru_cache``, which
    # also keys on ``self`` and therefore keeps the instance alive for the
    # lifetime of the cache. ``main`` creates exactly one manager, so this is
    # harmless here.

    def __init__(self):
        logger.info("Initializing AzureResourceManager")
        # Chronological list of dicts with the keys
        # "timestamp", "service", "operation" and "parameters".
        self.audit_entries: list[dict] = []
        self.credential = DefaultAzureCredential()

    @lru_cache(maxsize=None)
    def get_blob_service_client(
        self, account_url: str | None = None
    ) -> BlobServiceClient:
        """Get an Azure Blob Service client.

        Args:
            account_url: Storage account URL. Falls back to the
                ``AZURE_STORAGE_ACCOUNT_URL`` environment variable.

        Returns:
            A ``BlobServiceClient`` using the manager's credential.

        Raises:
            RuntimeError: If no account URL is available or the client cannot
                be created; the original exception is attached as the cause.
        """
        try:
            account_url = account_url or os.getenv("AZURE_STORAGE_ACCOUNT_URL")
            if not account_url:
                raise ValueError(
                    "Azure Storage Account URL is not specified and not set in the environment."
                )
            logger.info("Creating BlobServiceClient for account: %s", account_url)
            return BlobServiceClient(
                account_url=account_url, credential=self.credential
            )
        except Exception as e:
            logger.error("Failed to create BlobServiceClient: %s", e)
            raise RuntimeError(f"Failed to create BlobServiceClient: {e}") from e

    @lru_cache(maxsize=None)
    def get_cosmos_client(
        self,
        endpoint: str | None = None,
        key: str | None = None,
        url: str | None = None,
    ) -> CosmosClient:
        """Get an Azure Cosmos DB client.

        ``url`` and ``endpoint`` are two spellings of the same account URL;
        the first non-empty value of ``url``, ``AZURE_COSMOSDB_URL``,
        ``endpoint`` and ``AZURE_COSMOSDB_ENDPOINT`` is used.

        Args:
            endpoint: Cosmos DB account URL (alternative to ``url``).
            key: Account key. Falls back to ``AZURE_COSMOSDB_KEY``.
            url: Cosmos DB account URL.

        Returns:
            A ``CosmosClient`` authenticated with the account key.

        Raises:
            RuntimeError: If the account URL or key is missing or the client
                cannot be created; the original exception is attached as the
                cause.
        """
        try:
            # Resolve a single account URL. Never coerce a missing value with
            # str(): that would hand the literal "None" to the SDK as the URL.
            account_url = (
                url
                or os.getenv("AZURE_COSMOSDB_URL")
                or endpoint
                or os.getenv("AZURE_COSMOSDB_ENDPOINT")
            )
            key = key or os.getenv("AZURE_COSMOSDB_KEY")
            if not account_url or not key:
                raise ValueError(
                    "Azure Cosmos DB Endpoint or Key is not specified and not set in the environment."
                )
            logger.info("Creating CosmosClient for endpoint: %s", account_url)
            return CosmosClient(url=account_url, credential=key)
        except Exception as e:
            logger.error("Failed to create CosmosClient: %s", e)
            raise RuntimeError(f"Failed to create CosmosClient: {e}") from e

    @lru_cache(maxsize=None)
    def get_app_configuration_client(
        self, endpoint: str | None = None
    ) -> AzureAppConfigurationClient:
        """Get an Azure App Configuration client using the manager's credential.

        Args:
            endpoint: App Configuration endpoint. Falls back to the
                ``AZURE_APP_CONFIGURATION_ENDPOINT`` environment variable.

        Returns:
            An ``AzureAppConfigurationClient`` for the endpoint.

        Raises:
            RuntimeError: If no endpoint is available or the client cannot be
                created; the original exception is attached as the cause.
        """
        try:
            logger.info("Creating AzureAppConfigurationClient")
            endpoint = endpoint or os.getenv("AZURE_APP_CONFIGURATION_ENDPOINT")
            if not endpoint:
                raise ValueError(
                    "Azure App Configuration endpoint is not specified and not set in the environment."
                )
            return AzureAppConfigurationClient(
                base_url=endpoint, credential=self.credential
            )
        except Exception as e:
            logger.error("Failed to create AzureAppConfigurationClient: %s", e)
            raise RuntimeError(
                f"Failed to create AzureAppConfigurationClient: {e}"
            ) from e

    def _synthesize_audit_log(self) -> str:
        """Generate the formatted audit log from the recorded entries.

        Returns:
            A human-readable report, or a placeholder sentence when nothing
            has been recorded yet.
        """
        logger.debug("Synthesizing audit log")
        if not self.audit_entries:
            return "No Azure operations have been performed yet."

        separator = "-" * 50
        parts = ["📋 Azure Operations Audit Log 📋\n\n"]
        for entry in self.audit_entries:
            parameters = json.dumps(entry["parameters"], indent=2)
            parts.append(
                f"[{entry['timestamp']}]\n"
                f"Service: {entry['service']}\n"
                f"Operation: {entry['operation']}\n"
                f"Parameters: {parameters}\n"
                f"{separator}\n"
            )
        return "".join(parts)

    def log_operation(self, service: str, operation: str, parameters: dict) -> None:
        """Append an Azure operation to the audit log.

        Args:
            service: Service label, e.g. ``"blob_storage"``.
            operation: Operation label, e.g. ``"container_create"``.
            parameters: The tool arguments; stored by reference.
        """
        logger.info("Logging operation - Service: %s, Operation: %s", service, operation)
        self.audit_entries.append(
            {
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "service": service,
                "operation": operation,
                "parameters": parameters,
            }
        )


def _format_result(response: Any) -> list[TextContent]:
    """Wrap an operation result as the text payload returned to the client."""
    payload = json.dumps(response, indent=2, default=custom_json_serializer)
    return [TextContent(type="text", text=f"Operation Result:\n{payload}")]


async def handle_blob_storage_operations(
    azure_rm: AzureResourceManager, name: str, arguments: dict
) -> list[TextContent]:
    """Handle Azure Blob Storage operations.

    Args:
        azure_rm: Manager providing the blob client and the audit log.
        name: Tool name, expected to start with ``blob_``.
        arguments: Tool arguments; required keys depend on the operation.

    Returns:
        The blob text for ``blob_read``; a formatted result otherwise.

    Raises:
        ValueError: If ``name`` is not a known Blob Storage operation.
    """
    blob_service_client = azure_rm.get_blob_service_client()
    operation = name.removeprefix("blob_")

    if name == "blob_container_create":
        container_client = blob_service_client.create_container(
            arguments["container_name"]
        )
        response = {
            "container_name": container_client.container_name,
            "created": True,
        }
    elif name == "blob_container_list":
        response = {
            "container_names": [
                container.name for container in blob_service_client.list_containers()
            ]
        }
    elif name == "blob_container_delete":
        blob_service_client.delete_container(arguments["container_name"])
        response = {"container_name": arguments["container_name"], "deleted": True}
    elif name == "blob_upload":
        blob_client = blob_service_client.get_blob_client(
            container=arguments["container_name"], blob=arguments["blob_name"]
        )
        # The payload is supplied base64-encoded and is decoded before upload.
        decoded_content = base64.b64decode(arguments["file_content"])
        blob_client.upload_blob(decoded_content, overwrite=True)
        response = {"blob_name": arguments["blob_name"], "uploaded": True}
    elif name == "blob_delete":
        blob_client = blob_service_client.get_blob_client(
            container=arguments["container_name"], blob=arguments["blob_name"]
        )
        blob_client.delete_blob()
        response = {"blob_name": arguments["blob_name"], "deleted": True}
    elif name == "blob_list":
        container_client = blob_service_client.get_container_client(
            arguments["container_name"]
        )
        response = {"blob_names": [blob.name for blob in container_client.list_blobs()]}
    elif name == "blob_read":
        blob_client = blob_service_client.get_blob_client(
            container=arguments["container_name"], blob=arguments["blob_name"]
        )
        content = blob_client.download_blob().readall().decode("utf-8")
        # Reads return the raw text rather than a JSON result, but they are
        # still recorded in the audit log like every other operation.
        azure_rm.log_operation("blob_storage", operation, arguments)
        return [TextContent(type="text", text=content)]
    else:
        raise ValueError(f"Unknown Blob Storage operation: {name}")

    azure_rm.log_operation("blob_storage", operation, arguments)
    return _format_result(response)


async def handle_cosmosdb_operations(
    azure_rm: AzureResourceManager, name: str, arguments: dict
) -> list[TextContent]:
    """Handle Azure Cosmos DB operations (NoSQL API).

    Args:
        azure_rm: Manager providing the Cosmos client and the audit log.
        name: Tool name, expected to start with ``cosmosdb_``.
        arguments: Tool arguments; ``database_name`` defaults to
            ``"SampleDB"``, other required keys depend on the operation.

    Returns:
        A formatted result for the operation.

    Raises:
        ValueError: If ``name`` is not a known Cosmos DB operation.
    """
    cosmos_client = azure_rm.get_cosmos_client()
    database_name = arguments.get("database_name", "SampleDB")
    logger.debug("Cosmos DB operation %s on database %s", name, database_name)
    database = cosmos_client.get_database_client(database_name)

    if name == "cosmosdb_container_create":
        container = database.create_container(
            id=arguments["container_name"], partition_key=arguments["partition_key"]
        )
        response = {"container_id": container.id, "created": True}
    elif name == "cosmosdb_container_describe":
        response = database.get_container_client(arguments["container_name"]).read()
    elif name == "cosmosdb_container_list":
        response = {
            "container_names": [c["id"] for c in database.list_containers()]
        }
    elif name == "cosmosdb_container_delete":
        database.delete_container(arguments["container_name"])
        response = {"container_name": arguments["container_name"], "deleted": True}
    elif name == "cosmosdb_item_create":
        container_client = database.get_container_client(arguments["container_name"])
        item = container_client.create_item(body=arguments["item"])
        response = {"item_id": item["id"], "created": True}
    elif name == "cosmosdb_item_read":
        container_client = database.get_container_client(arguments["container_name"])
        response = container_client.read_item(
            item=arguments["item_id"], partition_key=arguments["partition_key"]
        )
    elif name == "cosmosdb_item_replace":
        # replace_item performs a full replacement of the stored document.
        container_client = database.get_container_client(arguments["container_name"])
        item = container_client.replace_item(
            item=arguments["item_id"], body=arguments["item"]
        )
        response = {"item_id": item["id"], "replaced": True}
    elif name == "cosmosdb_item_delete":
        container_client = database.get_container_client(arguments["container_name"])
        container_client.delete_item(
            item=arguments["item_id"], partition_key=arguments["partition_key"]
        )
        response = {"item_id": arguments["item_id"], "deleted": True}
    elif name == "cosmosdb_item_query":
        container_client = database.get_container_client(arguments["container_name"])
        items = list(
            container_client.query_items(
                query=arguments["query"],
                parameters=arguments.get("parameters", []),  # Optional parameters
            )
        )
        response = {"items": items}
    else:
        raise ValueError(f"Unknown Cosmos DB operation: {name}")

    azure_rm.log_operation("cosmosdb", name.removeprefix("cosmosdb_"), arguments)
    return _format_result(response)


async def handle_app_configuration_operations(
    azure_rm: AzureResourceManager, name: str, arguments: dict
) -> list[TextContent]:
    """Handle Azure App Configuration operations.

    Args:
        azure_rm: Manager providing the App Configuration client and the
            audit log.
        name: Tool name, expected to start with ``app_configuration_``.
        arguments: Tool arguments; required keys depend on the operation.

    Returns:
        A formatted result for the operation.

    Raises:
        ValueError: If ``name`` is not a known App Configuration operation.
    """
    app_config_client = azure_rm.get_app_configuration_client()

    if name == "app_configuration_kv_read":
        # Key and label are both optional; the key filter is only sent when a
        # non-empty key was supplied.
        filters = {"label_filter": arguments.get("label")}
        key = arguments.get("key")
        if key:
            filters["key_filter"] = key
        settings = app_config_client.list_configuration_settings(**filters)
        response = {
            "settings": [
                {
                    "key": setting.key,
                    "value": setting.value,
                    "content_type": setting.content_type,
                    "label": setting.label,
                    "last_modified": (
                        setting.last_modified.isoformat()
                        if setting.last_modified
                        else None
                    ),
                    "read_only": setting.read_only,
                }
                for setting in settings
            ]
        }
    elif name == "app_configuration_kv_write":
        setting = ConfigurationSetting(
            key=arguments["key"],
            value=arguments["value"],
            label=arguments.get("label"),
            content_type=arguments.get("content_type"),
        )
        # Creates the setting or overwrites an existing one.
        result = app_config_client.set_configuration_setting(setting)
        response = {
            "key": result.key,
            "value": result.value,
            "content_type": result.content_type,
            "label": result.label,
            "etag": result.etag,
            "last_modified": (
                result.last_modified.isoformat() if result.last_modified else None
            ),
        }
    elif name == "app_configuration_kv_delete":
        result = app_config_client.delete_configuration_setting(
            key=arguments["key"], label=arguments.get("label")
        )
        if result:
            response = {"key": result.key, "label": result.label, "deleted": True}
        else:
            # A falsy result is reported to the caller as "not found".
            response = {
                "key": arguments["key"],
                "label": arguments.get("label"),
                "deleted": False,
                "message": "Key not found",
            }
    else:
        raise ValueError(f"Unknown App Configuration operation: {name}")

    azure_rm.log_operation(
        "app_configuration", name.removeprefix("app_configuration_"), arguments
    )
    return _format_result(response)


# Tool-name prefix -> handler, checked in order by ``call_tool``.
_TOOL_HANDLERS = (
    ("blob_", handle_blob_storage_operations),
    ("cosmosdb_", handle_cosmosdb_operations),
    ("app_configuration_", handle_app_configuration_operations),
)


async def main():
    """Register the MCP handlers and serve requests over stdio."""
    logger.info("Starting Azure MCP Server")
    azure_rm = AzureResourceManager()
    server = Server("azure-mcp-server")

    logger.debug("Registering handlers")

    @server.list_resources()
    async def handle_list_resources() -> list[Resource]:
        """Advertise the audit log as the server's only resource."""
        logger.debug("Handling list_resources request")
        return [
            Resource(
                uri=AnyUrl(f"{AUDIT_URI_SCHEME}://{AUDIT_RESOURCE_PATH}"),
                name="Azure Operations Audit Log",
                description="A log of all Azure operations performed through this server",
                mimeType="text/plain",
            )
        ]

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        """Return the audit log text; reject any other URI with ValueError."""
        logger.debug("Handling read_resource request for URI: %s", uri)
        if uri.scheme != AUDIT_URI_SCHEME:
            logger.error("Unsupported URI scheme: %s", uri.scheme)
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        path = str(uri).removeprefix(f"{AUDIT_URI_SCHEME}://")
        if path != AUDIT_RESOURCE_PATH:
            logger.error("Unknown resource path: %s", path)
            raise ValueError(f"Unknown resource path: {path}")

        return azure_rm._synthesize_audit_log()

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available Azure tools."""
        logger.debug("Handling list_tools request")
        return get_azure_tools()

    @server.call_tool()
    async def call_tool(
        name: str, arguments: Any
    ) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Dispatch a tool call to the handler matching its name prefix.

        Raises:
            ValueError: If ``arguments`` is not a dict.
            RuntimeError: If the tool is unknown or the operation fails; the
                original exception is attached as the cause.
        """
        logger.info("Handling tool call: %s", name)
        logger.debug("Tool arguments: %s", arguments)

        if not isinstance(arguments, dict):
            logger.error("Invalid arguments: not a dictionary")
            raise ValueError("Invalid arguments")

        try:
            for prefix, handler in _TOOL_HANDLERS:
                if name.startswith(prefix):
                    return await handler(azure_rm, name, arguments)
            raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            logger.error("Operation failed: %s", e)
            raise RuntimeError(f"Operation failed: {str(e)}") from e

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="mcp-server-azure",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())