"""Mender MCP Server - Main server implementation."""
import asyncio
import os
import sys
from typing import Any, Dict, List, Optional
import click
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, TextContent, Tool
from pydantic import AnyUrl
from .mender_api import MenderAPIClient, MenderAPIError
from .security import (
SecurityLogger, ErrorSanitizer, validate_input,
DeviceIdInput, DeploymentIdInput, ReleaseNameInput,
LimitInput, StatusInput, DeviceTypeInput
)
class MenderMCPServer:
    """Mender MCP Server implementation.

    Wraps a ``MenderAPIClient`` and an MCP ``Server`` named ``"mender"``, and
    exposes Mender devices, deployments, artifacts, releases, inventory and
    audit logs as MCP resources and tools rendered as plain text.
    """
def __init__(self, server_url: str, access_token: str):
    """Create the API client, MCP server and logger, then register handlers.

    Args:
        server_url: Mender server URL passed to ``MenderAPIClient``.
        access_token: Access token passed to ``MenderAPIClient``.
    """
    self.mender_client = MenderAPIClient(server_url, access_token)
    self.server = Server("mender")
    self.security_logger = SecurityLogger("mender_mcp_server")

    # Only the server URL is part of the startup message; the token is not.
    info_level = 20  # numeric value of logging.INFO
    startup_message = f"Initializing Mender MCP Server for {server_url}"
    self.security_logger.log_secure(info_level, startup_message)

    self._setup_handlers()
def _setup_handlers(self) -> None:
    """Set up MCP server handlers.

    The resource and tool handlers are defined as nested coroutines and
    registered through the decorators exposed by ``self.server``.
    """
@self.server.list_resources()
async def list_resources() -> List[Resource]:
    """Advertise the collection-level Mender resources this server offers."""
    # One (URI, display name, description) row per advertised resource.
    catalog = (
        ("mender://devices", "Devices", "List of all Mender devices"),
        ("mender://deployments", "Deployments", "List of all Mender deployments"),
        ("mender://artifacts", "Artifacts", "List of all Mender artifacts"),
        ("mender://releases", "Releases", "List of all Mender releases"),
        (
            "mender://inventory",
            "Device Inventory",
            "Complete device inventory with all attributes",
        ),
        (
            "mender://inventory-groups",
            "Inventory Groups",
            "Device grouping information",
        ),
        (
            "mender://audit-logs",
            "Audit Logs",
            "System audit logs for user actions and system changes",
        ),
    )
    return [
        Resource(
            uri=AnyUrl(location),
            name=title,
            description=summary,
            mimeType="application/json",
        )
        for location, title, summary in catalog
    ]
@self.server.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    """Return the text rendering of the Mender resource named by ``uri``.

    Collection URIs (for example ``mender://devices``) list items. Item URIs
    (``mender://devices/<id>``, ``mender://deployments/<id>``,
    ``mender://releases/<name>``, ``mender://inventory/<id>``) use the last
    path segment as the identifier.

    Args:
        uri: Resource URI requested by the MCP client.

    Returns:
        The formatted resource, or a human-readable error message. Failures
        are reported in the returned text rather than raised.
    """
    uri_str = str(uri)

    def _trailing_id(model: Any, field: str) -> str:
        # The identifier comes straight from the client, so it gets the same
        # validation the equivalent tool call applies before reaching the API.
        segment = uri_str.split("/")[-1]
        if not segment:
            raise ValueError(f"Missing {field} in resource URI: {uri_str}")
        return validate_input(model, {field: segment})[field]

    try:
        if uri_str == "mender://devices":
            devices = self.mender_client.get_devices()
            return self._format_devices_output(devices)
        elif uri_str == "mender://deployments":
            deployments = self.mender_client.get_deployments()
            return self._format_deployments_output(deployments)
        elif uri_str == "mender://artifacts":
            artifacts = self.mender_client.get_artifacts()
            return self._format_artifacts_output(artifacts)
        elif uri_str == "mender://releases":
            releases = self.mender_client.get_releases()
            return self._format_releases_output(releases)
        elif uri_str.startswith("mender://devices/"):
            device_id = _trailing_id(DeviceIdInput, "device_id")
            device = self.mender_client.get_device(device_id)
            return self._format_device_output(device)
        elif uri_str.startswith("mender://deployments/"):
            deployment_id = _trailing_id(DeploymentIdInput, "deployment_id")
            deployment = self.mender_client.get_deployment(deployment_id)
            return self._format_deployment_output(deployment)
        elif uri_str.startswith("mender://releases/"):
            release_name = _trailing_id(ReleaseNameInput, "release_name")
            release = self.mender_client.get_release(release_name)
            return self._format_release_output(release)
        elif uri_str == "mender://inventory":
            inventories = self.mender_client.get_devices_inventory()
            return self._format_inventories_output(inventories)
        elif uri_str.startswith("mender://inventory/"):
            device_id = _trailing_id(DeviceIdInput, "device_id")
            inventory = self.mender_client.get_device_inventory(device_id)
            return self._format_device_inventory_output(inventory)
        elif uri_str == "mender://inventory-groups":
            groups = self.mender_client.get_inventory_groups()
            return self._format_inventory_groups_output(groups)
        elif uri_str == "mender://audit-logs":
            audit_log = self.mender_client.get_audit_logs(limit=100)
            return self._format_audit_log_output(audit_log)
        else:
            raise ValueError(f"Unknown resource: {uri}")
    except ValueError as e:
        # Bad or unknown URI: reported the same way call_tool reports
        # input validation failures.
        self.security_logger.log_secure(
            40,  # ERROR level
            f"Invalid resource request {uri_str}: {str(e)}"
        )
        return f"Input validation error: {str(e)}"
    except MenderAPIError as e:
        return f"Error accessing Mender API: {e.message}"
    except Exception as e:
        # Sanitize before exposing, mirroring the tool-call error path.
        sanitized_error = SecurityLogger.sanitize_message(str(e))
        self.security_logger.log_secure(
            50,  # CRITICAL level
            f"Unexpected error reading resource {uri_str}: {str(e)}"
        )
        return f"Unexpected error: {sanitized_error}"
