#!/usr/bin/env python3
"""
Test suite for verifying the token breakdown in the MCP vs. direct-search comparison.

This test suite ensures that:
1. Token counting is reasonably accurate for both input and output tokens
2. MCP uses significantly fewer tokens than direct search
3. The breakdown correctly separates input tokens from output tokens
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path

# Make the project root (three directory levels up from this file) importable
# so the mcp_server imports below resolve when this test is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from mcp_server.utils.direct_searcher import DirectSearcher
from mcp_server.utils.mcp_client_wrapper import MCPClientWrapper
from mcp_server.utils.token_counter import TokenCounter
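# Note on counting: all token figures in this file use the rough
# "~4 characters per token" heuristic (len(text) // 4) or hard-coded
# estimates, so they approximate real tokenizer output rather than match it.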
class TestTokenBreakdown(unittest.TestCase):
"""Test token breakdown functionality."""
@classmethod
def setUpClass(cls):
"""Set up test environment."""
# Create test workspace with sample files
cls.test_dir = tempfile.mkdtemp()
cls.test_files = cls._create_test_files()
# Initialize components
cls.token_counter = TokenCounter()
cls.direct_searcher = DirectSearcher()
cls.mcp_client = MCPClientWrapper() # Will use mock mode if no index
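        # The tests below work with simulated queries/responses and construct
        # their own TokenCounter instances where needed, so these shared
        # components are set up here but not exercised directly.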
@classmethod
def _create_test_files(cls):
"""Create test files with known content."""
files = {}
# Create a Python file with a class
python_content = '''
class PluginManager:
"""High-level plugin management and lifecycle operations.
This class handles loading, initialization, and coordination
of all plugins in the MCP system.
"""
def __init__(self, config=None):
self.config = config or {}
self.plugins = {}
self.initialized = False
def load_plugin(self, plugin_name):
"""Load a single plugin by name."""
# Implementation details
pass
def initialize_all(self):
"""Initialize all loaded plugins."""
for name, plugin in self.plugins.items():
plugin.initialize()
self.initialized = True
'''
py_file = Path(cls.test_dir) / "plugin_manager.py"
py_file.write_text(python_content)
files["plugin_manager.py"] = {
"path": str(py_file),
"content": python_content,
"tokens": len(python_content) // 4, # Rough estimate
}
# Create another file with TODO comments
todo_content = """
def process_data(data):
# TODO: Add input validation
result = []
for item in data:
# FIXME: Handle edge cases
if item > 0:
result.append(item * 2)
# TODO: Add logging
return result
"""
todo_file = Path(cls.test_dir) / "processor.py"
todo_file.write_text(todo_content)
files["processor.py"] = {
"path": str(todo_file),
"content": todo_content,
"tokens": len(todo_content) // 4,
}
return files
def test_symbol_search_token_breakdown(self):
"""Test token breakdown for symbol search."""
symbol = "PluginManager"
# 1. MCP Search (simulated)
mcp_query = f"symbol:{symbol}"
mcp_input_tokens = len(mcp_query) // 4
# MCP returns structured JSON with just the relevant info
mcp_response = {
"query": mcp_query,
"results": [
{
"symbol": symbol,
"file": "plugin_manager.py",
"line": 2,
"kind": "class",
"signature": "class PluginManager(config=None)",
"documentation": "High-level plugin management...",
}
],
"total_matches": 1,
"search_time_ms": 25,
}
mcp_response_text = json.dumps(mcp_response, indent=2)
mcp_output_tokens = len(mcp_response_text) // 4
# 2. Direct Search
grep_command = f"grep -n 'class {symbol}' {self.test_dir}"
direct_input_tokens = len(grep_command) // 4
# Direct search requires reading the entire file
file_content = self.test_files["plugin_manager.py"]["content"]
direct_output_tokens = len(file_content) // 4
# Assertions
self.assertLess(
mcp_input_tokens, direct_input_tokens, "MCP query should use fewer input tokens"
)
self.assertLess(
mcp_output_tokens,
direct_output_tokens,
"MCP should return fewer output tokens (just relevant snippet)",
)
        # Calculate token reduction. With a single ~20-line fixture file the
        # ratio is modest; against real, larger files it grows sharply.
        total_mcp = mcp_input_tokens + mcp_output_tokens
        total_direct = direct_input_tokens + direct_output_tokens
        reduction = 1 - (total_mcp / total_direct)
        self.assertGreater(
            reduction, 0.3, f"MCP should reduce total tokens by >30% on this fixture, got {reduction:.1%}"
        )
# Print detailed breakdown
print("\nSymbol Search Token Breakdown:")
print(" MCP:")
print(f" Input: {mcp_input_tokens} tokens ('{mcp_query}')")
print(f" Output: {mcp_output_tokens} tokens (structured JSON)")
print(f" Total: {total_mcp} tokens")
print(" Direct:")
print(f" Input: {direct_input_tokens} tokens (grep command)")
print(f" Output: {direct_output_tokens} tokens (entire file)")
print(f" Total: {total_direct} tokens")
print(f" Reduction: {reduction:.1%}")
def test_pattern_search_token_breakdown(self):
"""Test token breakdown for pattern search."""
pattern = "TODO|FIXME"
# 1. MCP Pattern Search
mcp_query = f"pattern:{pattern}"
mcp_input_tokens = len(mcp_query) // 4
# MCP returns matches with snippets
mcp_response = {
"query": mcp_query,
"pattern": pattern,
"results": [
{
"file": "processor.py",
"line": 2,
"snippet": " # TODO: Add input validation",
"context": {"before": "def process_data(data):", "after": " result = []"},
},
{
"file": "processor.py",
"line": 6,
"snippet": " # FIXME: Handle edge cases",
"context": {"before": " for item in data:", "after": " if item > 0:"},
},
],
"total_matches": 2,
"search_time_ms": 35,
}
mcp_response_text = json.dumps(mcp_response, indent=2)
mcp_output_tokens = len(mcp_response_text) // 4
# 2. Direct Pattern Search
grep_command = f"grep -n -C 2 '{pattern}' {self.test_dir}"
direct_input_tokens = len(grep_command) // 4
# Direct search returns matches with context lines
# Estimate: each match with 2 lines of context = ~150 chars = ~38 tokens
direct_output_tokens = 2 * 38 # 2 matches
# Assertions
total_mcp = mcp_input_tokens + mcp_output_tokens
total_direct = direct_input_tokens + direct_output_tokens
# For pattern search, the difference might be smaller but still significant
self.assertLess(
total_mcp, total_direct, "MCP should use fewer total tokens for pattern search"
)
print("\nPattern Search Token Breakdown:")
print(" MCP:")
print(f" Input: {mcp_input_tokens} tokens")
print(f" Output: {mcp_output_tokens} tokens (JSON with snippets)")
print(f" Total: {total_mcp} tokens")
print(" Direct:")
print(f" Input: {direct_input_tokens} tokens")
print(f" Output: {direct_output_tokens} tokens (grep output)")
print(f" Total: {total_direct} tokens")
def test_semantic_search_token_comparison(self):
"""Test token comparison for semantic search (MCP only)."""
query = "plugin initialization and lifecycle management"
# MCP Semantic Search
mcp_input_tokens = len(query) // 4
mcp_response = {
"query": f"semantic:{query}",
"results": [
{
"file": "plugin_manager.py",
"line": 18,
"snippet": "def initialize_all(self):",
"score": 0.92,
"relevance": "Method for initializing all plugins",
}
],
"search_time_ms": 450,
}
mcp_response_text = json.dumps(mcp_response, indent=2)
mcp_output_tokens = len(mcp_response_text) // 4
# Direct search equivalent would need to:
# 1. Read ALL files in the codebase
# 2. Send them to an LLM for semantic analysis
# Estimate for just our test files
total_codebase_tokens = sum(f["tokens"] for f in self.test_files.values())
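        # With only two small fixture files this is a few hundred tokens; on a
        # real repository the cost of "read everything" would be orders of
        # magnitude larger than a single structured result.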
# Assertions
total_mcp = mcp_input_tokens + mcp_output_tokens
self.assertLess(
total_mcp,
total_codebase_tokens,
"MCP semantic search should use fewer tokens than reading entire codebase",
)
        reduction = 1 - (total_mcp / total_codebase_tokens)
        self.assertGreater(
            reduction,
            0.4,
            f"Semantic search should reduce tokens by >40% even on this tiny fixture, got {reduction:.1%}",
        )
print("\nSemantic Search Token Comparison:")
print(" MCP:")
print(f" Input: {mcp_input_tokens} tokens")
print(f" Output: {mcp_output_tokens} tokens")
print(f" Total: {total_mcp} tokens")
print(" Direct (hypothetical):")
print(f" Would need to read entire codebase: {total_codebase_tokens} tokens")
print(f" Reduction: {reduction:.1%}")
def test_token_counting_accuracy(self):
"""Test that our token counting is reasonably accurate."""
test_texts = [
("Hello world", 2), # ~2 tokens
("def function_name():", 5), # ~5 tokens
("class MyClass(BaseClass):", 6), # ~6 tokens
('{"key": "value", "number": 123}', 8), # ~8 tokens
]
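        # For exact counts one could swap the heuristic for a real tokenizer,
        # e.g. (kept as a comment to avoid a hard tiktoken dependency):
        #   import tiktoken
        #   enc = tiktoken.encoding_for_model("gpt-4")
        #   exact = len(enc.encode(text))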
counter = TokenCounter()
for text, expected in test_texts:
# Our simple estimation (4 chars = 1 token)
estimated = len(text) // 4
# Should be within 50% of expected
self.assertLess(
abs(estimated - expected) / expected,
0.5,
f"Token estimation for '{text}' is off: {estimated} vs {expected}",
)
def test_cost_calculation_with_breakdown(self):
"""Test cost calculation based on token breakdown."""
# Simulate a search with known token counts
input_tokens = 10
output_tokens = 200
counter = TokenCounter()
counter.input_tokens = input_tokens
counter.output_tokens = output_tokens
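        # Rough shape of the expected calculation (illustrative rates only; the
        # actual per-1K-token prices live inside TokenCounter):
        #   cost = input_tokens / 1000 * input_rate + output_tokens / 1000 * output_rate
        # e.g. at $0.03/1K input and $0.06/1K output:
        #   10 / 1000 * 0.03 + 200 / 1000 * 0.06 = 0.0003 + 0.012 = $0.0123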
# Test different models
models = ["gpt-4", "claude-3-sonnet", "gpt-3.5-turbo"]
print(f"\nCost Analysis for {input_tokens} input + {output_tokens} output tokens:")
for model in models:
cost = counter.estimate_cost(model=model)
self.assertGreater(cost, 0, f"Cost for {model} should be positive")
print(f" {model}: ${cost:.6f}")
@classmethod
def tearDownClass(cls):
"""Clean up test files."""
import shutil
shutil.rmtree(cls.test_dir)
class TestTokenReductionScenarios(unittest.TestCase):
"""Test various scenarios showing token reduction benefits."""
def test_multi_file_symbol_search(self):
"""Test token usage when searching across multiple files."""
# Scenario: Finding all usages of a symbol across 10 files
# MCP approach
mcp_input = 15 # "symbol:authenticate"
mcp_output = 500 # JSON with 10 results, each ~50 tokens
mcp_total = mcp_input + mcp_output
# Direct approach
direct_input = 100 # Complex grep command
# Need to read 10 files, each ~5000 tokens
direct_output = 10 * 5000
direct_total = direct_input + direct_output
reduction = 1 - (mcp_total / direct_total)
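        # Worked out: 1 - (515 / 50,100) ≈ 0.990, i.e. roughly 99% fewer tokens.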
print("\nMulti-file Symbol Search:")
print(f" MCP: {mcp_total:,} tokens")
print(f" Direct: {direct_total:,} tokens")
print(f" Reduction: {reduction:.1%}")
self.assertGreater(reduction, 0.98, "Multi-file search should show >98% reduction")
def test_refactoring_scenario(self):
"""Test token usage for a refactoring scenario."""
# Scenario: Rename a method used in 20 places
# MCP approach
# 1. Find all references
mcp_find_input = 20 # "references:old_method"
mcp_find_output = 800 # 20 locations with context
# 2. Make targeted edits (just the specific lines)
mcp_edit_tokens = 20 * 50 # 20 edits, ~50 tokens each
mcp_total = mcp_find_input + mcp_find_output + mcp_edit_tokens
# Direct approach
# 1. Search for the method
direct_search_input = 50
direct_search_output = 2000 # Grep output
# 2. Read and edit 20 entire files
direct_file_tokens = 20 * 5000 # 20 files, ~5000 tokens each
direct_total = direct_search_input + direct_search_output + direct_file_tokens
reduction = 1 - (mcp_total / direct_total)
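        # Worked out: 1 - (1,820 / 102,050) ≈ 0.982, comfortably above the 97% bar.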
print("\nRefactoring Scenario (20 files):")
print(f" MCP: {mcp_total:,} tokens")
print(f" Direct: {direct_total:,} tokens")
print(f" Reduction: {reduction:.1%}")
self.assertGreater(reduction, 0.97, "Refactoring should show >97% reduction")
if __name__ == "__main__":
unittest.main(verbosity=2)