from mcp.server.fastmcp import FastMCP
from typing import List, Dict, Any
import json
import os
# Import existing tool logic
from cmdb_tool import CMDBTool
from fw_tool import FirewallTool
# FastMCP server instance; the @mcp.tool() decorators in this module register tools on it.
mcp = FastMCP("NetSec Guardian Agents")
# Shared tool instances, created once when this module is imported and reused
# by every tool call.
# TODO: consider lazy or per-request initialization if these tools turn out to
# be stateless; for now a single module-level instance of each is used.
fw_tool = FirewallTool()
cmdb_tool = CMDBTool()
@mcp.tool()
def search_cmdb_asset(keyword: str) -> str:
    """
    Search for asset information in the CMDB based on a keyword.

    Args:
        keyword: The search term (System Name, IP, or Abbreviation).

    Returns:
        JSON string containing a list of matching asset details (IP, Name, Owner, etc.).
        If the lookup or serialization fails, a JSON object with "error" and
        "suggestion" keys is returned instead.
    """
    try:
        # The lower layer is assumed to raise on failure, so an empty result
        # here means "no matches" rather than a hidden error.
        matches = cmdb_tool.search_cmdb_asset(keyword)
        payload = json.dumps(matches, ensure_ascii=False)
    except Exception as exc:
        # Tool boundary: report the failure as JSON instead of raising.
        failure = {
            "error": str(exc),
            "suggestion": "Please check MCP environment configuration.",
        }
        payload = json.dumps(failure, ensure_ascii=False)
    return payload
@mcp.tool()
def get_firewall_object(ip_address: str) -> str:
    """
    Get detailed information about an address object from the Firewall.

    Args:
        ip_address: The IP address to query (e.g., "10.1.1.5").

    Returns:
        JSON string containing object details (Name, Zone, Tags, Description).
        If the lookup or serialization fails, a JSON object with "error" and
        "suggestion" keys is returned instead.
    """
    try:
        object_info = fw_tool.get_fw_object_info(ip_address)
        payload = json.dumps(object_info, ensure_ascii=False)
    except Exception as exc:
        # Tool boundary: report the failure as JSON instead of raising.
        failure = {
            "error": str(exc),
            "suggestion": "Please check MCP environment configuration.",
        }
        payload = json.dumps(failure, ensure_ascii=False)
    return payload
@mcp.tool()
def search_firewall_policy(source_ip: str, destination_ip: str = "any") -> str:
    """
    Search for security rules in the Firewall configuration.

    Args:
        source_ip: The source IP address to filter by.
        destination_ip: The destination IP address to filter by (default: "any").

    Returns:
        JSON string containing a list of matching security rules (Rule Name, Action, Service, etc.).
        If the search or serialization fails, a JSON object with "error" and
        "suggestion" keys is returned instead.
    """
    try:
        results = fw_tool.search_security_policy(source_ip, destination_ip=destination_ip)
        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        # The "suggestion" hint matches the error payload of the other tools
        # in this module so clients can handle every tool error the same way.
        return json.dumps(
            {"error": str(e), "suggestion": "Please check MCP environment configuration."},
            ensure_ascii=False,
        )
def main():
    """Run the module-level FastMCP server instance via its ``run()`` method."""
    server = mcp
    server.run()


# Only start the server when this file is executed as a script.
if __name__ == "__main__":
    main()