import os
import requests
import xml.etree.ElementTree as ET
import base64
from typing import List, Dict, Any, Optional
from loguru import logger
class FirewallTool:
def __init__(self, hostname: str = None, username: str = None, password: str = None):
"""
Initialize connection to Palo Alto Networks Firewall / Panorama.
Configuration *must* be provided via Environment Variables or Arguments.
"""
self.hostname = hostname or os.getenv("FW_HOSTNAME")
self.username = username or os.getenv("FW_USERNAME")
self.password = password or os.getenv("FW_PASSWORD")
# We don't raise error here to allow server startup,
# but we will check strict validity before any request.
if self.hostname:
self.base_url = f"https://{self.hostname}/api"
else:
self.base_url = None
# Disable SSL warnings for internal self-signed certs
requests.packages.urllib3.disable_warnings()
# Basic Auth Setup
self.headers = {}
if self.username and self.password:
creds = f"{self.username}:{self.password}"
b64_creds = base64.b64encode(creds.encode()).decode()
self.headers["Authorization"] = f"Basic {b64_creds}"
def _validate_config(self):
missing = []
if not self.hostname: missing.append("FW_HOSTNAME")
if not self.username: missing.append("FW_USERNAME")
if not self.password: missing.append("FW_PASSWORD")
if missing:
raise ValueError(f"Configuration Error: Missing required environment variables: {', '.join(missing)}")
def _post_request(self, params: Dict[str, str]) -> Optional[ET.Element]:
"""
Helper to send requests to PAN-OS API with Basic Auth.
"""
self._validate_config()
try:
response = requests.post(
self.base_url,
data=params,
headers=self.headers,
verify=False,
timeout=10
)
response.raise_for_status()
# Debug: print(response.text)
# Handle non-XML responses (e.g. auth errors might return HTML)
if 'xml' not in response.headers.get('Content-Type', ''):
# PAN-OS sometimes returns text/plain for some errors
pass
root = ET.fromstring(response.content)
status = root.get('status')
if status == 'success':
return root
else:
# Handle standard API errors
msg = root.find('./result/msg')
error_text = msg.text if msg is not None else "Unknown Error"
logger.error(f"[FW] API Error: {error_text}")
return None
except Exception as e:
logger.error(f"[FW] Connection Failed: {e}")
return None
def get_fw_object_info(self, ip_address: str) -> Dict[str, Any]:
"""
Tool: get_fw_object_info
Description: Find if an address object exists for this IP.
"""
# XPath to find address object by ip-netmask value
# Note: This is a robust search for specific IP match
xpath = f"/config/devices/entry/vsys/entry/address/entry[ip-netmask='{ip_address}']"
params = {
'type': 'config',
'action': 'get',
'xpath': xpath
}
root = self._post_request(params)
if root:
result = root.find('./result')
# Check if any entry was found
if result is not None and len(result) > 0:
entry = result.find('./entry')
if entry is not None:
name = entry.get('name')
# Extract Tags
tags = []
tag_root = entry.find('./tag')
if tag_root:
tags = [t.text for t in tag_root.findall('./member')]
description = entry.findtext('description', '')
return {
"found": True,
"name": name,
"ip": ip_address,
"tags": tags,
"description": description,
"type": "address_object"
}
return {
"found": False,
"ip": ip_address,
"message": f"No address object found specifically for {ip_address}"
}
def get_system_info(self) -> Dict[str, Any]:
"""
Get system details (Hostname, Model, Version).
API: type=op&cmd=<show><system><info></info></system></show>
"""
params = {
'type': 'op',
'cmd': '<show><system><info></info></system></show>'
}
root = self._post_request(params)
info = {}
if root is not None:
result = root.find('./result/system')
if result is not None:
info['hostname'] = result.findtext('hostname')
info['model'] = result.findtext('model')
info['version'] = result.findtext('sw-version')
info['is_panorama'] = 'Panorama' in (info['model'] or '')
return info
def search_security_policy(self, source_ip: str, destination_ip: str = "any", debug: bool = False) -> List[Dict[str, Any]]:
"""
Tool: search_security_policy
Description: Retrieve security rules from configuration (Pre/Post/Default).
"""
# Standalone FW typically uses 'rulebase/security/rules'
# But may also have 'pre-rulebase' or 'post-rulebase' if pushed from Panorama
scopes = [
("Default", "/config/devices/entry[@name='localhost.localdomain']/vsys/entry[@name='vsys1']/rulebase/security/rules"),
("Pre", "/config/devices/entry[@name='localhost.localdomain']/vsys/entry[@name='vsys1']/pre-rulebase/security/rules"),
("Post", "/config/devices/entry[@name='localhost.localdomain']/vsys/entry[@name='vsys1']/post-rulebase/security/rules")
]
all_rules = []
for scope_name, xpath in scopes:
params = {
'type': 'config',
'action': 'get',
'xpath': xpath
}
if debug:
logger.info(f"[*] Querying {scope_name} Rules...")
root = self._post_request(params)
if root is not None:
# Result -> rules -> entry
result = root.find('./result/rules')
if result is None:
result = root.find('./result')
if result is not None:
entries = result.findall('./entry')
if debug:
logger.info(f" Found {len(entries)} entries in {scope_name}")
for entry in entries:
rule_name = entry.get('name')
sources = [m.text for m in entry.findall('./source/member')]
destinations = [m.text for m in entry.findall('./destination/member')]
services = [m.text for m in entry.findall('./service/member')]
applications = [m.text for m in entry.findall('./application/member')]
action = entry.findtext('action')
disabled = entry.findtext('disabled') == 'yes'
# Store everything first
all_rules.append({
"scope": scope_name,
"rule_name": rule_name,
"action": action,
"source": sources,
"destination": destinations,
"service": services,
"application": applications,
"disabled": disabled
})
if debug:
logger.info(f"[*] Total Rules Fetched (Raw): {len(all_rules)}")
# Client-side filtering
filtered_rules = []
for r in all_rules:
# Match if: IP in source OR IP in destination OR 'any' is used
match_source = "any" in r['source'] or source_ip in r['source']
match_dest = "any" in r['destination'] or (destination_ip != "any" and destination_ip in r['destination'])
if match_source or match_dest:
filtered_rules.append(r)
return filtered_rules
if __name__ == "__main__":
import os
os.environ["FW_HOSTNAME"] = "10.1.2.150"
os.environ["FW_USERNAME"] = "admin"
os.environ["FW_PASSWORD"] = "SHmz8866."
tool = FirewallTool()
if tool.headers:
logger.info("[*] Basic Auth credentials configured.")
# Test 1: Search Object
# ...
# Test 2: Search Policy
test_ip = "172.17.39.87"
logger.info(f"\n[*] Searching policy for Source: {test_ip} (Debug Mode)")
try:
# Enable Debug to see counts
policies = tool.search_security_policy(source_ip=test_ip, destination_ip="8.8.8.8", debug=True)
logger.info(f"[*] Matched Filter Rules: {len(policies)}")
except Exception as e:
logger.error(f"[!] Policy Search Failed: {e}")