@self.server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe every tool this server offers, with its JSON input schema."""

    def text(description: str, **extra: Any) -> Dict[str, Any]:
        # String property; extras such as ``enum`` follow the description.
        return {"type": "string", "description": description, **extra}

    def capped(description: str, maximum: int, default: int) -> Dict[str, Any]:
        # Integer "limit" property; every limit here has a minimum of 1.
        return {
            "type": "integer",
            "description": description,
            "minimum": 1,
            "maximum": maximum,
            "default": default,
        }

    def schema(
        properties: Dict[str, Any], required: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        # "required" is emitted only for tools that declare mandatory inputs.
        spec: Dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            spec["required"] = required
        return spec

    # One (tool name, description, input schema) row per tool.
    catalog = (
        (
            "get_device_status",
            "Get the current status of a specific device",
            schema(
                {"device_id": text("The ID of the device to check")},
                ["device_id"],
            ),
        ),
        (
            "list_devices",
            "List devices with optional filtering",
            schema({
                "status": text(
                    "Filter by device status (accepted, rejected, pending, etc.)",
                    enum=["accepted", "rejected", "pending", "noauth"],
                ),
                "device_type": text("Filter by device type"),
                "limit": capped("Maximum number of devices to return", 500, 20),
            }),
        ),
        (
            "get_deployment_status",
            "Get the status and details of a specific deployment",
            schema(
                {"deployment_id": text("The ID of the deployment to check")},
                ["deployment_id"],
            ),
        ),
        (
            "list_deployments",
            "List deployments with optional filtering",
            schema({
                "status": text(
                    "Filter by deployment status",
                    enum=["inprogress", "finished", "pending"],
                ),
                "limit": capped("Maximum number of deployments to return", 100, 10),
            }),
        ),
        (
            "list_releases",
            "List releases with optional filtering",
            schema({
                "name": text("Filter by release name"),
                "tag": text("Filter by release tag"),
                "limit": capped("Maximum number of releases to return", 100, 20),
            }),
        ),
        (
            "get_release_status",
            "Get the details of a specific release",
            schema(
                {"release_name": text("The name of the release to check")},
                ["release_name"],
            ),
        ),
        (
            "get_device_inventory",
            "Get complete inventory attributes for a specific device",
            schema(
                {"device_id": text("The ID of the device to get inventory for")},
                ["device_id"],
            ),
        ),
        (
            "list_device_inventory",
            "List device inventories with optional filtering",
            schema({
                "limit": capped(
                    "Maximum number of device inventories to return", 500, 20
                ),
                "has_attribute": text(
                    "Filter devices that have a specific attribute name"
                ),
            }),
        ),
        (
            "get_inventory_groups",
            "Get all device inventory groups",
            schema({}),
        ),
        (
            "get_deployment_device_log",
            "Get deployment logs for a specific device in a deployment",
            schema(
                {
                    "deployment_id": text("The deployment ID"),
                    "device_id": text("The device ID"),
                },
                ["deployment_id", "device_id"],
            ),
        ),
        (
            "get_deployment_logs",
            "Get deployment logs for all devices in a deployment",
            schema(
                {"deployment_id": text("The deployment ID")},
                ["deployment_id"],
            ),
        ),
        (
            "get_audit_logs",
            "Get Mender audit logs with optional filtering",
            schema({
                "limit": capped(
                    "Maximum number of audit log entries to return", 1000, 50
                ),
                "user": text("Filter by user ID or username"),
                "action": text(
                    "Filter by action type (e.g., 'login', 'deploy', 'device_accept')"
                ),
                "object_type": text(
                    "Filter by object type (e.g., 'device', 'deployment', 'user')"
                ),
                "start_date": text(
                    "Filter from date (ISO format, e.g., '2023-08-27T00:00:00Z')"
                ),
                "end_date": text(
                    "Filter to date (ISO format, e.g., '2023-08-27T23:59:59Z')"
                ),
            }),
        ),
    )
    return [
        Tool(name=tool_name, description=summary, inputSchema=input_schema)
        for tool_name, summary, input_schema in catalog
    ]
def _checked_filter(label: str, value: Any, banned: tuple) -> Any:
    """Return ``value`` if it is an acceptable filter, else raise ValueError."""
    if not value:
        # Absent or empty filters are forwarded to the client unchanged.
        return value
    # A non-string filter used to surface as a TypeError ("unexpected error");
    # it is a caller mistake, so report it as a validation failure.
    if not isinstance(value, str) or any(token in value for token in banned):
        raise ValueError(f"Invalid {label} filter format")
    return value

def _parse_iso_date(label: str, raw: Any, example: str) -> Any:
    """Parse an optional ISO-8601 date argument; ``None`` when not supplied."""
    from datetime import datetime

    if not raw:
        return None
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z".
        return datetime.fromisoformat(raw.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError cover non-string input.
        raise ValueError(
            f"{label} must be in ISO format (e.g., '{example}')"
        ) from None

@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Handle tool calls with comprehensive input validation.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: Tool arguments supplied by the MCP client.

    Returns:
        A single-element list holding the formatted result or an error
        message. Validation, API and unexpected errors are all reported as
        text rather than raised.
    """
    try:
        # Tolerate a call that carries no arguments object.
        arguments = arguments or {}

        # Log tool call with sanitized arguments
        safe_args = {k: v for k, v in arguments.items() if not k.lower().endswith('token')}
        self.security_logger.log_secure(
            20,  # INFO level
            f"Tool call: {name} with arguments: {safe_args}"
        )

        if name == "get_device_status":
            validated = validate_input(DeviceIdInput, arguments)
            device_id = validated["device_id"]
            device = self.mender_client.get_device(device_id)
            result = self._format_device_output(device)

        elif name == "list_devices":
            validated_status = validate_input(StatusInput, {"status": arguments.get("status")}) if arguments.get("status") else {}
            validated_device_type = validate_input(DeviceTypeInput, {"device_type": arguments.get("device_type")}) if arguments.get("device_type") else {}
            validated_limit = validate_input(LimitInput, {"limit": arguments.get("limit", 20)})
            status = validated_status.get("status")
            device_type = validated_device_type.get("device_type")
            limit = validated_limit.get("limit", 20)
            devices = self.mender_client.get_devices(
                status=status,
                device_type=device_type,
                limit=limit
            )
            result = self._format_devices_output(devices)

        elif name == "get_deployment_status":
            validated = validate_input(DeploymentIdInput, arguments)
            deployment_id = validated["deployment_id"]
            deployment = self.mender_client.get_deployment(deployment_id)
            result = self._format_deployment_output(deployment)

        elif name == "list_deployments":
            validated_status = validate_input(StatusInput, {"status": arguments.get("status")}) if arguments.get("status") else {}
            validated_limit = validate_input(LimitInput, {"limit": arguments.get("limit", 10)})
            status = validated_status.get("status")
            limit = validated_limit.get("limit", 10)
            deployments = self.mender_client.get_deployments(
                status=status,
                limit=limit
            )
            result = self._format_deployments_output(deployments)

        elif name == "list_releases":
            validated_limit = validate_input(LimitInput, {"limit": arguments.get("limit", 20)})
            limit = validated_limit.get("limit", 20)
            # name and tag are optional filters with lighter validation
            name_filter = _checked_filter("name", arguments.get("name"), ('..', '/'))
            tag = _checked_filter("tag", arguments.get("tag"), ('..', '/'))
            releases = self.mender_client.get_releases(
                name=name_filter,
                tag=tag,
                limit=limit
            )
            result = self._format_releases_output(releases)

        elif name == "get_release_status":
            validated = validate_input(ReleaseNameInput, arguments)
            release_name = validated["release_name"]
            release = self.mender_client.get_release(release_name)
            result = self._format_release_output(release)

        elif name == "get_device_inventory":
            validated = validate_input(DeviceIdInput, arguments)
            device_id = validated["device_id"]
            inventory = self.mender_client.get_device_inventory(device_id)
            result = self._format_device_inventory_output(inventory)

        elif name == "list_device_inventory":
            validated_limit = validate_input(LimitInput, {"limit": arguments.get("limit", 20)})
            limit = validated_limit.get("limit", 20)
            has_attribute = _checked_filter(
                "has_attribute", arguments.get("has_attribute"), ('..', '/')
            )
            inventories = self.mender_client.get_devices_inventory(
                limit=limit,
                has_attribute=has_attribute
            )
            result = self._format_inventories_output(inventories)

        elif name == "get_inventory_groups":
            # No input parameters to validate
            groups = self.mender_client.get_inventory_groups()
            result = self._format_inventory_groups_output(groups)

        elif name == "get_deployment_device_log":
            # Check presence first: a bare arguments[...] lookup raised
            # KeyError, which was reported as a CRITICAL unexpected error
            # instead of a validation failure.
            missing = [
                key for key in ("deployment_id", "device_id")
                if key not in arguments
            ]
            if missing:
                raise ValueError(f"Missing required argument(s): {', '.join(missing)}")
            deployment_validated = validate_input(DeploymentIdInput, {"deployment_id": arguments["deployment_id"]})
            device_validated = validate_input(DeviceIdInput, {"device_id": arguments["device_id"]})
            deployment_id = deployment_validated["deployment_id"]
            device_id = device_validated["device_id"]
            log = self.mender_client.get_deployment_device_log(deployment_id, device_id)
            result = self._format_deployment_log_output(log)

        elif name == "get_deployment_logs":
            validated = validate_input(DeploymentIdInput, arguments)
            deployment_id = validated["deployment_id"]
            logs = self.mender_client.get_deployment_logs(deployment_id)
            result = self._format_deployment_logs_output(logs)

        elif name == "get_audit_logs":
            # An omitted or null limit falls back to the schema default.
            limit = arguments.get("limit")
            if limit is None:
                limit = 50
            # bool is a subclass of int, and 0 is falsy: both previously
            # slipped past the range check.
            if isinstance(limit, bool) or not isinstance(limit, int) or not 1 <= limit <= 1000:
                raise ValueError("Limit must be an integer between 1 and 1000")

            start_date = _parse_iso_date(
                "start_date", arguments.get("start_date"), "2023-08-27T00:00:00Z"
            )
            end_date = _parse_iso_date(
                "end_date", arguments.get("end_date"), "2023-08-27T23:59:59Z"
            )

            # Free-text filters additionally reject angle brackets.
            banned = ('..', '/', '<', '>')
            user = _checked_filter("user", arguments.get("user"), banned)
            action = _checked_filter("action", arguments.get("action"), banned)
            object_type = _checked_filter("object_type", arguments.get("object_type"), banned)

            audit_log = self.mender_client.get_audit_logs(
                limit=limit,
                start_date=start_date,
                end_date=end_date,
                user=user,
                action=action,
                object_type=object_type
            )
            result = self._format_audit_log_output(audit_log)

        else:
            result = f"Unknown tool: {name}"

        return [TextContent(type="text", text=result)]

    except ValueError as e:
        # Input validation errors
        error_msg = f"Input validation error: {str(e)}"
        self.security_logger.log_secure(
            40,  # ERROR level
            f"Input validation failed for tool {name}: {str(e)}"
        )
        return [TextContent(type="text", text=error_msg)]
    except MenderAPIError as e:
        # Errors raised by the Mender API client
        error_msg = f"Mender API Error: {e.message}"
        if e.status_code:
            error_msg += f" (HTTP {e.status_code})"
        return [TextContent(type="text", text=error_msg)]
    except Exception as e:
        # Unexpected errors - sanitize before exposing
        sanitized_error = SecurityLogger.sanitize_message(str(e))
        error_msg = f"Unexpected error: {sanitized_error}"
        self.security_logger.log_secure(
            50,  # CRITICAL level
            f"Unexpected error in tool {name}: {str(e)}"
        )
        return [TextContent(type="text", text=error_msg)]
