#!/usr/bin/env python3
"""
KIA MCP Server Usability Test Script
=====================================
This script tests the usability of the KIA MCP server after configuring API keys.
It demonstrates all 4 core tools with real examples.
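Usage:
    1. Configure API keys:
        export MORPH_API_KEY="your-key"
        export CHROMA_API_KEY="your-key"
        export OPENROUTER_API_KEY="your-key"
    2. Run all tests:
        python test_usability.py
    3. Or run a single test by name (evolve_code, search_local_codebase,
       search_packages, learn_pattern, state):
        python test_usability.py evolve_code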
"""
import asyncio
import sys
from datetime import datetime
from pathlib import Path
# Add this script's own directory to the import path so the local `server` module resolves
sys.path.insert(0, str(Path(__file__).parent))
from fastmcp import Context
import server
class MockContext(Context):
    """Mock context for testing without full MCP client.

    Every message a tool reports is echoed to stdout and also recorded on the
    instance so it can be inspected after the call.
    """

    def __init__(self):
        # NOTE(review): Context.__init__ is not invoked here — presumably
        # because it needs a live server object; confirm against fastmcp.
        # "INFO: ..." / "WARN: ..." strings, in the order they were reported.
        self.logs = []
        # (progress, total) tuples, one per report_progress() call.
        self.progress_updates = []
        # Raw error messages (these are not mirrored into `logs`).
        self.errors = []

    async def info(self, message: str):
        """Log info message."""
        self.logs.append(f"INFO: {message}")
        print(f" ℹ️ {message}")

    async def warn(self, message: str):
        """Log warning message."""
        self.logs.append(f"WARN: {message}")
        print(f" ⚠️ {message}")

    async def warning(self, message: str):
        """Log warning message (same behavior as warn()).

        Provided so tools that call ``ctx.warning(...)`` hit the mock instead
        of falling through to the base class implementation.
        """
        await self.warn(message)

    async def error(self, message: str):
        """Log error message."""
        self.errors.append(message)
        print(f" ❌ {message}")

    async def report_progress(self, progress: float, total=None, message=None):
        """Report progress.

        Args:
            progress: Amount of work completed so far.
            total: Total amount of work, or None when the total is unknown.
            message: Optional human-readable status appended to the output.
        """
        self.progress_updates.append((progress, total))
        if total is None:
            # No total means no ratio can be shown; print the raw counter only.
            line = f" 📊 Progress: {progress}"
        else:
            # Guard against division by zero for an empty workload.
            percentage = (progress / total * 100) if total > 0 else 0
            line = f" 📊 Progress: {progress}/{total} ({percentage:.0f}%)"
        if message:
            line += f" - {message}"
        print(line)
def print_section(title: str):
    """Print a section banner: a blank line, a rule, the title, and a rule.

    Args:
        title: Text shown between the two 70-character rules.
    """
    rule = "=" * 70
    print(f"\n{rule}", f" {title}", rule, sep="\n")
def print_result(result: dict, max_length: int = 500):
    """Pretty-print a tool result mapping to stdout, one entry per line.

    Args:
        result: Mapping of field name to value.
        max_length: Top-level string values longer than this are cut to this
            many characters and marked as truncated.

    Formatting rules:
        * long top-level strings are truncated to ``max_length``;
        * dict values are expanded one level; nested list/dict entries whose
          text form exceeds 100 characters are truncated to 100;
        * top-level lists with more than 3 items are summarized by count;
        * everything else is printed as-is.
    """
    print("\n📋 Result:")
    for name, payload in result.items():
        if isinstance(payload, dict):
            print(f" {name}:")
            for sub_name, sub_payload in payload.items():
                shown = sub_payload
                if isinstance(sub_payload, (list, dict)):
                    text = str(sub_payload)
                    if len(text) > 100:
                        shown = f"{text[:100]}... (truncated)"
                print(f" {sub_name}: {shown}")
            continue

        if isinstance(payload, str) and len(payload) > max_length:
            shown = f"{payload[:max_length]}... (truncated)"
        elif isinstance(payload, list) and len(payload) > 3:
            shown = f"[{len(payload)} items]"
        else:
            shown = payload
        print(f" {name}: {shown}")
async def test_evolve_code():
    """Test 1: evolve_code - Transform code to production quality.

    Calls ``server.evolve_code.fn`` on a small sample function with a quality
    threshold of 0.85 and at most 5 iterations, then prints the result.

    Returns:
        True if the call and the result printing completed without raising,
        False if any exception was caught.
    """
    print_section("TEST 1: evolve_code - Production Code Evolution")
    # Sample code with issues. The snippet is written flush-left inside the
    # literal, with its body indented, so the text handed to the tool is
    # syntactically valid Python.
    sample_code = """
def calc(items):
    total = 0
    for item in items:
        total += item
    return total
"""
    print("\n📝 Original Code:")
    print(sample_code)
    print("\n🚀 Starting evolution with quality threshold 0.85...")
    ctx = MockContext()
    try:
        result = await server.evolve_code.fn(
            code=sample_code,
            tests=None,
            quality_threshold=0.85,
            max_iterations=5,
            ctx=ctx,
        )
        print_result(result, max_length=300)
        print("\n✅ Test 1 PASSED: evolve_code executed successfully")
        return True
    except Exception as e:
        # Top-level test boundary: report the failure instead of aborting the run.
        print(f"\n❌ Test 1 FAILED: {e}")
        return False
async def test_search_local_codebase():
    """Test 2: search_local_codebase - Search this repository.

    Runs ``server.search_local_codebase.fn`` against ``"."`` for a fixed query
    and prints whatever it returns.

    Returns:
        True if the call and the result printing completed without raising,
        False if any exception was caught.
    """
    print_section("TEST 2: search_local_codebase - Semantic Code Search")
    print("\n🔍 Searching for 'FastMCP server implementation'...")
    mock_ctx = MockContext()
    call_args = {
        "query": "FastMCP server implementation",
        "repo_path": ".",
        "max_results": 5,
        "ctx": mock_ctx,
    }
    succeeded = False
    try:
        outcome = await server.search_local_codebase.fn(**call_args)
        print_result(outcome)
        print("\n✅ Test 2 PASSED: search_local_codebase executed successfully")
        succeeded = True
    except Exception as exc:
        # Any failure (tool call or printing) marks the test as failed.
        print(f"\n❌ Test 2 FAILED: {exc}")
    return succeeded
async def test_search_packages():
    """Test 3: search_packages - Search public packages.

    Runs ``server.search_packages.fn`` for a fixed query over the fastapi,
    flask and django packages and prints whatever it returns.

    Returns:
        True if the call and the result printing completed without raising,
        False if any exception was caught.
    """
    print_section("TEST 3: search_packages - Public Package Search")
    print("\n📦 Searching for 'JWT authentication implementation'...")
    mock_ctx = MockContext()
    call_args = {
        "query": "JWT authentication implementation",
        "packages": ["fastapi", "flask", "django"],
        "language": "python",
        "max_results": 5,
        "ctx": mock_ctx,
    }
    succeeded = False
    try:
        outcome = await server.search_packages.fn(**call_args)
        print_result(outcome)
        print("\n✅ Test 3 PASSED: search_packages executed successfully")
        succeeded = True
    except Exception as exc:
        # Any failure (tool call or printing) marks the test as failed.
        print(f"\n❌ Test 3 FAILED: {exc}")
    return succeeded
async def test_learn_pattern():
    """Test 4: learn_pattern - Extract coding patterns.

    Passes a before/after pair of snippets to ``server.learn_pattern.fn`` and
    prints the result.

    Returns:
        True if the call and the result printing completed without raising,
        False if any exception was caught.
    """
    print_section("TEST 4: learn_pattern - Pattern Learning")
    # Both snippets are written flush-left inside their literals, with bodies
    # indented, so the text handed to the tool is syntactically valid Python.
    before_code = """
def process(x):
    return x * 2
"""
    after_code = """
def process(x: int) -> int:
    '''Process input value by doubling it.'''
    if not isinstance(x, int):
        raise TypeError("Expected int")
    return x * 2
"""
    print("\n📚 Learning pattern from code improvement...")
    print("\nBefore:")
    print(before_code)
    print("\nAfter:")
    print(after_code)
    ctx = MockContext()
    try:
        result = await server.learn_pattern.fn(
            before_code=before_code,
            after_code=after_code,
            context="Added type safety and documentation",
            ctx=ctx,
        )
        print_result(result)
        print("\n✅ Test 4 PASSED: learn_pattern executed successfully")
        return True
    except Exception as e:
        # Top-level test boundary: report the failure instead of aborting the run.
        print(f"\n❌ Test 4 FAILED: {e}")
        return False
async def test_server_state():
    """Test 5: Check server state and statistics.

    Prints the counters in ``server.STATE.stats``, the size of the pattern
    library with up to three of its newest entries, and the size of the
    evolution history with its last three entries.

    Returns:
        True if all state was read and printed without raising, False if any
        exception was caught (for example a missing stats key).
    """
    print_section("TEST 5: Server State & Statistics")
    # Display label -> key in STATE.stats, in output order.
    stat_rows = (
        ("Total Evolutions", "total_evolutions"),
        ("Successful Evolutions", "successful_evolutions"),
        ("Patterns Learned", "patterns_learned"),
        ("GEPA Evolutions", "gepa_evolutions"),
        ("Morph Merges", "morph_merges"),
        ("Chroma Searches", "chroma_searches"),
        ("Morph Searches", "morph_searches"),
    )
    try:
        state = server.STATE
        print("\n📊 Current Server Statistics:")
        for label, key in stat_rows:
            print(f" {label}: {state.stats[key]}")

        print("\n🗂️ Pattern Library:")
        print(f" Total Patterns: {len(state.patterns)}")
        if state.patterns:
            print("\n Recent Patterns:")
            # Take the tail, matching how the evolution history is sliced
            # below. NOTE(review): assumes patterns are inserted in the order
            # they are learned, so the last entries are the newest — confirm.
            recent_patterns = list(state.patterns.values())[-3:]
            for i, pattern in enumerate(recent_patterns, start=1):
                print(f" {i}. {pattern.name} (confidence: {pattern.confidence:.1%})")

        print("\n📜 Evolution History:")
        print(f" Total Evolutions Recorded: {len(state.evolution_history)}")
        if state.evolution_history:
            print("\n Recent Evolutions:")
            for i, evolution in enumerate(state.evolution_history[-3:], start=1):
                print(
                    f" {i}. Iterations: {evolution.get('iterations', 0)}, "
                    f"Improvement: {evolution.get('improvement', 0):.2%}"
                )
        print("\n✅ Test 5 PASSED: Server state accessed successfully")
        return True
    except Exception as e:
        # Same boundary handling as tests 1-4: a broken state object is
        # recorded as a failed test instead of aborting the whole run.
        print(f"\n❌ Test 5 FAILED: {e}")
        return False
async def run_all_tests():
    """Run all usability tests in order and print a summary.

    Prints a banner, the API availability flags read from ``server``, runs the
    five tests sequentially, then reports pass/fail counts and elapsed time.

    Returns:
        0 if every test returned a truthy result, 1 otherwise.
    """

    def availability(flag):
        # Shared wording for the three configuration lines.
        return "✅ Available" if flag else "❌ Not configured"

    frame = "═" * 68
    print("\n" + "╔" + frame + "╗")
    print("║" + " " * 15 + "KIA MCP Server Usability Tests" + " " * 23 + "║")
    print("╚" + frame + "╝")

    # Check API configuration
    morph_ok = server.MORPH_AVAILABLE
    chroma_key = server.CHROMA_API_KEY
    gepa_ok = server.OPENROUTER_AVAILABLE
    print("\n🔑 API Configuration:")
    print(f" Morph API: {availability(morph_ok)}")
    print(f" Chroma API: {availability(chroma_key)}")
    print(f" GEPA (OpenRouter): {availability(gepa_ok)}")
    if not (morph_ok or chroma_key or gepa_ok):
        for line in (
            "\n⚠️ WARNING: No API keys configured!",
            " Tests will run in demonstration mode with simulated results.",
            "\n To test with real APIs, configure keys:",
            " export MORPH_API_KEY='your-key'",
            " export CHROMA_API_KEY='your-key'",
            " export OPENROUTER_API_KEY='your-key'",
        ):
            print(line)

    # Run tests (strictly one after another, in this order)
    suite = (
        test_evolve_code,
        test_search_local_codebase,
        test_search_packages,
        test_learn_pattern,
        test_server_state,
    )
    start_time = datetime.now()
    divider = "─" * 70
    print("\n" + divider)
    print("Running Tests...")
    print(divider)
    results = [await case() for case in suite]

    # Summary
    duration = (datetime.now() - start_time).total_seconds()
    print_section("Test Summary")
    total = len(results)
    passed = sum(results)
    failed = total - passed
    print("\n📊 Results:")
    print(f" Total Tests: {total}")
    print(f" Passed: {passed} ✅")
    print(f" Failed: {failed} ❌")
    print(f" Success Rate: {passed / total * 100:.0f}%")
    print(f" Duration: {duration:.2f} seconds")

    if passed != total:
        print(f"\n⚠️ {failed} test(s) failed. Check errors above.")
        return 1

    for line in (
        "\n🎉 All tests passed! KIA MCP Server is fully operational.",
        "\n✅ Production Ready:",
        " • All 4 core tools working",
        " • Error handling functional",
        " • Progress reporting active",
        " • State management working",
        "\n🚀 Ready to use with MCP clients (Claude, Cursor, VS Code)",
    ):
        print(line)
    return 0
async def test_specific_tool(tool_name: str):
    """Run the single test registered under ``tool_name``.

    Args:
        tool_name: One of "evolve_code", "search_local_codebase",
            "search_packages", "learn_pattern" or "state".

    Returns:
        0 if the selected test returned a truthy result; 1 if it did not or
        if ``tool_name`` is not a known test.
    """
    registry = {
        "evolve_code": test_evolve_code,
        "search_local_codebase": test_search_local_codebase,
        "search_packages": test_search_packages,
        "learn_pattern": test_learn_pattern,
        "state": test_server_state,
    }
    selected = registry.get(tool_name)
    if selected is None:
        print(f"❌ Unknown tool: {tool_name}")
        print(f"Available tools: {', '.join(registry)}")
        return 1

    print(f"\n🧪 Testing: {tool_name}")
    outcome = await selected()
    if outcome:
        return 0
    return 1
def main():
    """Main entry point.

    With a command-line argument, runs only the test named by the first
    argument (any further arguments are ignored); otherwise runs the full
    suite. The process exits with the code returned by the chosen runner.
    """
    # `sys` and `asyncio` come from the module-level imports; the previous
    # function-local re-import of `sys` was redundant.
    if len(sys.argv) > 1:
        # Test specific tool
        runner = test_specific_tool(sys.argv[1])
    else:
        # Run all tests
        runner = run_all_tests()
    sys.exit(asyncio.run(runner))
# Run the tests only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()