# ip_addresses.py
"""MCP tools for IP address queries."""
import logging
from typing import Any, Dict, List, Optional
from mcp.types import Tool, TextContent
from src.netbox_client import NetBoxClient
from src.state_confidence import StateConfidenceClient
logger = logging.getLogger(__name__)
def get_ip_tools(
    netbox_client: NetBoxClient, state_client: Optional[StateConfidenceClient]
) -> List[Tool]:
    """Build the MCP tool definitions for IP address queries.

    Three tools are declared: ``list_ips``, ``get_ip`` and ``search_ips``.

    Args:
        netbox_client: NetBox API client (not referenced while building
            the definitions)
        state_client: Optional state confidence client (not referenced
            while building the definitions)

    Returns:
        List of Tool definitions, in the order list_ips, get_ip, search_ips
    """

    def string_prop(description: str) -> Dict[str, Any]:
        # JSON-schema fragment for a free-form string argument.
        return {"type": "string", "description": description}

    def integer_prop(description: str, default: int) -> Dict[str, Any]:
        # JSON-schema fragment for an integer argument with a default.
        return {"type": "integer", "description": description, "default": default}

    def certainty_flag(description: str) -> Dict[str, Any]:
        # JSON-schema fragment for the boolean flag; it defaults to True
        # in every tool declared here.
        return {"type": "boolean", "description": description, "default": True}

    list_ips_tool = Tool(
        name="list_ips",
        description="List IP addresses from NetBox. Supports filtering by address or device name.",
        inputSchema={
            "type": "object",
            "properties": {
                "address": string_prop(
                    "Filter by IP address (with CIDR notation if needed)"
                ),
                "device": string_prop("Filter by device name"),
                "limit": integer_prop(
                    "Maximum number of results (default: 100)", 100
                ),
                "include_certainty": certainty_flag(
                    "Include state confidence scores in response"
                ),
            },
        },
    )

    get_ip_tool = Tool(
        name="get_ip",
        description="Get detailed information about a specific IP address, including assignments.",
        inputSchema={
            "type": "object",
            "properties": {
                "ip_address": string_prop(
                    "IP address (with CIDR notation if needed)"
                ),
                "include_certainty": certainty_flag(
                    "Include state confidence score in response"
                ),
            },
            "required": ["ip_address"],
        },
    )

    search_ips_tool = Tool(
        name="search_ips",
        description="Search for IP addresses by address or hostname. Returns matching IP addresses.",
        inputSchema={
            "type": "object",
            "properties": {
                "query": string_prop("Search query (IP address or hostname)"),
                "limit": integer_prop(
                    "Maximum number of results (default: 50)", 50
                ),
                "include_certainty": certainty_flag(
                    "Include state confidence scores in response"
                ),
            },
            "required": ["query"],
        },
    )

    return [list_ips_tool, get_ip_tool, search_ips_tool]
async def handle_list_ips(
    arguments: Dict[str, Any],
    netbox_client: NetBoxClient,
    state_client: Optional[StateConfidenceClient],
) -> List[TextContent]:
    """Handle list_ips tool call.

    Lists IP address records from NetBox using the optional ``address`` and
    ``device`` filters. When ``include_certainty`` is set and a state client
    was supplied, each record copy is extended with a ``certainty`` entry
    looked up by the record's address with any CIDR suffix removed.

    Args:
        arguments: Tool arguments. Keys read: ``address``, ``device``,
            ``limit`` (default 100) and ``include_certainty`` (default True)
        netbox_client: NetBox API client
        state_client: Optional state confidence client; no certainty lookup
            is made when it is falsy

    Returns:
        Single-element list of TextContent holding a count line followed by
        the records as indented JSON
    """
    import json

    address = arguments.get("address")
    device = arguments.get("device")
    limit = arguments.get("limit", 100)
    include_certainty = arguments.get("include_certainty", True)

    ip_addresses = netbox_client.list_ip_addresses(
        address=address, device=device, limit=limit
    )

    # The decision does not depend on the individual record, so make it once.
    with_certainty = bool(include_certainty and state_client)

    results = []
    for ip_addr in ip_addresses:
        # Copy so the certainty entry never leaks into the client's own data.
        result = ip_addr.copy()
        if with_certainty:
            # ``or ""`` also covers a record whose address key is present
            # but None, which ``.get("address", "")`` would pass through.
            ip_addr_str = ip_addr.get("address") or ""
            # Remove CIDR notation for state_id lookup
            state_id = ip_addr_str.split("/", 1)[0]
            certainty = state_client.get_certainty_score(
                state_id=state_id, state_type="ip_address"
            )
            # Compare against None rather than truthiness so that a
            # falsy-but-valid value (e.g. a score of 0) is still reported.
            if certainty is not None:
                result["certainty"] = certainty
        results.append(result)

    return [
        TextContent(
            type="text",
            text=f"Found {len(results)} IP address(es):\n\n{json.dumps(results, indent=2)}",
        )
    ]