def _format_device_output(self, device) -> str:
"""Format device information for output."""
output = f"Device ID: {device.id}\n"
output += f"Status: {device.status}\n"
if device.device_type:
output += f"Device Type: {device.device_type}\n"
if device.created_ts:
output += f"Created: {device.created_ts}\n"
if device.updated_ts:
output += f"Last Updated: {device.updated_ts}\n"
output += f"Decommissioning: {device.decommissioning}\n"
if device.attributes:
output += "Attributes:\n"
for attr in device.attributes:
output += f" - {attr.get('name', 'N/A')}: {attr.get('value', 'N/A')}\n"
return output
def _format_devices_output(self, devices) -> str:
"""Format devices list for output."""
if not devices:
return "No devices found."
output = f"Found {len(devices)} device(s):\n\n"
for device in devices:
output += f"• {device.id}\n"
output += f" Status: {device.status}\n"
if device.device_type:
output += f" Type: {device.device_type}\n"
if device.updated_ts:
output += f" Last Updated: {device.updated_ts}\n"
output += "\n"
return output
def _format_deployment_output(self, deployment) -> str:
"""Format deployment information for output."""
output = f"Deployment ID: {deployment.id}\n"
output += f"Name: {deployment.name}\n"
output += f"Artifact: {deployment.artifact_name}\n"
output += f"Status: {deployment.status}\n"
if deployment.created:
output += f"Created: {deployment.created}\n"
if deployment.finished:
output += f"Finished: {deployment.finished}\n"
if deployment.device_count:
output += f"Device Count: {deployment.device_count}\n"
if deployment.statistics:
output += "Statistics:\n"
for key, value in deployment.statistics.items():
output += f" {key}: {value}\n"
return output
def _format_deployments_output(self, deployments) -> str:
"""Format deployments list for output."""
if not deployments:
return "No deployments found."
output = f"Found {len(deployments)} deployment(s):\n\n"
for deployment in deployments:
output += f"• {deployment.name} (ID: {deployment.id})\n"
output += f" Status: {deployment.status}\n"
output += f" Artifact: {deployment.artifact_name}\n"
if deployment.created:
output += f" Created: {deployment.created}\n"
output += "\n"
return output
def _format_artifacts_output(self, artifacts) -> str:
"""Format artifacts list for output."""
if not artifacts:
return "No artifacts found."
output = f"Found {len(artifacts)} artifact(s):\n\n"
for artifact in artifacts:
output += f"• {artifact.name} (ID: {artifact.id})\n"
if artifact.description:
output += f" Description: {artifact.description}\n"
if artifact.device_types_compatible:
output += f" Compatible Types: {', '.join(artifact.device_types_compatible)}\n"
if artifact.size:
output += f" Size: {artifact.size} bytes\n"
output += "\n"
return output
def _format_release_output(self, release) -> str:
"""Format release information for output."""
output = f"Release Name: {release.name}\n"
if release.modified:
output += f"Last Modified: {release.modified}\n"
if release.artifacts_count:
output += f"Artifacts Count: {release.artifacts_count}\n"
if release.notes:
output += f"Notes: {release.notes}\n"
if release.tags:
output += "Tags:\n"
for tag in release.tags:
output += f" - {tag.get('key', 'N/A')}: {tag.get('value', 'N/A')}\n"
if release.artifacts:
output += f"Artifacts ({len(release.artifacts)}):\n"
for artifact in release.artifacts:
output += f" • {artifact.get('name', 'N/A')}\n"
if artifact.get('id'):
output += f" ID: {artifact.get('id')}\n"
if artifact.get('size'):
size_mb = artifact.get('size') / (1024*1024)
output += f" Size: {size_mb:.1f} MB\n"
output += f" Signed: {artifact.get('signed', False)}\n"
if artifact.get('device_types_compatible'):
device_types = artifact.get('device_types_compatible', [])
output += self._format_device_types(device_types)
output += "\n"
return output
def _format_device_types(self, device_types) -> str:
"""Format device types list with bullet points and 64-char line wrapping."""
if not device_types:
return ""
if len(device_types) <= 3:
# For 3 or fewer types, use inline format
return f" Device Types: {', '.join(device_types)}\n"
# For more than 3 types, use bullet point format
output = f" Device Types ({len(device_types)}):\n"
for device_type in device_types:
# Format each device type as a bullet point
prefix = " • " # 8 characters
max_device_type_length = 64 - len(prefix) - 3 # 3 for "..."
if len(prefix + device_type) <= 64:
output += f"{prefix}{device_type}\n"
else:
# Wrap long device type names - truncate to fit within 64 chars
truncated = device_type[:max_device_type_length]
output += f"{prefix}{truncated}...\n"
return output
def _format_tags(self, tags) -> str:
"""Format tags list with bullet points and 64-char line wrapping."""
if not tags:
return ""
# Convert tags to formatted strings
tag_strings = []
for tag in tags:
key = tag.get('key', 'N/A')
value = tag.get('value', 'N/A')
tag_strings.append(f"{key}:{value}")
if len(tag_strings) <= 3:
# For 3 or fewer tags, use inline format
return f" Tags: {', '.join(tag_strings)}\n"
# For more than 3 tags, use bullet point format
output = f" Tags ({len(tag_strings)}): \n"
for tag_string in tag_strings:
# Format each tag as a bullet point
prefix = " • " # 6 characters
max_tag_length = 64 - len(prefix) - 3 # 3 for "..."
if len(prefix + tag_string) <= 64:
output += f"{prefix}{tag_string}\n"
else:
# Wrap long tag names - truncate to fit within 64 chars
truncated = tag_string[:max_tag_length]
output += f"{prefix}{truncated}...\n"
return output
def _format_releases_output(self, releases) -> str:
"""Format releases list for output."""
if not releases:
return "No releases found."
output = f"Found {len(releases)} release(s):\n\n"
for release in releases:
output += f"• {release.name}\n"
if release.modified:
output += f" Last Modified: {release.modified}\n"
if release.artifacts_count:
output += f" Artifacts: {release.artifacts_count}\n"
if release.notes:
output += f" Notes: {release.notes}\n"
if release.tags:
output += self._format_tags(release.tags)
# Show first artifact info if available
if release.artifacts and len(release.artifacts) > 0:
artifact = release.artifacts[0]
if artifact.get('size'):
size_mb = artifact.get('size') / (1024*1024)
output += f" Size: {size_mb:.1f} MB\n"
output += f" Signed: {artifact.get('signed', False)}\n"
output += "\n"
return output
def _format_device_inventory_output(self, inventory) -> str:
"""Format complete device inventory for display."""
output = f"Device ID: {inventory.device_id}\n"
if inventory.updated_ts:
output += f"Last Updated: {inventory.updated_ts}\n"
# Get group information if available
try:
group = self.mender_client.get_device_group(inventory.device_id)
if group:
output += f"Group: {group}\n"
except Exception:
# Ignore group errors to avoid breaking inventory display
pass
if not inventory.attributes:
output += "No inventory attributes found.\n"
return output
output += f"\nInventory Attributes ({len(inventory.attributes)}):\n"
for attr in inventory.attributes:
attr_name = attr.name
attr_value = str(attr.value)
# Truncate long values for readability
if len(attr_value) > 60:
attr_value = attr_value[:57] + "..."
output += f" • {attr_name}: {attr_value}\n"
return output
def _format_inventories_output(self, inventories) -> str:
"""Format device inventories list for display."""
if not inventories:
return "No device inventories found."
output = f"Found {len(inventories)} device inventories:\n\n"
for inventory in inventories:
output += f"• {inventory.device_id}\n"
if inventory.updated_ts:
output += f" Last Updated: {inventory.updated_ts}\n"
attr_count = len(inventory.attributes)
if attr_count > 0:
output += f" Attributes: {attr_count}\n"
# Show first few attributes as preview
preview_attrs = inventory.attributes[:3]
for attr in preview_attrs:
attr_value = str(attr.value)
if len(attr_value) > 30:
attr_value = attr_value[:27] + "..."
output += f" - {attr.name}: {attr_value}\n"
if attr_count > 3:
output += f" ... and {attr_count - 3} more\n"
else:
output += " No attributes\n"
output += "\n"
return output
def _format_inventory_groups_output(self, groups) -> str:
"""Format inventory groups information."""
if not groups:
return "No inventory groups found."
output = f"Found {len(groups)} inventory groups:\n\n"
for group in groups:
group_name = group.get("group", "Unknown")
device_count = group.get("device_count", 0)
output += f"• {group_name}\n"
if device_count > 0:
output += f" Devices: {device_count}\n"
else:
output += " No devices\n"
# Show group attributes if available
if "attributes" in group and group["attributes"]:
attrs = group["attributes"]
output += f" Group Attributes: {len(attrs)}\n"
for key, value in attrs.items():
value_str = str(value)
if len(value_str) > 40:
value_str = value_str[:37] + "..."
output += f" - {key}: {value_str}\n"
output += "\n"
return output
def _format_deployment_log_output(self, log) -> str:
    """Render the full deployment log of a single device.

    Anything that is not a ``MenderDeploymentLog`` is reported as invalid
    data. Messages longer than 200 characters are shortened.
    """
    from .mender_api import MenderDeploymentLog

    if not isinstance(log, MenderDeploymentLog):
        return f"Invalid deployment log data: {str(log)}"

    report = [
        "Deployment Log\n",
        "================\n",
        f"Deployment ID: {log.deployment_id}\n",
        f"Device ID: {log.device_id}\n",
    ]
    if log.retrieved_at:
        report.append(f"Retrieved: {log.retrieved_at}\n")
    report.append(f"Log Entries: {len(log.entries)}\n\n")

    if not log.entries:
        report.append("No log entries found.\n")
        report.append("Note: Deployment logs may only be available for failed deployments\n")
        report.append("or may not be enabled for this Mender configuration.\n")
        return "".join(report)

    report.append("Log Details:\n")
    report.append("------------\n")
    for record in log.entries:
        when = record.timestamp.strftime("%Y-%m-%d %H:%M:%S") if record.timestamp else ""
        severity = f"[{record.level}] " if record.level else ""
        body = record.message
        if len(body) > 200:
            body = body[:197] + "..."
        # The timestamp, when present, leads the line followed by one space.
        lead = f"{when} " if when else ""
        report.append(f"{lead}{severity}{body}\n")
    return "".join(report)
