test_tools.pyโข3.8 kB
#!/usr/bin/env python3
"""
Test script for instant_review and debt_orchestrator tools
"""
import sys
import asyncio
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.orchestrator import OrchestrationEngine
from src.core.db_wrapper import ThreadSafeDB
async def test_tools():
print("๐งช Testing SCS-MCP Analysis Tools")
print("=" * 60)
# Initialize database
db_path = Path(".claude-symbols/search.db")
db = ThreadSafeDB(db_path)
# Initialize orchestrator
orchestrator = OrchestrationEngine(db, project_root=".")
# Test instant_review
print("\n1๏ธโฃ Testing instant_review tool...")
print("-" * 40)
test_code = '''
def calculate_total(items):
total = 0
for item in items:
if item > 0:
total = total + item
return total
def process_data(data):
result = []
for i in range(len(data)):
if data[i] != None:
result.append(data[i] * 2)
return result
'''
try:
result = await orchestrator.execute_flow("instant_review", {"code": test_code})
if result and "agents" in result:
print("โ
instant_review executed successfully")
if "review_aggregator" in result["agents"]:
review = result["agents"]["review_aggregator"]
if "output" in review:
print("Review output received (truncated):")
print(str(review["output"])[:200] + "...")
else:
print("โ ๏ธ instant_review returned minimal output")
print(f"Result: {result}")
except Exception as e:
print(f"โ instant_review failed: {e}")
# Test debt_orchestrator
print("\n2๏ธโฃ Testing debt_orchestrator tool...")
print("-" * 40)
try:
# Create a small test file
test_file = Path("test_temp.py")
test_file.write_text(test_code)
result = await orchestrator.execute_flow("debt_orchestrator", {
"code": test_code,
"file_path": "test_temp.py"
})
if result:
print("โ
debt_orchestrator executed successfully")
if isinstance(result, dict):
print(f"Results keys: {list(result.keys())}")
else:
print("โ ๏ธ debt_orchestrator returned minimal output")
# Clean up test file
test_file.unlink()
except Exception as e:
print(f"โ debt_orchestrator failed: {e}")
# Test with real file
print("\n3๏ธโฃ Testing with real project file...")
print("-" * 40)
real_file = Path("src/core/clean_search.py")
if real_file.exists():
try:
# Read first 50 lines for quick test
with open(real_file, 'r') as f:
lines = f.readlines()[:50]
sample_code = ''.join(lines)
result = await orchestrator.execute_flow("instant_review", {"code": sample_code})
if result:
print("โ
Real file review successful")
else:
print("โ ๏ธ Real file review returned minimal output")
except Exception as e:
print(f"โ Real file review failed: {e}")
print("\n" + "=" * 60)
print("๐ก Analysis:")
# Check if flows are defined
if hasattr(orchestrator, 'flows'):
print(f"Available flows: {list(orchestrator.flows.keys()) if orchestrator.flows else 'None'}")
# Check if agents are initialized
if hasattr(orchestrator, 'agents'):
print(f"Agent manager initialized: {orchestrator.agents is not None}")
print("\nโจ Test complete!")
if __name__ == "__main__":
asyncio.run(test_tools())