#!/usr/bin/env python3
"""
AAP Controller Credential Management Tool
"""
from typing import Any, Dict, Optional, Union
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector
def _as_path_id(value: Union[int, float], name: str) -> int:
    """Return *value* in a form that is safe to interpolate into a URL path.

    The tool's ID parameters accept ``Union[int, float]``, so a caller may
    pass ``5.0``. Formatting that float directly would yield a path segment
    of ``5.0`` rather than ``5``, so whole-number floats are converted to
    ``int`` here.

    Args:
        value: The ID supplied by the caller.
        name: Parameter name, used only in the error message.

    Returns:
        ``int(value)`` for a whole-number float; any other type is returned
        unchanged.

    Raises:
        ValueError: If *value* is a float that is not a whole number
            (this includes NaN and infinities).
    """
    if isinstance(value, float):
        if not value.is_integer():
            raise ValueError(f"{name} must be a whole number, got {value}")
        return int(value)
    return value


def register_credential_tools(mcp: FastMCP):
    """Register credential management tools with the MCP server.

    Defines a single tool, ``credential_management``, and registers it via
    ``mcp.tool()``.
    """

    @mcp.tool()
    def credential_management(
        action: str = Field(description="Action: list_credentials, create, update, delete, list_credential_types, create_credential_type, list_tokens, create_token, delete_token"),
        credential_id: Optional[Union[int, float]] = Field(None, description="Credential ID"),
        credential_type_id: Optional[Union[int, float]] = Field(None, description="Credential type ID"),
        token_id: Optional[Union[int, float]] = Field(None, description="Token ID"),
        credential_data: Optional[Dict[str, Any]] = Field(None, description="Credential data"),
        credential_type_data: Optional[Dict[str, Any]] = Field(None, description="Credential type data"),
        token_data: Optional[Dict[str, Any]] = Field(None, description="Token data"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing")
    ) -> Dict[str, Any]:
        """
        Credential and authentication management tool.
        Handles credentials, credential types, and access tokens.

        Dispatches on ``action`` to the matching connector call:

        * ``list_credentials`` / ``list_credential_types`` / ``list_tokens``:
          GET with ``filters`` (or an empty dict) as parameters.
        * ``create`` / ``create_credential_type`` / ``create_token``:
          POST the corresponding ``*_data`` dict.
        * ``update``: PATCH ``credentials/<credential_id>/`` with
          ``credential_data``.
        * ``delete`` / ``delete_token``: DELETE the resource by ID.

        ``credential_type_id`` is accepted but not used by any action here.

        Returns:
            The connector's result for the chosen action, or a dict of the
            form ``{"error": "..."}`` when a required argument is missing,
            the action is unknown, or any exception is raised.
        """
        try:
            # Credential Operations
            if action == "list_credentials":
                params = filters or {}
                return get_aap_connector().get("credentials/", params)
            elif action == "create":
                if not credential_data:
                    return {"error": "credential_data is required"}
                return get_aap_connector().post("credentials/", credential_data)
            elif action == "update":
                if not credential_id or not credential_data:
                    return {"error": "credential_id and credential_data are required"}
                # Normalize so a float ID such as 5.0 does not become ".../5.0/".
                cred_pk = _as_path_id(credential_id, "credential_id")
                return get_aap_connector().patch(f"credentials/{cred_pk}/", credential_data)
            elif action == "delete":
                if not credential_id:
                    return {"error": "credential_id is required"}
                cred_pk = _as_path_id(credential_id, "credential_id")
                return get_aap_connector().delete(f"credentials/{cred_pk}/")
            # Credential Type Operations
            elif action == "list_credential_types":
                params = filters or {}
                return get_aap_connector().get("credential_types/", params)
            elif action == "create_credential_type":
                if not credential_type_data:
                    return {"error": "credential_type_data is required"}
                return get_aap_connector().post("credential_types/", credential_type_data)
            # Token Operations
            elif action == "list_tokens":
                params = filters or {}
                return get_aap_connector().get("tokens/", params)
            elif action == "create_token":
                if not token_data:
                    return {"error": "token_data is required"}
                return get_aap_connector().post("tokens/", token_data)
            elif action == "delete_token":
                if not token_id:
                    return {"error": "token_id is required"}
                token_pk = _as_path_id(token_id, "token_id")
                return get_aap_connector().delete(f"tokens/{token_pk}/")
            else:
                return {"error": f"Unknown action: {action}"}
        except Exception as e:
            # Tool boundary: report every failure (including the ValueError
            # raised for a non-integral ID) as an error dict instead of
            # letting it propagate.
            return {"error": f"Credential management failed: {str(e)}"}