def _format_deployment_logs_output(self, logs) -> str:
    """Render a per-device preview of the logs of one deployment.

    Up to three entries are previewed per device, each shortened to 80
    characters. Items that are not ``MenderDeploymentLog`` are skipped.
    """
    from .mender_api import MenderDeploymentLog

    if not logs:
        return (
            "No deployment logs found.\n"
            "Note: Deployment logs may only be available for failed deployments\n"
            "or may not be enabled for this Mender configuration."
        )

    report = [
        "Deployment Logs Summary\n",
        "======================\n",
        f"Found logs for {len(logs)} device(s):\n\n",
    ]
    for device_log in logs:
        if not isinstance(device_log, MenderDeploymentLog):
            continue

        report.append(f"• Device: {device_log.device_id}\n")
        report.append(f" Log Entries: {len(device_log.entries)}\n")
        if not device_log.entries:
            report.append(" No log entries\n")
        else:
            for record in device_log.entries[:3]:
                severity = f"[{record.level}] " if record.level else ""
                body = record.message
                if len(body) > 80:
                    body = body[:77] + "..."
                report.append(f" {severity}{body}\n")
            hidden = len(device_log.entries) - 3
            if hidden > 0:
                report.append(f" ... and {hidden} more entries\n")
        report.append("\n")

    report.append("Use 'get_deployment_device_log' for complete logs of specific devices.\n")
    return "".join(report)
