"""
Herramientas de información del sistema.
"""
import json
import os
import platform
from typing import Any, Dict
import psutil
def get_system_info(args: Dict[str, Any]) -> str:
    """Return general operating-system information as a JSON string.

    Args:
        args: Tool arguments. Not read by this function.

    Returns:
        A pretty-printed JSON object with OS, host, user and path details,
        or an error message string if collecting the information fails.
    """
    # Imported locally so this function does not rely on a new module-level
    # import being added to the file.
    import getpass

    try:
        if hasattr(os, "getlogin"):
            try:
                current_user = os.getlogin()
            except OSError:
                # os.getlogin() needs a controlling terminal and raises OSError
                # without one (services, cron jobs, containers). Fall back to
                # getpass instead of discarding the whole report.
                try:
                    current_user = getpass.getuser()
                except (KeyError, OSError, ImportError):
                    current_user = "N/A"
        else:
            current_user = "N/A"

        info = {
            "sistema_operativo": platform.system(),
            "version_os": platform.version(),
            "release": platform.release(),
            "arquitectura": platform.architecture()[0],
            "procesador": platform.processor(),
            "hostname": platform.node(),
            "usuario_actual": current_user,
            "directorio_trabajo": os.getcwd(),
            "directorio_home": os.path.expanduser("~"),
            "variables_entorno_count": len(os.environ),
        }
        return json.dumps(info, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información del sistema: {str(e)}"
def get_cpu_info(args: Dict[str, Any]) -> str:
    """Return CPU core counts, utilisation and frequency as a JSON string.

    Args:
        args: Tool arguments. Not read by this function.

    Returns:
        A pretty-printed JSON object, or an error message string if the
        information cannot be collected. Frequency fields are "N/A" when no
        frequency reading is available.
    """
    try:
        # Query the frequency once instead of once per field, and do not let
        # a platform without frequency support discard the other metrics.
        try:
            freq = psutil.cpu_freq()
        except (AttributeError, NotImplementedError, OSError):
            freq = None

        # NOTE(review): the two cpu_percent calls below each block for their
        # 1-second sampling interval, so this handler takes about 2 seconds.
        info = {
            "nucleos_fisicos": psutil.cpu_count(logical=False),
            "nucleos_logicos": psutil.cpu_count(logical=True),
            "uso_cpu_percent": psutil.cpu_percent(interval=1),
            "uso_por_nucleo": psutil.cpu_percent(interval=1, percpu=True),
            "frecuencia_actual": freq.current if freq else "N/A",
            "frecuencia_maxima": freq.max if freq else "N/A",
            "frecuencia_minima": freq.min if freq else "N/A",
        }
        return json.dumps(info, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información de CPU: {str(e)}"
def get_memory_info(args: Dict[str, Any]) -> str:
    """Return RAM and swap usage as a JSON string.

    Args:
        args: Tool arguments. Not read by this function.

    Returns:
        A pretty-printed JSON object with human-readable sizes and usage
        percentages, or an error message string if collection fails.
    """
    # (output key, psutil attribute) pairs, in the order they are emitted.
    ram_fields = (
        ("memoria_total", "total"),
        ("memoria_disponible", "available"),
        ("memoria_usada", "used"),
        ("memoria_libre", "free"),
    )
    swap_fields = (
        ("swap_total", "total"),
        ("swap_usado", "used"),
        ("swap_libre", "free"),
    )
    try:
        ram = psutil.virtual_memory()
        swp = psutil.swap_memory()

        report = {
            label: format_bytes(getattr(ram, attr)) for label, attr in ram_fields
        }
        report["porcentaje_uso"] = ram.percent
        for label, attr in swap_fields:
            report[label] = format_bytes(getattr(swp, attr))
        report["swap_porcentaje"] = swp.percent

        return json.dumps(report, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información de memoria: {str(e)}"
def get_disk_info(args: Dict[str, Any]) -> str:
    """Return usage information for each disk partition as a JSON string.

    Args:
        args: Tool arguments. Not read by this function.

    Returns:
        A pretty-printed JSON object of the form {"discos": [...]}, or an
        error message string if the partitions cannot be listed. Partitions
        whose usage cannot be read are left out of the list.
    """
    try:
        disks = []
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                # Some disks may not be accessible. PermissionError is a
                # subclass of OSError, so it is still skipped here; other OS
                # errors no longer abort the whole listing.
                continue

            # A partition reporting a total size of 0 would otherwise raise
            # ZeroDivisionError and lose every other partition.
            total = usage.total
            percent_used = round(usage.used / total * 100, 1) if total else 0.0

            disks.append(
                {
                    "dispositivo": partition.device,
                    "punto_montaje": partition.mountpoint,
                    "sistema_archivos": partition.fstype,
                    "total": format_bytes(total),
                    "usado": format_bytes(usage.used),
                    "libre": format_bytes(usage.free),
                    "porcentaje_uso": percent_used,
                }
            )
        return json.dumps({"discos": disks}, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información de discos: {str(e)}"
def get_network_info(args: Dict[str, Any]) -> str:
    """Return addresses, link status and I/O counters per network interface.

    Args:
        args: Tool arguments. Not read by this function.

    Returns:
        A pretty-printed JSON object keyed by interface name, or an error
        message string if collection fails. An interface missing from the
        stats or counters tables gets an empty object for that section.
    """
    try:
        addrs_by_nic = psutil.net_if_addrs()
        stats_by_nic = psutil.net_if_stats()
        io_by_nic = psutil.net_io_counters(pernic=True)

        report = {}
        for nic, nic_addrs in addrs_by_nic.items():
            # Addresses
            entry = {
                "direcciones": [
                    {
                        "familia": str(a.family),
                        "direccion": a.address,
                        "mascara_red": a.netmask,
                        "broadcast": a.broadcast,
                    }
                    for a in nic_addrs
                ],
                "estadisticas": {},
                "contadores": {},
            }

            # Link statistics
            if nic in stats_by_nic:
                link = stats_by_nic[nic]
                if link.speed > 0:
                    speed_text = f"{link.speed} Mbps"
                else:
                    speed_text = "N/A"
                entry["estadisticas"] = {
                    "activa": link.isup,
                    "velocidad": speed_text,
                    "mtu": link.mtu,
                }

            # I/O counters
            if nic in io_by_nic:
                io = io_by_nic[nic]
                entry["contadores"] = {
                    "bytes_enviados": format_bytes(io.bytes_sent),
                    "bytes_recibidos": format_bytes(io.bytes_recv),
                    "paquetes_enviados": io.packets_sent,
                    "paquetes_recibidos": io.packets_recv,
                }

            report[nic] = entry

        return json.dumps(report, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información de red: {str(e)}"
def get_processes_info(args: Dict[str, Any]) -> str:
    """Return a sorted, truncated list of running processes as a JSON string.

    Args:
        args: Tool arguments. Reads "limit" (maximum number of processes to
            return, default 10; negative values are treated as 0) and
            "sort_by" (one of "cpu", "memory", "pid", "name"; default "cpu").
            Any other "sort_by" value leaves the list in iteration order.

    Returns:
        A pretty-printed JSON object with the process count, the selected
        processes, and the sort field and limit that were applied, or an
        error message string if "limit" is not an integer or collection fails.
    """
    limit = args.get("limit", 10)
    sort_by = args.get("sort_by", "cpu")  # cpu, memory, pid, name
    try:
        # Validate up front: a negative limit would silently slice from the
        # end, and a non-numeric one would only fail after all the work.
        if limit is not None:
            limit = max(0, int(limit))

        processes = []
        total = 0
        for proc in psutil.process_iter(
            ["pid", "name", "cpu_percent", "memory_percent", "status"]
        ):
            # Count while iterating instead of walking all processes again.
            total += 1
            try:
                proc_info = dict(proc.info)
                # Reuse the Process object already in hand rather than
                # opening a second one for the same pid.
                proc_info["memory_mb"] = proc.memory_info().rss / 1024 / 1024
                processes.append(proc_info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        # Sort processes. A requested field may be present with value None,
        # where dict.get's default does not apply, so coerce with "or" to keep
        # the comparison from raising TypeError.
        sort_specs = {
            "cpu": (lambda p: p.get("cpu_percent") or 0, True),
            "memory": (lambda p: p.get("memory_percent") or 0, True),
            "pid": (lambda p: p.get("pid") or 0, False),
            "name": (lambda p: p.get("name") or "", False),
        }
        spec = sort_specs.get(sort_by)
        if spec is not None:
            sort_key, descending = spec
            processes.sort(key=sort_key, reverse=descending)

        # Limit results
        processes = processes[:limit]

        result = {
            "total_procesos": total,
            "procesos_top": processes,
            "ordenado_por": sort_by,
            "limite": limit,
        }
        return json.dumps(result, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo información de procesos: {str(e)}"
def get_environment_variables(args: Dict[str, Any]) -> str:
    """Return environment variables, optionally filtered by name, as JSON.

    Args:
        args: Tool arguments. Reads "filter": a substring matched
            case-insensitively against variable names. A missing, None or
            empty filter selects every variable.

    Returns:
        A pretty-printed JSON object with the total count, the matched count,
        the filter applied ("ninguno" when there is none) and the matched
        variables, or an error message string if collection fails.
    """
    # An explicit None or a non-string filter used to raise AttributeError
    # outside the try block; normalise it instead.
    raw_filter = args.get("filter")
    filter_pattern = "" if raw_filter is None else str(raw_filter).upper()
    try:
        # NOTE(review): values are returned unredacted. Confirm that callers
        # of this tool are allowed to see everything in the environment.
        # The empty string is a substring of every name, so no filter
        # matches all variables.
        env_vars = {
            key: value
            for key, value in os.environ.items()
            if filter_pattern in key.upper()
        }
        result = {
            "total_variables": len(os.environ),
            "variables_filtradas": len(env_vars),
            "filtro": filter_pattern if filter_pattern else "ninguno",
            "variables": env_vars,
        }
        return json.dumps(result, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error obteniendo variables de entorno: {str(e)}"
def format_bytes(bytes_value: int) -> str:
    """Format a byte count as a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then rendered with one decimal place.

    Args:
        bytes_value: Number of bytes. Negative values are scaled by their
            magnitude and keep their sign.

    Returns:
        A string such as "1.5 KB". Exactly zero is rendered as "0 B".
    """
    if bytes_value == 0:
        return "0 B"

    size_names = ("B", "KB", "MB", "GB", "TB")
    # Scale the magnitude so negative inputs pick the right unit as well;
    # comparing the signed value against 1024 would never scale them.
    sign = "-" if bytes_value < 0 else ""
    magnitude = float(abs(bytes_value))

    unit_index = 0
    while magnitude >= 1024 and unit_index < len(size_names) - 1:
        magnitude /= 1024.0
        unit_index += 1
    return f"{sign}{magnitude:.1f} {size_names[unit_index]}"
def register_system_tools(tools: Dict[str, Dict[str, Any]]) -> None:
    """Add the system-information tools to a tool registry.

    Each entry maps a tool name to a dict with "description", "handler" and
    "inputSchema" keys.

    Args:
        tools: Registry to update in place. Existing entries with the same
            names are overwritten.
    """
    # Tools that accept no arguments: (registry name, description, handler).
    argless_tools = (
        (
            "get_system_info",
            "Obtener información general del sistema operativo",
            get_system_info,
        ),
        (
            "get_cpu_info",
            "Obtener información detallada de la CPU",
            get_cpu_info,
        ),
        (
            "get_memory_info",
            "Obtener información de memoria RAM y swap",
            get_memory_info,
        ),
        (
            "get_disk_info",
            "Obtener información de discos y particiones",
            get_disk_info,
        ),
        (
            "get_network_info",
            "Obtener información de interfaces de red",
            get_network_info,
        ),
    )
    for tool_name, summary, handler in argless_tools:
        tools[tool_name] = {
            "description": summary,
            "handler": handler,
            # A fresh schema dict per tool, so entries never share state.
            "inputSchema": {"type": "object", "properties": {}},
        }

    process_properties = {
        "limit": {
            "type": "integer",
            "description": "Número máximo de procesos a mostrar",
            "default": 10,
            "minimum": 1,
            "maximum": 100,
        },
        "sort_by": {
            "type": "string",
            "description": "Campo por el que ordenar (cpu, memory, pid, name)",
            "enum": ["cpu", "memory", "pid", "name"],
            "default": "cpu",
        },
    }
    tools["get_processes"] = {
        "description": "Listar procesos del sistema con información de uso",
        "handler": get_processes_info,
        "inputSchema": {"type": "object", "properties": process_properties},
    }

    env_properties = {
        "filter": {
            "type": "string",
            "description": "Filtro para buscar variables específicas (case-insensitive)",
        }
    }
    tools["get_env_vars"] = {
        "description": "Obtener variables de entorno del sistema",
        "handler": get_environment_variables,
        "inputSchema": {"type": "object", "properties": env_properties},
    }