# test_acp.py • 2.89 kB
#!/usr/bin/env python3
"""
Simple test script for ACP functionality.
"""
import asyncio
import sys
import os
# Add the project root to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from katamari_mcp.acp.heuristics import HeuristicEngine, HeuristicTags, RiskLevel, Complexity, Scope
async def test_heuristics():
    """Run HeuristicEngine over two sample operations and print the verdicts.

    Two operation dicts are passed to ``evaluate_operation``; for each, the
    resulting tags (via ``to_dict()``) and the engine's auto-approve and
    parallel-testing answers are printed. The second operation additionally
    prints the manual-approval answer. Nothing is asserted -- the output is
    meant to be read by a human.
    """
    # NOTE(review): the leading glyphs of this banner look like a mis-decoded
    # emoji; kept exactly as found -- confirm against the original file.
    print("๐ง Testing Heuristic Engine...")
    heuristics = HeuristicEngine()

    # Scenario 1: the operation the original author labels "low-risk".
    benign_request = {
        "type": "create",
        "target": "capability",
        "description": "Simple web search enhancement",
    }
    benign_tags = heuristics.evaluate_operation(benign_request)
    benign_dump = benign_tags.to_dict()
    print(f"Low-risk operation tags: {benign_dump}")
    benign_auto = heuristics.can_auto_approve(benign_tags)
    print(f"Can auto-approve: {benign_auto}")
    benign_parallel = heuristics.needs_parallel_testing(benign_tags)
    print(f"Needs parallel testing: {benign_parallel}")

    # Scenario 2: the operation the original author labels "high-risk".
    sensitive_request = {
        "type": "modify_core",
        "target": "security_validator",
        "description": "Modify security validation logic",
    }
    sensitive_tags = heuristics.evaluate_operation(sensitive_request)
    sensitive_dump = sensitive_tags.to_dict()
    print(f"\nHigh-risk operation tags: {sensitive_dump}")
    sensitive_auto = heuristics.can_auto_approve(sensitive_tags)
    print(f"Can auto-approve: {sensitive_auto}")
    sensitive_parallel = heuristics.needs_parallel_testing(sensitive_tags)
    print(f"Needs parallel testing: {sensitive_parallel}")
    sensitive_manual = heuristics.needs_manual_approval(sensitive_tags)
    print(f"Needs manual approval: {sensitive_manual}")
async def test_acp_controller():
    """Call three ACPController coroutines in turn and print their results.

    Awaits ``inspect_capabilities()``, ``propose_capability(...)`` and
    ``apply_heuristics(...)`` on a freshly constructed controller. The
    proposal is indexed by ``'name'`` and ``'heuristic_tags'`` and the
    heuristics result by ``'recommended_actions'``, so a missing key surfaces
    as an exception to the caller. Nothing else is asserted.
    """
    # NOTE(review): the leading glyphs of this banner look like a mis-decoded
    # emoji; kept exactly as found -- confirm against the original file.
    print("\n๐ฎ Testing ACP Controller...")

    # Imported lazily, after the banner, exactly as the original does.
    from katamari_mcp.acp.controller import ACPController

    acp = ACPController()

    # System inspection
    capabilities = await acp.inspect_capabilities()
    print(f"System inspection: {capabilities}")

    # Capability proposal
    request_text = "PDF text extraction with OCR"
    request_params = {"format": "pdf", "ocr_needed": True}
    draft = await acp.propose_capability(request_text, request_params)
    print(f"Capability proposal: {draft['name']}")
    print(f"Heuristic tags: {draft['heuristic_tags']}")

    # Heuristic application
    operation = {
        "type": "create",
        "target": "capability",
        "description": "New PDF processing tool",
    }
    verdict = await acp.apply_heuristics(operation)
    print(f"Heuristic decision: {verdict['recommended_actions']}")
async def main():
    """Run all ACP checks in sequence and translate the outcome to a status.

    Awaits ``test_heuristics()`` and then ``test_acp_controller()``. Any
    ``Exception`` raised while doing so (or while printing the success line)
    is caught here: its message is printed, the traceback is emitted through
    ``traceback.print_exc()``, and a failing status is returned instead of
    letting the exception propagate.

    Returns:
        int: 0 when both checks complete without raising, 1 otherwise.
    """
    # NOTE(review): the leading glyph of this banner looks like a mis-decoded
    # emoji whose trailing bytes are lost; kept as found -- confirm against
    # the original file.
    print("๐ Testing Katamari MCP ACP System")
    print("=" * 50)

    try:
        await test_heuristics()
        await test_acp_controller()
        # This literal had been split over two physical lines (an
        # unterminated string). The lead glyph followed by a line break
        # matches the UTF-8 bytes of U+2705, so the message is rejoined with
        # that character restored.
        print("\n✅ All ACP tests passed!")
    except Exception as e:
        # Top-level boundary: report and convert to an exit status.
        # NOTE(review): the glyph before "Test failed" also looks
        # mis-decoded; kept as found -- confirm against the original file.
        print(f"\nโ Test failed: {e}")
        import traceback

        traceback.print_exc()
        return 1

    return 0
if __name__ == "__main__":
    # Script entry point: run the async driver and use its return value as
    # the process exit status.
    sys.exit(asyncio.run(main()))