def _format_audit_log_output(self, audit_log) -> str:
    """Format audit log entries for display, newest first.

    Args:
        audit_log: Expected to be a ``MenderAuditLog``; any other value is
            reported as invalid data.

    Returns:
        Multi-line text with a header, counts, and one section per entry.
        Entries without a timestamp are listed last.
    """
    from datetime import timezone
    from .mender_api import MenderAuditLog

    if not isinstance(audit_log, MenderAuditLog):
        return f"Invalid audit log data: {str(audit_log)}"

    output = "Mender Audit Logs\n"
    output += "==================\n"
    if audit_log.retrieved_at:
        output += f"Retrieved: {audit_log.retrieved_at.strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
    if audit_log.total_count is not None:
        output += f"Total Entries: {audit_log.total_count}\n"
        output += f"Showing: {len(audit_log.entries)}\n"
    else:
        output += f"Entries: {len(audit_log.entries)}\n"
    output += "\n"

    if not audit_log.entries:
        output += "No audit log entries found.\n"
        output += "This may be due to:\n"
        output += "• Audit logging not enabled for this Mender instance\n"
        output += "• Insufficient permissions to access audit logs\n"
        output += "• No matching entries for the specified filters\n"
        output += "• Audit logs API not available in this Mender version\n"
        return output

    output += "Audit Log Entries (newest first):\n"
    output += "==================================\n"

    def _recency(entry):
        # Comparing a naive datetime (such as datetime.min) with a
        # timezone-aware one raises TypeError, so entries are ranked on a
        # plain (has_timestamp, epoch_seconds) tuple instead.
        stamp = entry.timestamp
        if not stamp:
            return (0, 0.0)
        if stamp.utcoffset() is None:
            # Naive timestamps are read as UTC, matching the "UTC" label below.
            stamp = stamp.replace(tzinfo=timezone.utc)
        return (1, stamp.timestamp())

    # Sort entries by timestamp (newest first)
    sorted_entries = sorted(audit_log.entries, key=_recency, reverse=True)

    for i, entry in enumerate(sorted_entries):
        if i > 0:
            output += "\n" + "-" * 60 + "\n"

        if entry.timestamp:
            timestamp_str = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")
            output += f"Time: {timestamp_str}\n"
        if entry.user:
            output += f"User: {entry.user}\n"
        if entry.action:
            output += f"Action: {entry.action}\n"

        if entry.object_type or entry.object_id:
            object_info = []
            if entry.object_type:
                object_info.append(f"Type: {entry.object_type}")
            if entry.object_id:
                # Truncate long object IDs for readability
                obj_id = entry.object_id
                if len(obj_id) > 40:
                    obj_id = obj_id[:37] + "..."
                object_info.append(f"ID: {obj_id}")
            output += f"Object: {', '.join(object_info)}\n"

        if entry.result:
            output += f"Result: {entry.result}\n"

        context_info = []
        if entry.ip_address:
            context_info.append(f"IP: {entry.ip_address}")
        if entry.user_agent:
            # Truncate long user agents
            user_agent = entry.user_agent
            if len(user_agent) > 60:
                user_agent = user_agent[:57] + "..."
            context_info.append(f"Agent: {user_agent}")
        if context_info:
            output += f"Context: {', '.join(context_info)}\n"

        if entry.details:
            output += "Details:\n"
            for key, value in entry.details.items():
                # Truncate long detail values
                value_str = str(value)
                if len(value_str) > 80:
                    value_str = value_str[:77] + "..."
                output += f" {key}: {value_str}\n"
    return output
