We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/Charleslotto/klipper-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
Spoolman Integration Tools
Filament spool tracking and management
"""
import json
from typing import Optional
import aiohttp
import config
def register_spoolman_tools(mcp):
"""Register Spoolman tools."""
async def spoolman_request(method: str, endpoint: str, json_data: dict = None):
"""Make request to Spoolman API."""
if not config.SPOOLMAN_ENABLED:
return {"error": "Spoolman not enabled. Set SPOOLMAN_ENABLED=true"}
url = f"{config.SPOOLMAN_URL}/api/v1{endpoint}"
try:
async with aiohttp.ClientSession() as session:
async with session.request(method, url, json=json_data) as response:
if response.status == 404:
return {"error": f"Spoolman endpoint not found: {endpoint}"}
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
return {"error": f"Spoolman connection error: {str(e)}"}
@mcp.tool()
async def list_spools(filament_type: Optional[str] = None) -> str:
"""
List all filament spools in Spoolman.
Args:
filament_type: Optional filter by type (e.g., 'PLA', 'ABS', 'PETG')
"""
result = await spoolman_request("GET", "/spool")
if "error" in result:
return json.dumps(result)
spools = result if isinstance(result, list) else []
# Filter by type if specified
if filament_type:
filament_type_lower = filament_type.lower()
spools = [s for s in spools if s.get("filament", {}).get("material", "").lower() == filament_type_lower]
formatted = []
for spool in spools:
filament = spool.get("filament", {})
formatted.append({
"id": spool.get("id"),
"name": filament.get("name"),
"material": filament.get("material"),
"color": filament.get("color_hex"),
"vendor": filament.get("vendor", {}).get("name"),
"remaining_weight_g": spool.get("remaining_weight"),
"used_weight_g": spool.get("used_weight"),
"first_used": spool.get("first_used"),
"last_used": spool.get("last_used"),
})
return json.dumps({
"count": len(formatted),
"spools": formatted
}, indent=2)
@mcp.tool()
async def get_spool_details(spool_id: int) -> str:
"""
Get detailed information about a specific spool.
Args:
spool_id: The spool ID from Spoolman
"""
result = await spoolman_request("GET", f"/spool/{spool_id}")
if "error" in result:
return json.dumps(result)
spool = result
filament = spool.get("filament", {})
return json.dumps({
"id": spool.get("id"),
"filament": {
"name": filament.get("name"),
"material": filament.get("material"),
"color": filament.get("color_hex"),
"diameter_mm": filament.get("diameter"),
"density_g_cm3": filament.get("density"),
"settings": {
"print_temp": filament.get("settings", {}).get("print_temp"),
"bed_temp": filament.get("settings", {}).get("bed_temp"),
}
},
"vendor": spool.get("filament", {}).get("vendor", {}).get("name"),
"weight": {
"initial_g": spool.get("initial_weight"),
"remaining_g": spool.get("remaining_weight"),
"used_g": spool.get("used_weight"),
"remaining_percent": round(
spool.get("remaining_weight", 0) / spool.get("initial_weight", 1) * 100, 1
) if spool.get("initial_weight") else None
},
"price": spool.get("price"),
"location": spool.get("location"),
"lot_nr": spool.get("lot_nr"),
"comment": spool.get("comment"),
"first_used": spool.get("first_used"),
"last_used": spool.get("last_used"),
}, indent=2)
@mcp.tool()
async def get_active_spool() -> str:
"""Get the currently active spool (if set via Moonraker-Spoolman integration)."""
from moonraker import get_client
client = get_client()
# Query Moonraker for active spool (if spoolman integration is configured)
result = await client.query_printer_objects({
"spoolman": ["spool_id", "filament_name", "filament_used"]
})
if "error" in result:
# Spoolman object may not exist in Moonraker
return json.dumps({
"message": "Spoolman integration not configured in Moonraker",
"hint": "Add [spoolman] section to moonraker.conf"
})
status = result.get("result", {}).get("status", {})
spoolman = status.get("spoolman", {})
if not spoolman or not spoolman.get("spool_id"):
return json.dumps({"active_spool": None, "message": "No spool currently active"})
return json.dumps({
"spool_id": spoolman.get("spool_id"),
"filament_name": spoolman.get("filament_name"),
"filament_used_mm": spoolman.get("filament_used"),
}, indent=2)
@mcp.tool()
async def set_active_spool(spool_id: int) -> str:
"""
Set the active spool for tracking filament usage.
Args:
spool_id: The spool ID to set as active
"""
from moonraker import get_client
client = get_client()
result = await client.run_gcode(f"SET_ACTIVE_SPOOL ID={spool_id}")
if "error" in result:
return json.dumps({"error": result["error"]})
return json.dumps({
"success": True,
"active_spool_id": spool_id
})
@mcp.tool()
async def clear_active_spool() -> str:
"""Clear the currently active spool."""
from moonraker import get_client
client = get_client()
result = await client.run_gcode("CLEAR_ACTIVE_SPOOL")
if "error" in result:
return json.dumps({"error": result["error"]})
return json.dumps({
"success": True,
"message": "Active spool cleared"
})
@mcp.tool()
async def get_filament_vendors() -> str:
"""List all filament vendors in Spoolman."""
result = await spoolman_request("GET", "/vendor")
if "error" in result:
return json.dumps(result)
vendors = result if isinstance(result, list) else []
formatted = []
for vendor in vendors:
formatted.append({
"id": vendor.get("id"),
"name": vendor.get("name"),
"comment": vendor.get("comment"),
})
return json.dumps({
"count": len(formatted),
"vendors": formatted
}, indent=2)
@mcp.tool()
async def get_low_filament_spools(threshold_grams: int = 100) -> str:
"""
Get spools that are running low on filament.
Args:
threshold_grams: Warn if remaining weight is below this (default: 100g)
"""
result = await spoolman_request("GET", "/spool")
if "error" in result:
return json.dumps(result)
spools = result if isinstance(result, list) else []
low_spools = []
for spool in spools:
remaining = spool.get("remaining_weight", 0)
if remaining < threshold_grams and remaining > 0: # Exclude empty/archived
filament = spool.get("filament", {})
low_spools.append({
"id": spool.get("id"),
"name": filament.get("name"),
"material": filament.get("material"),
"color": filament.get("color_hex"),
"remaining_g": remaining,
"location": spool.get("location"),
})
# Sort by remaining weight
low_spools.sort(key=lambda x: x.get("remaining_g", 0))
return json.dumps({
"threshold_grams": threshold_grams,
"low_spools_count": len(low_spools),
"spools": low_spools
}, indent=2)
@mcp.tool()
async def get_filament_usage_by_material() -> str:
"""Get total filament usage grouped by material type."""
result = await spoolman_request("GET", "/spool")
if "error" in result:
return json.dumps(result)
spools = result if isinstance(result, list) else []
usage_by_material = {}
for spool in spools:
filament = spool.get("filament", {})
material = filament.get("material", "Unknown")
used = spool.get("used_weight", 0) or 0
if material not in usage_by_material:
usage_by_material[material] = {
"total_used_g": 0,
"spool_count": 0
}
usage_by_material[material]["total_used_g"] += used
usage_by_material[material]["spool_count"] += 1
# Format and sort by usage
formatted = []
for material, data in usage_by_material.items():
formatted.append({
"material": material,
"total_used_g": round(data["total_used_g"], 1),
"total_used_kg": round(data["total_used_g"] / 1000, 2),
"spool_count": data["spool_count"]
})
formatted.sort(key=lambda x: x["total_used_g"], reverse=True)
return json.dumps({
"usage_by_material": formatted
}, indent=2)