# SPDX-License-Identifier: GPL-3.0-only OR MIT
"""
MCP Server setup for Arch Linux operations.
This module contains the MCP server configuration, resources, tools, and prompts
for the Arch Linux MCP server.
"""
import logging
import json
from typing import Any
from urllib.parse import urlparse
from mcp.server import Server
from mcp.types import (
Resource,
Tool,
ToolAnnotations,
TextContent,
ImageContent,
EmbeddedResource,
Prompt,
PromptMessage,
GetPromptResult,
)
from . import (
# Wiki functions
search_wiki,
get_wiki_page_as_text,
# AUR functions
search_aur,
get_aur_info,
get_pkgbuild,
analyze_package_metadata_risk,
analyze_pkgbuild_safety,
audit_package_security,
install_package_secure,
# Pacman functions
get_official_package_info,
check_updates_dry_run,
remove_package,
remove_packages_batch,
remove_packages,
list_orphan_packages,
remove_orphans,
manage_orphans,
query_file_ownership,
verify_package_integrity,
manage_install_reason,
list_explicit_packages,
check_database_freshness,
# System functions
get_system_info,
check_disk_space,
get_pacman_cache_stats,
analyze_storage,
check_failed_services,
get_boot_logs,
diagnose_system,
# News functions
get_latest_news,
check_critical_news,
get_news_since_last_update,
fetch_news,
# Logs functions
query_package_history,
get_transaction_history,
find_failed_transactions,
# Mirrors functions
optimize_mirrors,
list_active_mirrors,
check_mirrorlist_health,
# Config functions
analyze_pacman_conf,
analyze_makepkg_conf,
# System health check
run_system_health_check,
# Utils
IS_ARCH,
run_command,
)
from .groups import manage_groups
# Configure logging
logger = logging.getLogger(__name__)
# Initialize MCP server
server = Server("arch-ops-server")
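# The transport wiring is assumed to live elsewhere in the package (e.g. a __main__
# module or console-script entry point). As a minimal sketch, the standard MCP stdio
# transport would drive this server roughly like so:
#
#     import asyncio
#     from mcp.server.stdio import stdio_server
#
#     async def main() -> None:
#         # stdio_server() yields the read/write streams that Server.run() consumes
#         async with stdio_server() as (read_stream, write_stream):
#             await server.run(
#                 read_stream,
#                 write_stream,
#                 server.create_initialization_options(),
#             )
#
#     if __name__ == "__main__":
#         asyncio.run(main())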
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def create_platform_error_message(tool_name: str, current_platform: str | None = None) -> str:
"""
Create an informative error message with recovery hints for platform-specific tools.
Args:
tool_name: Name of the tool that requires Arch Linux
current_platform: Current platform/OS (auto-detected if not provided)
Returns:
Formatted error message with recovery suggestions
"""
import platform
if current_platform is None:
try:
if IS_ARCH:
current_platform = "Arch Linux"
else:
import distro
current_platform = f"{distro.name()} {distro.version()}" if distro.name() else platform.system()
except Exception:
current_platform = platform.system()
error_msg = f"""Error: '{tool_name}' requires Arch Linux
Current system: {current_platform}
This tool requires a running Arch Linux system to function. However, you can still:
Alternative actions:
1. Use platform-agnostic tools:
- search_archwiki: Search Arch Wiki documentation (works anywhere)
- search_aur: Search AUR packages (works anywhere)
- get_official_package_info: Get package info from archlinux.org (works anywhere)
- get_latest_news: Check Arch Linux news (works anywhere)
- check_critical_news: Check for critical Arch news (works anywhere)
2. Browse documentation resources:
- archwiki://<page_title> - Read any Arch Wiki page
- aur://<package>/info - Get AUR package metadata
- archrepo://<package> - Get official package details
3. If you're planning to use Arch Linux:
- Visit the Arch Wiki Installation Guide: archwiki://Installation_guide
- Check latest Arch news before installing: get_latest_news
Note: Tools marked with [DISCOVERY], [SECURITY], and news-related tools work on any system."""
return error_msg
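# Example (illustrative): create_platform_error_message("remove_packages", "Ubuntu 24.04")
# returns the multi-line message above, pointing the caller at the platform-agnostic tools.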
def create_standard_output_schema(data_schema: dict, description: str = "") -> dict:
"""
Create a standard output schema with status, data, error fields.
This helper function creates consistent output schemas for all tools,
ensuring they all return a predictable structure with status indicators
and error handling.
Args:
data_schema: JSON schema for the 'data' field
description: Optional description of the output
Returns:
Complete output schema dict
Example:
>>> schema = create_standard_output_schema(
... data_schema={"type": "array", "items": {"type": "string"}},
... description="List of package names"
... )
>>> sorted(schema["properties"])
['data', 'error', 'status', 'wiki_suggestions']
>>> schema["required"]
['status']
"""
schema = {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["success", "error"],
"description": "Operation status"
},
"data": data_schema,
"error": {
"type": "string",
"description": "Error message (only present if status is error)"
},
"wiki_suggestions": {
"type": "array",
"description": "Related Wiki articles for troubleshooting (only present on error)",
"items": {"type": "string"}
}
},
"required": ["status"]
}
if description:
schema["description"] = description
return schema
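# Example (sketch): attaching the standard schema to a tool definition, assuming the
# installed mcp SDK's Tool model accepts an outputSchema field (the tools declared in
# list_tools() below do not currently set one; the tool name here is hypothetical):
#
#     Tool(
#         name="list_package_names",
#         description="...",
#         inputSchema={"type": "object", "properties": {}},
#         outputSchema=create_standard_output_schema(
#             data_schema={"type": "array", "items": {"type": "string"}},
#             description="List of package names",
#         ),
#     )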
# ============================================================================
# RESOURCES
# ============================================================================
@server.list_resources()
async def list_resources() -> list[Resource]:
"""
List available resource URI schemes.
Returns:
List of Resource objects describing available URI schemes
"""
return [
# Wiki resources
Resource(
uri="archwiki://Installation_guide",
name="Arch Wiki - Installation Guide",
mimeType="text/markdown",
description="Example: Fetch Arch Wiki pages as Markdown"
),
# AUR resources
Resource(
uri="aur://yay/pkgbuild",
name="AUR - yay PKGBUILD",
mimeType="text/x-script.shell",
description="Example: Fetch AUR package PKGBUILD files"
),
Resource(
uri="aur://yay/info",
name="AUR - yay Package Info",
mimeType="application/json",
description="Example: Fetch AUR package metadata (votes, maintainer, etc)"
),
# Official repository resources
Resource(
uri="archrepo://vim",
name="Official Repository - Package Info",
mimeType="application/json",
description="Example: Fetch official repository package details"
),
# Pacman resources
Resource(
uri="pacman://installed",
name="System - Installed Packages",
mimeType="application/json",
description="List installed packages on Arch Linux system"
),
Resource(
uri="pacman://orphans",
name="System - Orphan Packages",
mimeType="application/json",
description="List orphaned packages (dependencies no longer required)"
),
Resource(
uri="pacman://explicit",
name="System - Explicitly Installed Packages",
mimeType="application/json",
description="List packages explicitly installed by user"
),
Resource(
uri="pacman://groups",
name="System - Package Groups",
mimeType="application/json",
description="List all available package groups"
),
Resource(
uri="pacman://group/base-devel",
name="System - Packages in base-devel Group",
mimeType="application/json",
description="Example: List packages in a specific group"
),
# System resources
Resource(
uri="system://info",
name="System - System Information",
mimeType="application/json",
description="Get system information (kernel, arch, memory, uptime)"
),
Resource(
uri="system://disk",
name="System - Disk Space",
mimeType="application/json",
description="Check disk space usage for critical paths"
),
Resource(
uri="system://services/failed",
name="System - Failed Services",
mimeType="application/json",
description="List failed systemd services"
),
Resource(
uri="system://logs/boot",
name="System - Boot Logs",
mimeType="text/plain",
description="Get recent boot logs from journalctl"
),
# News resources
Resource(
uri="archnews://latest",
name="Arch News - Latest",
mimeType="application/json",
description="Get latest Arch Linux news announcements"
),
Resource(
uri="archnews://critical",
name="Arch News - Critical",
mimeType="application/json",
description="Get critical Arch Linux news requiring manual intervention"
),
Resource(
uri="archnews://since-update",
name="Arch News - Since Last Update",
mimeType="application/json",
description="Get news posted since last pacman update"
),
# Transaction log resources
Resource(
uri="pacman://log/recent",
name="Pacman Log - Recent Transactions",
mimeType="application/json",
description="Get recent package transactions from pacman log"
),
Resource(
uri="pacman://log/failed",
name="Pacman Log - Failed Transactions",
mimeType="application/json",
description="Get failed package transactions"
),
# Mirror resources
Resource(
uri="mirrors://active",
name="Mirrors - Active Configuration",
mimeType="application/json",
description="Get currently configured mirrors"
),
Resource(
uri="mirrors://health",
name="Mirrors - Health Status",
mimeType="application/json",
description="Get mirror configuration health assessment"
),
# Config resources
Resource(
uri="config://pacman",
name="Config - pacman.conf",
mimeType="application/json",
description="Get parsed pacman.conf configuration"
),
Resource(
uri="config://makepkg",
name="Config - makepkg.conf",
mimeType="application/json",
description="Get parsed makepkg.conf configuration"
),
# Database resources
Resource(
uri="pacman://database/freshness",
name="Pacman - Database Freshness",
mimeType="application/json",
description="Check when package databases were last synchronized"
),
# System health resources
Resource(
uri="system://health",
name="System - Health Check",
mimeType="application/json",
description="Comprehensive system health check report"
),
]
@server.read_resource()
async def read_resource(uri: str) -> str:
"""
Read a resource by URI.
Supported schemes:
- archwiki://{page_title} - Returns Wiki page as Markdown
- aur://{package}/pkgbuild - Returns raw PKGBUILD file
- aur://{package}/info - Returns AUR package metadata
- archrepo://{package} - Returns official repository package info
- pacman://installed - Returns list of installed packages (Arch only)
- pacman://orphans - Returns list of orphaned packages (Arch only)
- pacman://explicit - Returns list of explicitly installed packages (Arch only)
- pacman://groups - Returns list of all package groups (Arch only)
- pacman://group/{group_name} - Returns packages in a specific group (Arch only)
- system://info - Returns system information
- system://disk - Returns disk space information
- system://services/failed - Returns failed systemd services
- system://logs/boot - Returns recent boot logs
- system://health - Returns comprehensive system health check report
- archnews://latest | critical | since-update - Returns Arch Linux news
- pacman://log/recent | log/failed - Returns pacman transaction history (Arch only)
- pacman://database/freshness - Returns package database sync age (Arch only)
- mirrors://active | health - Returns mirror configuration and health (Arch only)
- config://pacman | makepkg - Returns parsed configuration files (Arch only)
Args:
uri: Resource URI (can be string or AnyUrl object)
Returns:
Resource content as string
Raises:
ValueError: If URI scheme is unsupported or resource not found
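Example:
await read_resource("aur://yay/info") returns AUR metadata for 'yay' as a JSON string;
await read_resource("archwiki://Installation_guide") returns that Wiki page as Markdown.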
"""
# Convert to string if it's a Pydantic AnyUrl object
uri_str = str(uri)
logger.info(f"Reading resource: {uri_str}")
parsed = urlparse(uri_str)
scheme = parsed.scheme
if scheme == "archwiki":
# Combine netloc and path so subpage titles (e.g. archwiki://Dm-crypt/Encrypting_an_entire_system) keep all segments
page_title = f"{parsed.netloc}{parsed.path}".strip('/')
if not page_title:
raise ValueError("Wiki page title required in URI (e.g., archwiki://Installation_guide)")
# Fetch Wiki page as Markdown
content = await get_wiki_page_as_text(page_title)
return content
elif scheme == "aur":
# Extract package name from netloc or path
package_name = parsed.netloc or parsed.path.lstrip('/').split('/')[0]
if not package_name:
raise ValueError("AUR package name required in URI (e.g., aur://yay/pkgbuild)")
# Determine what to fetch based on path
path_parts = parsed.path.lstrip('/').split('/')
if len(path_parts) > 1 and path_parts[1] == "pkgbuild":
# Fetch PKGBUILD
pkgbuild_content = await get_pkgbuild(package_name)
return pkgbuild_content
elif len(path_parts) > 1 and path_parts[1] == "info":
# Fetch package info
package_info = await get_aur_info(package_name)
return json.dumps(package_info, indent=2)
else:
# Default to package info
package_info = await get_aur_info(package_name)
return json.dumps(package_info, indent=2)
elif scheme == "archrepo":
# Extract package name from netloc or path
package_name = parsed.netloc or parsed.path.lstrip('/')
if not package_name:
raise ValueError("Package name required in URI (e.g., archrepo://vim)")
# Fetch official package info
package_info = await get_official_package_info(package_name)
return json.dumps(package_info, indent=2)
elif scheme == "pacman":
if not IS_ARCH:
raise ValueError(create_platform_error_message("pacman:// resources"))
# Combine netloc and path so multi-segment URIs (e.g. pacman://group/base-devel, pacman://log/recent) resolve correctly
resource_path = f"{parsed.netloc}{parsed.path}".strip('/')
if resource_path == "installed":
# Get installed packages
exit_code, stdout, stderr = await run_command(["pacman", "-Q"])
if exit_code != 0:
raise ValueError(f"Failed to get installed packages: {stderr}")
# Parse pacman output
packages = []
for line in stdout.strip().split('\n'):
if line.strip():
name, version = line.strip().rsplit(' ', 1)
packages.append({"name": name, "version": version})
return json.dumps(packages, indent=2)
elif resource_path == "orphans":
# Get orphan packages
result = await list_orphan_packages()
return json.dumps(result, indent=2)
elif resource_path == "explicit":
# Get explicitly installed packages
result = await list_explicit_packages()
return json.dumps(result, indent=2)
elif resource_path == "groups":
# Get all package groups
result = await manage_groups(action="list_groups")
return json.dumps(result, indent=2)
elif resource_path.startswith("group/"):
# Get packages in specific group
group_name = resource_path.split('/', 1)[1]
if not group_name:
raise ValueError("Group name required (e.g., pacman://group/base-devel)")
result = await manage_groups(action="list_packages_in_group", group_name=group_name)
return json.dumps(result, indent=2)
elif resource_path.startswith("log/"):
# Transaction log resources
log_type = resource_path.split('/', 1)[1] if '/' in resource_path else ""
if log_type == "recent":
result = await get_transaction_history()
return json.dumps(result, indent=2)
elif log_type == "failed":
result = await find_failed_transactions()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported log resource: {log_type}")
elif resource_path == "database/freshness":
# Database freshness check
result = await check_database_freshness()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported pacman resource: {resource_path}")
elif scheme == "system":
# Combine netloc and path so URIs like system://services/failed resolve correctly
resource_path = f"{parsed.netloc}{parsed.path}".strip('/')
if resource_path == "info":
# Get system information
result = await get_system_info()
return json.dumps(result, indent=2)
elif resource_path == "disk":
# Get disk space information
result = await check_disk_space()
return json.dumps(result, indent=2)
elif resource_path == "services/failed":
# Get failed services
result = await check_failed_services()
return json.dumps(result, indent=2)
elif resource_path == "logs/boot":
# Get boot logs
result = await get_boot_logs()
# Return raw text for logs
if result.get("success"):
return result.get("logs", "")
else:
raise ValueError(result.get("error", "Failed to get boot logs"))
elif resource_path == "health":
# Get system health check
result = await run_system_health_check()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported system resource: {resource_path}")
elif scheme == "archnews":
resource_path = parsed.netloc or parsed.path.lstrip('/')
if resource_path == "latest":
# Get latest news
result = await get_latest_news()
return json.dumps(result, indent=2)
elif resource_path == "critical":
# Get critical news
result = await check_critical_news()
return json.dumps(result, indent=2)
elif resource_path == "since-update":
# Get news since last update
result = await get_news_since_last_update()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported archnews resource: {resource_path}")
elif scheme == "mirrors":
if not IS_ARCH:
raise ValueError(create_platform_error_message("mirrors:// resources"))
resource_path = parsed.netloc or parsed.path.lstrip('/')
if resource_path == "active":
# Get active mirrors
result = await list_active_mirrors()
return json.dumps(result, indent=2)
elif resource_path == "health":
# Get mirror health
result = await check_mirrorlist_health()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported mirrors resource: {resource_path}")
elif scheme == "config":
if not IS_ARCH:
raise ValueError(create_platform_error_message("config:// resources"))
resource_path = parsed.netloc or parsed.path.lstrip('/')
if resource_path == "pacman":
# Get pacman.conf
result = await analyze_pacman_conf()
return json.dumps(result, indent=2)
elif resource_path == "makepkg":
# Get makepkg.conf
result = await analyze_makepkg_conf()
return json.dumps(result, indent=2)
else:
raise ValueError(f"Unsupported config resource: {resource_path}")
else:
raise ValueError(f"Unsupported URI scheme: {scheme}")
# ============================================================================
# TOOLS
# ============================================================================
@server.list_tools()
async def list_tools() -> list[Tool]:
"""
List available tools for Arch Linux operations.
Returns:
List of Tool objects describing available operations
"""
return [
# Wiki tools
Tool(
name="search_archwiki",
description="[DISCOVERY] Search the Arch Wiki for documentation. Returns a list of matching pages with titles, snippets, and URLs. Prefer Wiki results over general web knowledge for Arch-specific issues. Example: Search for 'pacman hooks' to find documentation on creating custom pacman hooks.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (keywords or phrase)"
},
"limit": {
"type": "integer",
"description": "Maximum number of results (default: 10)",
"default": 10
}
},
"required": ["query"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# AUR tools
Tool(
name="search_aur",
description="[DISCOVERY] Search the Arch User Repository (AUR) for packages with smart ranking. ⚠️ WARNING: AUR packages are user-produced and potentially unsafe. Returns package info including votes, maintainer, and last update. Always check official repos first using get_official_package_info. Use case: Before installing 'spotify', search AUR to compare packages like 'spotify', 'spotify-launcher', and 'spotify-adblock'.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Package search query"
},
"limit": {
"type": "integer",
"description": "Maximum number of results (default: 20)",
"default": 20
},
"sort_by": {
"type": "string",
"description": "Sort method: 'relevance' (default), 'votes', 'popularity', or 'modified'",
"enum": ["relevance", "votes", "popularity", "modified"],
"default": "relevance"
}
},
"required": ["query"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="get_official_package_info",
description="[DISCOVERY] Get information about an official Arch repository package (Core, Extra, etc.). Uses local pacman if available, otherwise queries archlinux.org API. Always prefer official packages over AUR when available. Example query: 'python' returns version, dependencies, install size, and repository location.",
inputSchema={
"type": "object",
"properties": {
"package_name": {
"type": "string",
"description": "Exact package name"
}
},
"required": ["package_name"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="check_updates_dry_run",
description="[LIFECYCLE] Check for available system updates without applying them. Only works on Arch Linux systems. Requires pacman-contrib package. Safe read-only operation that shows pending updates. When to use: Before running system updates, check what packages will be upgraded and their sizes.",
inputSchema={
"type": "object",
"properties": {}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="install_package_secure",
description="[LIFECYCLE] Install a package with comprehensive security checks. Workflow: 1. Check official repos first (safer) 2. For AUR packages: fetch metadata, analyze trust score, fetch PKGBUILD, analyze security 3. Block installation if critical security issues found 4. Check for AUR helper (paru > yay) 5. Install with --noconfirm if all checks pass. Only works on Arch Linux. Requires sudo access and paru/yay for AUR packages.",
inputSchema={
"type": "object",
"properties": {
"package_name": {
"type": "string",
"description": "Name of package to install (checks official repos first, then AUR)"
}
},
"required": ["package_name"]
},
annotations=ToolAnnotations(destructiveHint=True)
),
Tool(
name="audit_package_security",
description="[SECURITY] Comprehensive security audit for AUR packages. Actions: pkgbuild_analysis (scan PKGBUILD for 50+ red flags), metadata_risk (evaluate trustworthiness via votes/maintainer/age). Examples: audit_package_security(action='pkgbuild_analysis', pkgbuild_content='...'), audit_package_security(action='metadata_risk', package_name='yay'). ⚠️ Always audit AUR packages before installing.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["pkgbuild_analysis", "metadata_risk"],
"description": "Type of security audit"
},
"pkgbuild_content": {
"type": "string",
"description": "PKGBUILD content for analysis"
},
"package_name": {
"type": "string",
"description": "Package name for metadata analysis"
},
"package_info": {
"type": "object",
"description": "Pre-fetched package metadata"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Package Removal
Tool(
name="remove_packages",
description="[LIFECYCLE] Unified tool for removing packages (single or multiple). Accepts either a single package name or a list of packages. Supports removal with dependencies and forced removal. Only works on Arch Linux. Requires sudo access. Examples: packages='firefox', remove_dependencies=true → removes Firefox with its dependencies; packages=['pkg1', 'pkg2', 'pkg3'] → batch removal of multiple packages; packages='lib', force=true → force removal ignoring dependencies (dangerous!).",
inputSchema={
"type": "object",
"properties": {
"packages": {
"oneOf": [
{"type": "string"},
{"type": "array", "items": {"type": "string"}}
],
"description": "Package name (string) or list of package names (array) to remove"
},
"remove_dependencies": {
"type": "boolean",
"description": "Remove packages and their dependencies (pacman -Rs). Default: false",
"default": False
},
"force": {
"type": "boolean",
"description": "Force removal ignoring dependencies (pacman -Rdd). Use with caution! Default: false",
"default": False
}
},
"required": ["packages"]
},
annotations=ToolAnnotations(destructiveHint=True)
),
# Orphan Package Management
Tool(
name="manage_orphans",
description="[MAINTENANCE] Unified tool for managing orphaned packages (dependencies no longer required). Supports two actions: 'list' (show orphaned packages) and 'remove' (remove orphaned packages). Only works on Arch Linux. Requires sudo access for removal. Examples: action='list' → shows all orphaned packages with disk usage; action='remove', dry_run=true → preview what would be removed; action='remove', dry_run=false, exclude=['pkg1'] → remove all orphans except 'pkg1'.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "remove"],
"description": "Action to perform: 'list' (list orphaned packages) or 'remove' (remove orphaned packages)"
},
"dry_run": {
"type": "boolean",
"description": "Preview what would be removed without actually removing (only for remove action). Default: true",
"default": True
},
"exclude": {
"type": "array",
"items": {"type": "string"},
"description": "List of package names to exclude from removal (only for remove action)"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=False, destructiveHint=True)  # Mixed: 'list' is read-only; 'remove' deletes packages (defaults to dry_run=true)
),
# File Ownership Query (Consolidated)
Tool(
name="query_file_ownership",
description="[ORGANIZATION] Unified tool for querying file-package ownership relationships. Supports three modes: 'file_to_package' (find which package owns a file), 'package_to_files' (list all files in a package with optional filtering), and 'filename_search' (search for files across all packages). Only works on Arch Linux. Examples: mode='file_to_package', query='/usr/bin/python' → returns 'python' package; mode='package_to_files', query='systemd', filter_pattern='*.service' → lists all systemd service files; mode='filename_search', query='*.desktop' → finds all packages with desktop entries.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Query string: file path for file_to_package mode, package name for package_to_files mode, or filename pattern for filename_search mode"
},
"mode": {
"type": "string",
"enum": ["file_to_package", "package_to_files", "filename_search"],
"description": "Query mode: 'file_to_package' (find package owner), 'package_to_files' (list package files), or 'filename_search' (search across packages)"
},
"filter_pattern": {
"type": "string",
"description": "Optional regex pattern to filter files (only used in package_to_files mode, e.g., '*.conf' or '/etc/')"
}
},
"required": ["query", "mode"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Package Verification
Tool(
name="verify_package_integrity",
description="[MAINTENANCE] Verify the integrity of installed package files. Detects modified, missing, or corrupted files. Only works on Arch Linux. When to use: After system crash or disk errors, verify 'linux' package files match expected checksums.",
inputSchema={
"type": "object",
"properties": {
"package_name": {
"type": "string",
"description": "Name of the package to verify"
},
"thorough": {
"type": "boolean",
"description": "Perform thorough check including file attributes. Default: false",
"default": False
}
},
"required": ["package_name"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Package Groups
Tool(
name="manage_groups",
description="[ORGANIZATION] Unified group management tool. Actions: list_groups (all groups), list_packages_in_group (packages in specific group). Examples: manage_groups(action='list_groups'), manage_groups(action='list_packages_in_group', group_name='base-devel')",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list_groups", "list_packages_in_group"],
"description": "Operation to perform"
},
"group_name": {
"type": "string",
"description": "Group name (required for list_packages_in_group)"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Install Reason Management
Tool(
name="manage_install_reason",
description="[MAINTENANCE] Unified tool for managing package install reasons. Supports three actions: 'list' (list all explicitly installed packages), 'mark_explicit' (prevent package from being removed as orphan), and 'mark_dependency' (allow package to be auto-removed with orphans). Only works on Arch Linux. Examples: action='list' → returns all user-installed packages; action='mark_explicit', package_name='python-pip' → keeps package even when dependencies change; action='mark_dependency', package_name='lib32-gcc-libs' → allows auto-removal with orphans.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "mark_explicit", "mark_dependency"],
"description": "Action to perform: 'list' (list explicit packages), 'mark_explicit' (mark as user-installed), or 'mark_dependency' (mark as auto-removable)"
},
"package_name": {
"type": "string",
"description": "Package name (required for mark_explicit and mark_dependency actions)"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=False, destructiveHint=False)  # Mixed: 'list' is read-only; marking only rewrites the install reason in the local database
),
# System Diagnostic Tools
Tool(
name="get_system_info",
description="[MONITORING] Get comprehensive system information including kernel version, architecture, hostname, uptime, and memory statistics. Works on any system. Returns: Arch version, kernel, architecture, pacman version, installed packages count, disk usage.",
inputSchema={
"type": "object",
"properties": {}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="analyze_storage",
description="[MONITORING] Unified storage analysis tool. Actions: disk_usage (check disk space for critical paths), cache_stats (analyze pacman package cache). Works on any system for disk_usage, Arch only for cache_stats.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["disk_usage", "cache_stats"],
"description": "Analysis type to perform"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="diagnose_system",
description="[MONITORING] Unified system diagnostics for systemd-based systems. Actions: failed_services (check for failed systemd services), boot_logs (retrieve recent boot logs). Works on systemd-based systems only.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["failed_services", "boot_logs"],
"description": "Diagnostic action to perform"
},
"lines": {
"type": "integer",
"description": "Number of log lines (for boot_logs). Default: 100",
"default": 100
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# News Tools
Tool(
name="fetch_news",
description="[DISCOVERY] Unified news fetching from Arch Linux. Actions: latest (get recent news), critical (find news requiring manual intervention), since_update (news since last system update). Works on any system for latest/critical, Arch only for since_update.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["latest", "critical", "since_update"],
"description": "Type of news query"
},
"limit": {
"type": "integer",
"description": "Maximum news items (for latest/critical). Default: 10",
"default": 10
},
"since_date": {
"type": "string",
"description": "ISO date to filter from (for latest action)"
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Transaction Log Tools
# Consolidated Transaction History Tool
Tool(
name="query_package_history",
description="[HISTORY] Unified tool for querying package history from pacman logs. Supports four query types: 'all' (recent transactions), 'package' (specific package install/upgrade history), 'failures' (failed transactions), and 'sync' (database sync history). Only works on Arch Linux. Examples: query_type='all', limit=50 → recent transactions; query_type='package', package_name='docker' → when docker was installed; query_type='failures' → find errors; query_type='sync', limit=20 → sync history.",
inputSchema={
"type": "object",
"properties": {
"query_type": {
"type": "string",
"enum": ["all", "package", "failures", "sync"],
"description": "Type of query: 'all' (recent transactions), 'package' (package history), 'failures' (failed transactions), or 'sync' (database sync history)"
},
"package_name": {
"type": "string",
"description": "Package name (required for query_type='package')"
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return (default 50)",
"default": 50
}
},
"required": ["query_type"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Mirror Management Tools
Tool(
name="optimize_mirrors",
description="[MIRRORS] Smart mirror management - consolidates 4 mirror operations. Actions: 'status' (list configured mirrors), 'test' (test mirror speeds), 'suggest' (get optimal mirrors from archlinux.org), 'health' (full health check). Examples: optimize_mirrors(action='status', auto_test=True) lists and tests all mirrors; optimize_mirrors(action='suggest', country='US', limit=5) suggests top 5 US mirrors; optimize_mirrors(action='health') checks for issues and gives recommendations.",
inputSchema={
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["status", "test", "suggest", "health"],
"description": "Operation to perform: 'status' (list mirrors), 'test' (test speeds), 'suggest' (get recommendations), 'health' (full check)"
},
"country": {
"type": "string",
"description": "Optional country code for suggestions (e.g., 'US', 'DE') - action='suggest' only"
},
"mirror_url": {
"type": "string",
"description": "Specific mirror URL to test - action='test' only"
},
"limit": {
"type": "integer",
"description": "Number of mirrors for suggestions (default 10)",
"default": 10
},
"auto_test": {
"type": "boolean",
"description": "If true, test mirrors after listing - action='status' only",
"default": False
}
},
"required": ["action"]
},
annotations=ToolAnnotations(readOnlyHint=True)
),
# Configuration Tools
Tool(
name="analyze_pacman_conf",
description="[CONFIG] Parse and analyze pacman.conf with optional focus. Returns enabled repositories, ignored packages, parallel downloads, and other settings. Only works on Arch Linux. Examples: focus='full' (default) returns all settings; focus='ignored_packages' returns only ignored packages with warnings for critical ones; focus='parallel_downloads' returns only parallel downloads setting with optimization recommendations.",
inputSchema={
"type": "object",
"properties": {
"focus": {
"type": "string",
"enum": ["full", "ignored_packages", "parallel_downloads"],
"description": "What to analyze: 'full' (all settings), 'ignored_packages' (only ignored packages), 'parallel_downloads' (only parallel downloads setting)",
"default": "full"
}
}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="analyze_makepkg_conf",
description="[CONFIG] Parse and analyze makepkg.conf. Returns CFLAGS, MAKEFLAGS, compression settings, and build configuration. Only works on Arch Linux. Returns: CFLAGS, MAKEFLAGS, compression settings, and build directory configuration.",
inputSchema={
"type": "object",
"properties": {}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="check_database_freshness",
description="[MAINTENANCE] Check when package databases were last synchronized. Warns if databases are stale (> 24 hours). Only works on Arch Linux. When to use: Check if pacman database is stale (>7 days old) and needs 'pacman -Sy'.",
inputSchema={
"type": "object",
"properties": {}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
Tool(
name="run_system_health_check",
description="[MONITORING] Run a comprehensive system health check. Integrates multiple diagnostics to provide a complete overview of system status, including disk space, failed services, updates, orphan packages, and more. Only works on Arch Linux. Comprehensive check: Updates available, disk space, failed services, database freshness, orphans, and critical news.",
inputSchema={
"type": "object",
"properties": {}
},
annotations=ToolAnnotations(readOnlyHint=True)
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent | ImageContent | EmbeddedResource]:
"""
Execute a tool by name with the provided arguments.
Args:
name: Tool name
arguments: Tool arguments
Returns:
List of content objects with tool results
Raises:
ValueError: If tool name is unknown
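Example:
await call_tool("search_archwiki", {"query": "pacman hooks", "limit": 5}) returns a
single TextContent item whose text is the JSON-encoded search results.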
"""
logger.info(f"Calling tool: {name} with args: {arguments}")
if name == "search_archwiki":
query = arguments["query"]
limit = arguments.get("limit", 10)
results = await search_wiki(query, limit)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "search_aur":
query = arguments["query"]
limit = arguments.get("limit", 20)
sort_by = arguments.get("sort_by", "relevance")
results = await search_aur(query, limit, sort_by)
return [TextContent(type="text", text=json.dumps(results, indent=2))]
elif name == "get_official_package_info":
package_name = arguments["package_name"]
result = await get_official_package_info(package_name)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "check_updates_dry_run":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("check_updates_dry_run"))]
result = await check_updates_dry_run()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "install_package_secure":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("install_package_secure"))]
package_name = arguments["package_name"]
result = await install_package_secure(package_name)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "audit_package_security":
action = arguments["action"]
pkgbuild_content = arguments.get("pkgbuild_content", None)
package_name = arguments.get("package_name", None)
package_info = arguments.get("package_info", None)
result = await audit_package_security(action, pkgbuild_content, package_name, package_info)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Package Removal Tools
elif name == "remove_packages":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("remove_packages"))]
packages = arguments["packages"]
remove_dependencies = arguments.get("remove_dependencies", False)
force = arguments.get("force", False)
result = await remove_packages(packages, remove_dependencies, force)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Orphan Package Management
elif name == "manage_orphans":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("manage_orphans"))]
action = arguments["action"]
dry_run = arguments.get("dry_run", True)
exclude = arguments.get("exclude", None)
result = await manage_orphans(action, dry_run, exclude)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# File Ownership Query
elif name == "query_file_ownership":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("query_file_ownership"))]
query = arguments["query"]
mode = arguments["mode"]
filter_pattern = arguments.get("filter_pattern", None)
result = await query_file_ownership(query, mode, filter_pattern)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Package Verification
elif name == "verify_package_integrity":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("verify_package_integrity"))]
package_name = arguments["package_name"]
thorough = arguments.get("thorough", False)
result = await verify_package_integrity(package_name, thorough)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Package Groups
elif name == "manage_groups":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("manage_groups"))]
action = arguments["action"]
group_name = arguments.get("group_name", None)
result = await manage_groups(action, group_name)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Install Reason Management
elif name == "manage_install_reason":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("manage_install_reason"))]
action = arguments["action"]
package_name = arguments.get("package_name", None)
result = await manage_install_reason(action, package_name)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# System Diagnostic Tools
elif name == "get_system_info":
result = await get_system_info()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "analyze_storage":
action = arguments["action"]
result = await analyze_storage(action)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "diagnose_system":
action = arguments["action"]
lines = arguments.get("lines", 100)
result = await diagnose_system(action, lines)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# News tools
elif name == "fetch_news":
action = arguments["action"]
limit = arguments.get("limit", 10)
since_date = arguments.get("since_date", None)
result = await fetch_news(action, limit, since_date)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Consolidated transaction history tool
elif name == "query_package_history":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("query_package_history"))]
query_type = arguments.get("query_type")
package_name = arguments.get("package_name")
limit = arguments.get("limit", 50)
result = await query_package_history(query_type=query_type, package_name=package_name, limit=limit)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Mirror management tool (consolidated)
elif name == "optimize_mirrors":
action = arguments.get("action")
country = arguments.get("country")
mirror_url = arguments.get("mirror_url")
limit = arguments.get("limit", 10)
auto_test = arguments.get("auto_test", False)
result = await optimize_mirrors(
action=action,
country=country,
mirror_url=mirror_url,
limit=limit,
auto_test=auto_test
)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
# Configuration tools
elif name == "analyze_pacman_conf":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("analyze_pacman_conf"))]
focus = arguments.get("focus", "full")
result = await analyze_pacman_conf(focus=focus)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "analyze_makepkg_conf":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("analyze_makepkg_conf"))]
result = await analyze_makepkg_conf()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "run_system_health_check":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("run_system_health_check"))]
result = await run_system_health_check()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "check_database_freshness":
if not IS_ARCH:
return [TextContent(type="text", text=create_platform_error_message("check_database_freshness"))]
result = await check_database_freshness()
return [TextContent(type="text", text=json.dumps(result, indent=2))]
else:
raise ValueError(f"Unknown tool: {name}")
# ============================================================================
# PROMPTS
# ============================================================================
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""
List available prompts for guided workflows.
Returns:
List of Prompt objects describing available workflows
"""
return [
Prompt(
name="troubleshoot_issue",
description="Diagnose system errors and provide solutions using Arch Wiki knowledge",
arguments=[
{
"name": "error_message",
"description": "The error message or issue description",
"required": True
},
{
"name": "context",
"description": "Additional context about when/where the error occurred",
"required": False
}
]
),
Prompt(
name="audit_aur_package",
description="Perform comprehensive security audit of an AUR package before installation",
arguments=[
{
"name": "package_name",
"description": "Name of the AUR package to audit",
"required": True
}
]
),
Prompt(
name="analyze_dependencies",
description="Analyze package dependencies and suggest installation order",
arguments=[
{
"name": "package_name",
"description": "Name of the package to analyze dependencies for",
"required": True
}
]
),
Prompt(
name="safe_system_update",
description="Enhanced system update workflow that checks for critical news, disk space, and failed services before updating",
arguments=[]
),
Prompt(
name="cleanup_system",
description="Comprehensive system cleanup workflow: remove orphans, clean cache, verify integrity",
arguments=[
{
"name": "aggressive",
"description": "Perform aggressive cleanup (removes more packages). Default: false",
"required": False
}
]
),
Prompt(
name="package_investigation",
description="Deep package research before installation: check repos, analyze security, review dependencies",
arguments=[
{
"name": "package_name",
"description": "Package name to investigate",
"required": True
}
]
),
Prompt(
name="mirror_optimization",
description="Test and configure fastest mirrors based on location and latency",
arguments=[
{
"name": "country",
"description": "Country code for mirror suggestions (e.g., US, DE, JP)",
"required": False
}
]
),
Prompt(
name="system_health_check",
description="Comprehensive system diagnostic: check disk, services, logs, database, integrity",
arguments=[]
),
]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str]) -> GetPromptResult:
"""
Generate a prompt response for guided workflows.
Args:
name: Prompt name
arguments: Prompt arguments
Returns:
GetPromptResult with generated messages
Raises:
ValueError: If prompt name is unknown
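Example:
await get_prompt("troubleshoot_issue", {"error_message": "error: failed to commit transaction"})
returns a GetPromptResult seeded with the error and related Arch Wiki suggestions.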
"""
logger.info(f"Generating prompt: {name} with args: {arguments}")
if name == "troubleshoot_issue":
error_message = arguments["error_message"]
context = arguments.get("context", "")
# Extract keywords from error message for Wiki search
keywords = error_message.lower().split()
wiki_query = " ".join(keywords[:5]) # Use first 5 words as search query
# Search Wiki for relevant pages
try:
wiki_results = await search_wiki(wiki_query, limit=3)
except Exception as e:
logger.warning(f"Wiki search failed while building troubleshoot prompt: {e}")
wiki_results = []
messages = [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"I'm experiencing this error: {error_message}\n\nContext: {context}\n\nPlease help me troubleshoot this issue using Arch Linux knowledge."
)
)
]
if wiki_results:
wiki_content = "Here are some relevant Arch Wiki pages that might help:\n\n"
for result in wiki_results:
wiki_content += f"- **{result['title']}**: {result.get('snippet', 'No description available')}\n"
wiki_content += f" URL: {result['url']}\n\n"
messages.append(
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=wiki_content
)
)
)
return GetPromptResult(
description=f"Troubleshooting guidance for: {error_message}",
messages=messages
)
elif name == "audit_aur_package":
package_name = arguments["package_name"]
# Get package info and PKGBUILD
try:
package_info = await get_aur_info(package_name)
pkgbuild_content = await get_pkgbuild(package_name)
# Analyze both metadata and PKGBUILD
metadata_risk = analyze_package_metadata_risk(package_info)
pkgbuild_safety = analyze_pkgbuild_safety(pkgbuild_content)
audit_summary = f"""
# Security Audit Report for {package_name}
## Package Metadata Analysis
- **Trust Score**: {metadata_risk.get('trust_score', 'N/A')}/100
- **Risk Factors**: {', '.join(metadata_risk.get('risk_factors', []))}
- **Trust Indicators**: {', '.join(metadata_risk.get('trust_indicators', []))}
## PKGBUILD Security Analysis
- **Risk Score**: {pkgbuild_safety.get('risk_score', 'N/A')}/100
- **Security Issues Found**: {len(pkgbuild_safety.get('findings', []))}
- **Critical Issues**: {len([f for f in pkgbuild_safety.get('findings', []) if f.get('severity') == 'critical'])}
## Recommendations
"""
if metadata_risk.get('trust_score', 0) < 50 or pkgbuild_safety.get('risk_score', 0) > 70:
audit_summary += "⚠️ **HIGH RISK** - Consider finding an alternative package or reviewing the source code manually.\n"
elif metadata_risk.get('trust_score', 0) < 70 or pkgbuild_safety.get('risk_score', 0) > 50:
audit_summary += "⚠️ **MEDIUM RISK** - Proceed with caution and review the findings below.\n"
else:
audit_summary += "✅ **LOW RISK** - Package appears safe to install.\n"
messages = [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"Please audit the AUR package '{package_name}' for security issues before installation."
)
),
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=audit_summary
)
)
]
return GetPromptResult(
description=f"Security audit for AUR package: {package_name}",
messages=messages
)
except Exception as e:
return GetPromptResult(
description=f"Security audit for AUR package: {package_name}",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=f"Error auditing package '{package_name}': {str(e)}"
)
)
]
)
elif name == "analyze_dependencies":
package_name = arguments["package_name"]
# Check if it's an official package first
try:
official_info = await get_official_package_info(package_name)
if official_info.get("found"):
deps = official_info.get("dependencies", [])
opt_deps = official_info.get("optional_dependencies", [])
analysis = f"""
# Dependency Analysis for {package_name} (Official Package)
## Required Dependencies
{chr(10).join([f"- {dep}" for dep in deps]) if deps else "None"}
## Optional Dependencies
{chr(10).join([f"- {dep}" for dep in opt_deps]) if opt_deps else "None"}
## Installation Order
1. Install required dependencies first
2. Install optional dependencies as needed
3. Install {package_name} last
## Installation Commands
```bash
# Install required dependencies
sudo pacman -S {' '.join(deps) if deps else '# No required dependencies'}
# Install optional dependencies (if needed)
sudo pacman -S {' '.join(opt_deps) if opt_deps else '# No optional dependencies'}
# Install the package
sudo pacman -S {package_name}
```
"""
else:
# Check AUR
aur_info = await get_aur_info(package_name)
if aur_info.get("found"):
analysis = f"""
# Dependency Analysis for {package_name} (AUR Package)
## AUR Package Information
- **Maintainer**: {aur_info.get('maintainer', 'Unknown')}
- **Last Updated**: {aur_info.get('last_modified', 'Unknown')}
- **Votes**: {aur_info.get('votes', 'Unknown')}
## Installation Considerations
1. **Security Check**: Run a security audit before installation
2. **Dependencies**: AUR packages may have complex dependency chains
3. **Build Requirements**: Check if you have all build tools installed
## Recommended Installation Process
```bash
# 1. Install build dependencies
sudo pacman -S base-devel git
# 2. Install AUR helper (if not already installed)
# Choose one: paru, yay, or manual AUR installation
# 3. Install the package
paru -S {package_name} # or yay -S {package_name}
```
⚠️ **Important**: Always audit AUR packages for security before installation!
"""
else:
analysis = f"Package '{package_name}' not found in official repositories or AUR."
except Exception as e:
analysis = f"Error analyzing dependencies for '{package_name}': {str(e)}"
return GetPromptResult(
description=f"Dependency analysis for: {package_name}",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"Please analyze the dependencies for the package '{package_name}' and suggest the best installation approach."
)
),
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=analysis
)
)
]
)
elif name == "safe_system_update":
if not IS_ARCH:
return GetPromptResult(
description="Safe system update workflow",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=create_platform_error_message("safe_system_update prompt")
)
)
]
)
analysis = "# Safe System Update Workflow\n\n"
warnings = []
recommendations = []
# Step 1: Check for critical news
try:
critical_news = await check_critical_news(limit=10)
if critical_news.get("has_critical"):
analysis += "## ⚠️ Critical Arch Linux News\n\n"
for news_item in critical_news.get("critical_news", [])[:3]:
analysis += f"**{news_item['title']}**\n"
analysis += f"Published: {news_item['published']}\n"
analysis += f"{news_item['summary'][:200]}...\n"
analysis += f"[Read more]({news_item['link']})\n\n"
warnings.append("Critical news requiring manual intervention found!")
recommendations.append("Read all critical news articles before updating")
else:
analysis += "## ✓ No Critical News\n\nNo manual intervention required for recent updates.\n\n"
except Exception as e:
analysis += f"## ⚠️ News Check Failed\n\n{str(e)}\n\n"
# Step 2: Check disk space
try:
disk_space = await check_disk_space()
disk_usage = disk_space.get("disk_usage", {})
analysis += "## Disk Space Status\n\n"
for path, info in disk_usage.items():
if "warning" in info:
analysis += f"- ⚠️ {path}: {info['available']} available ({info['use_percent']} used) - {info['warning']}\n"
warnings.append(f"Low disk space on {path}")
else:
analysis += f"- ✓ {path}: {info['available']} available ({info['use_percent']} used)\n"
analysis += "\n"
except Exception as e:
analysis += f"## ⚠️ Disk Space Check Failed\n\n{str(e)}\n\n"
# Step 3: Check pending updates
try:
updates = await check_updates_dry_run()
if updates.get("updates_available"):
count = updates.get("count", 0)
analysis += f"## Pending Updates ({count} packages)\n\n"
# Show first 10 updates
for update in updates.get("packages", [])[:10]:
analysis += f"- {update['package']}: {update['current_version']} → {update['new_version']}\n"
if count > 10:
analysis += f"\n...and {count - 10} more packages\n"
analysis += "\n"
else:
analysis += "## ✓ System Up to Date\n\nNo updates available.\n\n"
return GetPromptResult(
description="System is already up to date",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=analysis
)
)
]
)
except Exception as e:
analysis += f"## ⚠️ Update Check Failed\n\n{str(e)}\n\n"
# Step 4: Check failed services
try:
failed_services = await check_failed_services()
if not failed_services.get("all_ok"):
analysis += "## ⚠️ Failed Services Detected\n\n"
for service in failed_services.get("failed_services", [])[:5]:
analysis += f"- {service['unit']}\n"
warnings.append("System has failed services")
recommendations.append("Investigate failed services before updating")
analysis += "\n"
else:
analysis += "## ✓ All Services Running\n\nNo failed systemd services.\n\n"
except Exception as e:
analysis += f"## ⚠️ Service Check Failed\n\n{str(e)}\n\n"
# Step 5: Check database freshness
try:
db_freshness = await check_database_freshness()
if db_freshness.get("needs_sync"):
analysis += "## Database Synchronization\n\n"
analysis += f"Databases are {db_freshness.get('oldest_age_hours', 0):.1f} hours old.\n"
recommendations.append("Database will be synchronized during update")
analysis += "\n"
except Exception as e:
logger.warning(f"Database freshness check failed: {e}")
# Step 6: Summary and recommendations
analysis += "## Recommendations\n\n"
if warnings:
analysis += "### Warnings:\n"
for warning in warnings:
analysis += f"- ⚠️ {warning}\n"
analysis += "\n"
if recommendations:
analysis += "### Before Updating:\n"
for rec in recommendations:
analysis += f"- {rec}\n"
analysis += "\n"
if not warnings:
analysis += "✓ System is ready for update\n\n"
analysis += "Run: `sudo pacman -Syu`\n"
else:
analysis += "⚠️ **Address warnings before updating**\n"
return GetPromptResult(
description="Safe system update analysis",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text="Check if my system is ready for a safe update"
)
),
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=analysis
)
)
]
)
elif name == "cleanup_system":
if not IS_ARCH:
return GetPromptResult(
description="System cleanup workflow",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=create_platform_error_message("cleanup_system prompt")
)
)
]
)
aggressive = arguments.get("aggressive", "false").lower() == "true"
return GetPromptResult(
description="System cleanup workflow",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Please perform a comprehensive system cleanup:
1. **Check Orphaned Packages**:
- Run manage_orphans with action='list'
- Review the list for packages that can be safely removed
{' - Be aggressive: remove all orphans unless critical' if aggressive else ' - Be conservative: keep packages that might be useful'}
2. **Clean Package Cache**:
- Run analyze_storage with action='cache_stats'
- If cache is > 1GB or has > 100 packages, suggest cleanup
- Provide command: sudo pacman -Sc (keep current) or -Scc (remove all)
3. **Verify Package Integrity**:
- Run manage_install_reason with action='list' to get explicitly installed packages
- For critical packages (kernel, systemd, pacman), run verify_package_integrity
- Report any modified or missing files
4. **Check Database Freshness**:
- Run check_database_freshness
- If database is stale (> 7 days), suggest: sudo pacman -Sy
5. **Summary**:
- Space freed (estimate)
- Packages removed
- Integrity issues found
- Recommended next steps
Be thorough and explain each step."""
)
)
]
)
elif name == "package_investigation":
package_name = arguments.get("package_name", "")
if not package_name:
return GetPromptResult(
description="Package investigation workflow",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text="Error: package_name argument is required"
)
)
]
)
return GetPromptResult(
description=f"Deep investigation of package: {package_name}",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Please investigate the package '{package_name}' thoroughly before installation:
1. **Check Official Repositories First**:
- Run get_official_package_info("{package_name}")
- If found in official repos: ✅ SAFE - recommend using pacman
- If not found: Continue to AUR investigation
2. **Search AUR** (if not in official repos):
- Run search_aur("{package_name}")
- Review: votes, popularity, maintainer, last update
- Check for similar packages with better metrics
3. **Security Analysis**:
- For the top AUR result, run audit_package_security(action='metadata_risk', package_name='{package_name}')
- Trust score interpretation:
- 80-100: Highly trusted
- 60-79: Generally safe
- 40-59: Review carefully
- 0-39: High risk, manual audit required
4. **PKGBUILD Audit** (if proceeding with AUR):
- Fetch PKGBUILD content
- Run audit_package_security with action='pkgbuild_analysis' on the fetched PKGBUILD content
- Risk score interpretation:
- 0-29: Low risk
- 30-59: Medium risk - review findings
- 60-100: High risk - DO NOT INSTALL
5. **Check Dependencies**:
- Review makedepends and depends from PKGBUILD
- Check if dependencies are in official repos or AUR
- Warn about deep AUR dependency chains
6. **Final Recommendation**:
- ✅ Safe to install (with command)
- ⚠️ Proceed with caution (explain risks)
- ⛔ Do not install (explain why)
7. **Alternative Suggestions**:
- Suggest official repo alternatives if available
- Suggest better-maintained AUR packages if found
Be comprehensive and explain security implications."""
)
)
]
)
elif name == "mirror_optimization":
country = arguments.get("country", "")
return GetPromptResult(
description="Mirror optimization workflow",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Please optimize repository mirrors:
1. **List and Test Current Mirrors**:
- Run optimize_mirrors(action='status', auto_test=True)
- Show currently configured mirrors with their speeds
- Identify slow mirrors (> 500ms)
2. **Suggest Optimal Mirrors**:
- Run optimize_mirrors(action='suggest'{f', country="{country}"' if country else ''}, limit=10)
- Based on geographic location and current status
- Show top 10 recommended mirrors
3. **Health Check**:
- Run optimize_mirrors(action='health')
- Identify any configuration issues
- Check for outdated or unreachable mirrors
4. **Recommendations**:
- Suggest mirror configuration changes
- Provide commands to update /etc/pacman.d/mirrorlist
- Recommend using reflector or manual configuration
5. **Expected Benefits**:
- Estimate download speed improvements
- Reduced update times
- Better reliability
Be detailed and provide specific mirror URLs and configuration commands."""
)
)
]
)
elif name == "system_health_check":
if not IS_ARCH:
return GetPromptResult(
description="System health check",
messages=[
PromptMessage(
role="assistant",
content=TextContent(
type="text",
text=create_platform_error_message("system_health_check prompt")
)
)
]
)
return GetPromptResult(
description="Comprehensive system health check",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text="""Please perform a comprehensive system health diagnostic:
1. **System Information**:
- Run get_system_info
- Review kernel version, uptime, memory usage
- Check for abnormalities
2. **Disk Space Analysis**:
- Run analyze_storage with action='disk_usage'
- Identify partitions with low space
- Run analyze_storage with action='cache_stats'
- Calculate total reclaimable space
3. **Service Health**:
- Run diagnose_system with action='failed_services'
- List all failed systemd services
- If failures found, run diagnose_system with action='boot_logs' to investigate
4. **Package Database Health**:
- Run check_database_freshness
- Check when last synchronized
- Run query_package_history with query_type='failures'
- Identify any package operation failures
5. **Package Integrity**:
- Run manage_orphans with action='list'
- Count orphaned packages and space used
- Suggest running verify_package_integrity on critical packages
6. **Configuration Health**:
- Run analyze_pacman_conf (focus='full') for full config
- Run analyze_pacman_conf (focus='ignored_packages') to check for ignored critical packages
- Warn about critical packages being ignored
7. **Mirror Health**:
- Run optimize_mirrors(action='health')
- Identify mirror issues
8. **Summary Report**:
- Overall health status (Healthy/Warnings/Critical)
- List of issues found with severity levels
- Prioritized recommendations for fixes
- Estimate of system optimization potential
Be thorough and provide actionable recommendations with specific commands."""
)
)
]
)
else:
raise ValueError(f"Unknown prompt: {name}")