async def run(self) -> None:
    """Serve MCP requests over stdio until the server loop returns."""
    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = self.server.create_initialization_options()
        await self.server.run(incoming, outgoing, init_options)
@click.command()
@click.option(
    "--server-url",
    default="https://hosted.mender.io",
    required=False,
    help="Mender server URL (default: https://hosted.mender.io)",
)
@click.option(
    "--access-token",
    required=False,
    help="Personal Access Token for authentication",
)
@click.option(
    "--token-file",
    required=False,
    help="File containing the Personal Access Token",
)
def main(server_url: str, access_token: Optional[str], token_file: Optional[str]) -> None:
    """Run the Mender MCP server."""
    # Token precedence: --access-token, then --token-file, then the
    # MENDER_ACCESS_TOKEN environment variable.
    if access_token:
        token = access_token
    elif not token_file:
        token = os.getenv("MENDER_ACCESS_TOKEN")
    else:
        try:
            with open(os.path.expanduser(token_file)) as handle:
                token = handle.read().strip()
        except FileNotFoundError:
            click.echo(f"Error: Token file not found: {token_file}", err=True)
            sys.exit(1)
        except Exception as e:
            click.echo(f"Error reading token file: {e}", err=True)
            sys.exit(1)

    if not token:
        missing_token_message = (
            "Error: No access token provided. Use --access-token, --token-file, "
            "or set MENDER_ACCESS_TOKEN environment variable."
        )
        click.echo(missing_token_message, err=True)
        sys.exit(1)

    server = MenderMCPServer(server_url, token)
    try:
        asyncio.run(server.run())
    except KeyboardInterrupt:
        click.echo("\nServer stopped by user.", err=True)
    except Exception as e:
        click.echo(f"Server error: {e}", err=True)
        sys.exit(1)
    finally:
        # Runs on every exit path, including the sys.exit above.
        server.mender_client.close()
# Script entry point: invoke the click command when this file is executed directly.
if __name__ == "__main__":
    main()