async def handle_get_ip(
    arguments: Dict[str, Any],
    netbox_client: NetBoxClient,
    state_client: Optional[StateConfidenceClient],
) -> List[TextContent]:
    """Handle get_ip tool call.

    Looks up a single IP address record in NetBox. When
    ``include_certainty`` is set and a state client was supplied, the record
    copy is extended with a ``certainty`` entry looked up by the requested
    address with any CIDR suffix removed.

    Args:
        arguments: Tool arguments. Keys read: ``ip_address`` (required) and
            ``include_certainty`` (default True)
        netbox_client: NetBox API client
        state_client: Optional state confidence client; no certainty lookup
            is made when it is falsy

    Returns:
        Single-element list of TextContent holding either the record as
        indented JSON, an error message when ``ip_address`` is missing or
        empty, or a not-found message when NetBox returns nothing
    """
    import json

    ip_address = arguments.get("ip_address")
    if not ip_address:
        return [
            TextContent(
                type="text",
                text="Error: ip_address parameter is required",
            )
        ]
    include_certainty = arguments.get("include_certainty", True)

    ip_addr = netbox_client.get_ip_address(ip_address)
    if not ip_addr:
        return [
            TextContent(
                type="text",
                text=f"IP address '{ip_address}' not found in NetBox",
            )
        ]

    # Copy so the certainty entry never leaks into the client's own data.
    result = ip_addr.copy()
    if include_certainty and state_client:
        # Remove CIDR notation for state_id lookup
        state_id = ip_address.split("/", 1)[0]
        certainty = state_client.get_certainty_score(
            state_id=state_id, state_type="ip_address"
        )
        # Compare against None rather than truthiness so that a
        # falsy-but-valid value (e.g. a score of 0) is still reported.
        if certainty is not None:
            result["certainty"] = certainty

    return [
        TextContent(
            type="text",
            text=f"IP address details:\n\n{json.dumps(result, indent=2)}",
        )
    ]
async def handle_search_ips(
    arguments: Dict[str, Any],
    netbox_client: NetBoxClient,
    state_client: Optional[StateConfidenceClient],
) -> List[TextContent]:
    """Handle search_ips tool call.

    Searches NetBox for IP address records matching ``query``. When
    ``include_certainty`` is set and a state client was supplied, each
    record copy is extended with a ``certainty`` entry looked up by the
    record's address with any CIDR suffix removed.

    Args:
        arguments: Tool arguments. Keys read: ``query`` (required),
            ``limit`` (default 50) and ``include_certainty`` (default True)
        netbox_client: NetBox API client
        state_client: Optional state confidence client; no certainty lookup
            is made when it is falsy

    Returns:
        Single-element list of TextContent holding either a count line
        followed by the matches as indented JSON, or an error message when
        ``query`` is missing or empty
    """
    import json

    query = arguments.get("query")
    if not query:
        return [
            TextContent(
                type="text",
                text="Error: query parameter is required",
            )
        ]
    limit = arguments.get("limit", 50)
    include_certainty = arguments.get("include_certainty", True)

    ip_addresses = netbox_client.search_ip_addresses(query, limit=limit)

    # The decision does not depend on the individual record, so make it once.
    with_certainty = bool(include_certainty and state_client)

    results = []
    for ip_addr in ip_addresses:
        # Copy so the certainty entry never leaks into the client's own data.
        result = ip_addr.copy()
        if with_certainty:
            # ``or ""`` also covers a record whose address key is present
            # but None, which ``.get("address", "")`` would pass through.
            ip_addr_str = ip_addr.get("address") or ""
            # Remove CIDR notation for state_id lookup
            state_id = ip_addr_str.split("/", 1)[0]
            certainty = state_client.get_certainty_score(
                state_id=state_id, state_type="ip_address"
            )
            # Compare against None rather than truthiness so that a
            # falsy-but-valid value (e.g. a score of 0) is still reported.
            if certainty is not None:
                result["certainty"] = certainty
        results.append(result)

    return [
        TextContent(
            type="text",
            text=f"Found {len(results)} matching IP address(es):\n\n{json.dumps(results, indent=2)}",
        )
    ]