# Code Quality Enhancement Plan
## Executive Summary
Based on a parallel analysis of the codebase (June 2025), this plan addresses the following issues:
- **30-40% code duplication** in error handling and validation
- **Performance bottlenecks** from synchronous processing and missing caching
- **28.6% of MCP tools** lacking complete documentation
- **Unused code** that could be removed (schema extraction, debug files)
## Analysis Results
### 1. Duplicate Code Patterns Found
#### 1.1 GitHub Function Error Handling Pattern
All GitHub functions in `qdrant_mcp_context_aware.py` follow the same pattern:
```python
try:
    error, instances = validate_github_prerequisites(...)
    if error:
        return error
    github_client, _, _, _, _ = instances
    # ... operation ...
    console_logger.info(...)
    return {...}
except Exception as e:
    error_msg = f"Failed to ...: {str(e)}"
    console_logger.error(error_msg, extra={...})
    return {"error": error_msg}
```
This pattern is repeated in more than 30 GitHub functions.
#### 1.2 HTTP Endpoint Response Pattern
In `http_server.py`, all endpoints follow a similar pattern:
```python
async def endpoint():
    try:
        result = await run_in_executor(...)
        return JSONResponse(content=result)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )
```
#### 1.3 Other Patterns
- Validation and prerequisites checking
- Logging with extra context
- Collection name generation
- Memory cleanup pattern
- Model loading with fallback
### 2. Unused Code Identified
- **Schema extraction methods** in `config_indexer.py` (`extract_schema()` and helpers)
- **Debug test files** in `tests/debug/` directory
- **Obsolete test functions** like `clear_test_collections()`
### 3. Performance Issues
#### 3.1 Synchronous Processing
- Files processed one-by-one in `index_directory()`
- No batching for Qdrant operations
- Missing parallel processing opportunities
#### 3.2 Inefficient Operations
- Repeated Qdrant client calls in loops
- Full BM25 index rebuilds after each file
- Large data structures passed unnecessarily
- Missing caching for expensive operations
#### 3.3 Memory Management
- Entire files loaded for hashing
- Large result sets held in memory
- Cleanup only every 50 files
### 4. Documentation Gaps
- **16 of 56 MCP tools** missing the "WHEN TO USE THIS TOOL" section
- Multiple utility functions without docstrings
- HTTP endpoints with minimal documentation
- Classes missing class-level documentation
## Implementation Plan
### Priority 1: Code Consolidation (1-2 weeks)
#### 1.1 Create Core Abstractions
**File: `src/core/decorators.py`**
```python
import functools
from typing import Any, Callable, Dict

# Assumes these helpers are importable from the existing module; adjust the
# import path to match the actual project layout.
from qdrant_mcp_context_aware import console_logger, validate_github_prerequisites


def github_operation(operation_name: str, require_repo: bool = False,
                     require_projects: bool = False) -> Callable:
    """
    Decorator for GitHub operations that handles common patterns:
    - Prerequisites validation
    - Error handling
    - Logging

    Args:
        operation_name: Human-readable operation name for logging
        require_repo: Whether the operation requires a repository
        require_projects: Whether the operation requires a projects manager
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Dict[str, Any]:
            try:
                error, instances = validate_github_prerequisites(
                    require_repo=require_repo,
                    require_projects=require_projects
                )
                if error:
                    return error
                # Call the actual function with instances
                result = func(instances, *args, **kwargs)
                # Log success
                console_logger.info(
                    f"Successfully completed {operation_name}",
                    extra={
                        "operation": func.__name__,
                        "status": "success"
                    }
                )
                return result
            except Exception as e:
                error_msg = f"Failed to {operation_name}: {str(e)}"
                console_logger.error(error_msg, extra={
                    "operation": func.__name__,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "status": "error"
                })
                return {"error": error_msg}
        return wrapper
    return decorator
```
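As a sketch of the intended refactor, a GitHub tool wrapped with this decorator might look like the following; the tool name, the layout of the `instances` tuple, and the issue fields are assumptions for illustration, not the actual signatures in `qdrant_mcp_context_aware.py`.

```python
# Hypothetical refactored tool; names and the instances tuple layout are
# assumptions for illustration only.
@github_operation("fetch issue details", require_repo=True)
def github_get_issue(instances, issue_number: int) -> Dict[str, Any]:
    github_client, repo, *_ = instances  # assumed 5-tuple, as in the current code
    issue = repo.get_issue(issue_number)
    return {
        "number": issue.number,
        "title": issue.title,
        "state": issue.state,
    }
```

The try/except, logging, and prerequisites check all disappear from the function body, which is the duplication identified in section 1.1.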
**File: `src/core/handlers.py`**
```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

from fastapi import HTTPException
from fastapi.responses import JSONResponse


class AsyncEndpointHandler:
    """Base handler for HTTP endpoints with common error handling."""

    executor = ThreadPoolExecutor(max_workers=4)

    @classmethod
    async def handle(cls, operation: Callable, *args, **kwargs) -> JSONResponse:
        """
        Handle async endpoint execution with standard error handling.

        Args:
            operation: The synchronous operation to execute
            *args, **kwargs: Arguments to pass to the operation

        Returns:
            JSONResponse with the result or error
        """
        try:
            loop = asyncio.get_running_loop()
            # run_in_executor only accepts positional args, so bind kwargs first.
            result = await loop.run_in_executor(
                cls.executor,
                functools.partial(operation, *args, **kwargs)
            )
            return JSONResponse(content=result)
        except HTTPException:
            # Let FastAPI's own exception handling deal with explicit HTTP errors.
            raise
        except Exception as e:
            return JSONResponse(
                status_code=500,
                content={"error": str(e), "type": type(e).__name__}
            )
```
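A sketch of how an endpoint in `http_server.py` could delegate to this handler; the route path and `search_sync` are illustrative placeholders, not existing names.

```python
from fastapi import FastAPI

app = FastAPI()

def search_sync(query: str) -> dict:
    """Placeholder for the existing synchronous search logic."""
    ...

@app.get("/search")
async def search(query: str):
    # Thread-pool dispatch and error handling happen in the shared handler.
    return await AsyncEndpointHandler.handle(search_sync, query)
```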
**File: `src/core/logging_utils.py`**
```python
from datetime import datetime

# Assumes the shared logger is importable from the existing module; adjust the
# import path to match the actual project layout.
from qdrant_mcp_context_aware import console_logger


class OperationLogger:
    """Standardized logging for operations."""

    @staticmethod
    def log_operation(level: str, message: str, operation: str,
                      duration_ms: float = None, **extra_fields):
        """
        Log an operation with standard fields.

        Args:
            level: Log level (info, warning, error)
            message: Log message
            operation: Operation name
            duration_ms: Operation duration in milliseconds
            **extra_fields: Additional fields to include
        """
        extra = {
            "operation": operation,
            "timestamp": datetime.utcnow().isoformat(),
            **extra_fields
        }
        if duration_ms is not None:
            extra["duration_ms"] = duration_ms
        getattr(console_logger, level)(message, extra=extra)
```
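An illustrative call site, assuming an existing `index_directory()` that returns a summary dict; the path and result keys are placeholders.

```python
import time

start = time.perf_counter()
result = index_directory("/path/to/project")  # assumed existing function
OperationLogger.log_operation(
    "info",
    "Indexed directory",
    operation="index_directory",
    duration_ms=(time.perf_counter() - start) * 1000,
    files_indexed=result.get("files_indexed", 0),
)
```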
#### 1.2 Implement DRY Patterns
- [ ] Create GitHub operation decorator for 30+ functions
- [ ] Implement HTTP endpoint base handler
- [ ] Create memory management mixin
- [ ] Implement model loading strategy pattern
- [ ] Create collection name generator utility (see the sketch below)
**Expected Impact**: Remove 800-1000 lines of duplicate code
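A minimal sketch of the collection name generator mentioned above; the naming convention (lower-cased project slug plus content type) is an assumption and should be aligned with whatever scheme the current code uses.

```python
import re

def generate_collection_name(project: str, content_type: str) -> str:
    """Build a Qdrant collection name from a project identifier and content type."""
    slug = re.sub(r"[^a-z0-9_]+", "_", project.lower()).strip("_")
    return f"{slug}_{content_type}"

# e.g. generate_collection_name("My Repo", "code") -> "my_repo_code"
```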
### Priority 2: Performance Optimization (2-3 weeks)
#### 2.1 Batch Processing Implementation
**File: `src/core/batch_processing.py`**
```python
import asyncio
import concurrent.futures
from pathlib import Path
from typing import Any, Callable, Dict, List

from qdrant_client.models import PointStruct

from .logging_utils import OperationLogger


class BatchFileProcessor:
    """Process files in parallel batches for improved performance."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    async def process_files(self,
                            files: List[Path],
                            processor: Callable,
                            batch_size: int = 10) -> List[Dict[str, Any]]:
        """
        Process files in parallel batches.

        Args:
            files: List of file paths to process
            processor: Function to process each file
            batch_size: Number of files to process concurrently

        Returns:
            List of results from processing
        """
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for i in range(0, len(files), batch_size):
                batch = files[i:i + batch_size]
                # Process batch in parallel
                loop = asyncio.get_running_loop()
                batch_results = await asyncio.gather(*[
                    loop.run_in_executor(executor, processor, file)
                    for file in batch
                ])
                results.extend(batch_results)
                # Log progress
                OperationLogger.log_operation(
                    "info",
                    f"Processed batch {i // batch_size + 1}/{(len(files) + batch_size - 1) // batch_size}",
                    "batch_processing",
                    files_processed=len(results),
                    total_files=len(files)
                )
        return results


class BatchQdrantOperations:
    """Batch operations for Qdrant to improve performance."""

    @staticmethod
    def batch_upsert(client, collection_name: str,
                     points: List[PointStruct],
                     batch_size: int = 100) -> Dict[str, Any]:
        """
        Upsert points in batches for better performance.

        Args:
            client: Qdrant client
            collection_name: Target collection
            points: Points to upsert
            batch_size: Points per batch

        Returns:
            Summary of operation
        """
        total_points = len(points)
        upserted = 0
        errors = []
        for i in range(0, total_points, batch_size):
            batch = points[i:i + batch_size]
            try:
                client.upsert(
                    collection_name=collection_name,
                    points=batch
                )
                upserted += len(batch)
            except Exception as e:
                errors.append({
                    "batch_index": i // batch_size,
                    "error": str(e)
                })
        return {
            "total_points": total_points,
            "upserted": upserted,
            "errors": errors if errors else None
        }
```
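Illustrative wiring for `index_directory()`; `process_single_file` stands in for the existing per-file indexing logic and is not an actual function name in the codebase.

```python
async def index_directory_batched(directory: Path) -> Dict[str, Any]:
    """Replace the current one-by-one loop with batched parallel processing."""
    files = [p for p in directory.rglob("*") if p.is_file()]
    processor = BatchFileProcessor(max_workers=4)
    results = await processor.process_files(files, process_single_file, batch_size=10)
    return {"files_indexed": len(results)}
```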
#### 2.2 Caching Layer
**File: `src/core/caching.py`**
```python
import hashlib
import time
from functools import lru_cache, wraps
from typing import Any, Dict, Optional


class TTLCache:
    """Time-based cache with TTL support."""

    def __init__(self, ttl_seconds: int = 3600):
        self.ttl_seconds = ttl_seconds
        self.cache = {}

    def get(self, key: str) -> Optional[Any]:
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl_seconds:
                return value
            else:
                del self.cache[key]
        return None

    def set(self, key: str, value: Any):
        self.cache[key] = (value, time.time())


class AnalysisCache:
    """Cache for expensive analysis operations."""

    def __init__(self):
        self.issue_cache = TTLCache(ttl_seconds=3600)  # 1 hour
        # lru_cache(maxsize=1000) is a decorator factory; wrap the embedding
        # function with it when wiring up the cache.
        self.embedding_cache = lru_cache(maxsize=1000)

    def cache_issue_analysis(self, issue_id: str, analysis: Dict[str, Any]):
        """Cache GitHub issue analysis results."""
        cache_key = f"issue_analysis_{issue_id}"
        self.issue_cache.set(cache_key, analysis)

    def get_issue_analysis(self, issue_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve cached issue analysis."""
        cache_key = f"issue_analysis_{issue_id}"
        return self.issue_cache.get(cache_key)

    @staticmethod
    def cache_embeddings(file_hash: str):
        """Decorator to cache embeddings by file hash."""
        def decorator(func):
            cache = {}

            @wraps(func)
            def wrapper(content: str, *args, **kwargs):
                # Generate cache key from content hash
                content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
                cache_key = f"{file_hash}_{content_hash}"
                if cache_key in cache:
                    return cache[cache_key]
                result = func(content, *args, **kwargs)
                cache[cache_key] = result
                # Limit cache size
                if len(cache) > 100:
                    # Remove oldest entries
                    for key in list(cache.keys())[:20]:
                        del cache[key]
                return result
            return wrapper
        return decorator
```
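Illustrative usage, with `analyze_issue` standing in for the expensive GitHub analysis call (not an actual function name in the codebase):

```python
analysis_cache = AnalysisCache()

def get_or_analyze_issue(issue_id: str) -> Dict[str, Any]:
    """Return a cached analysis if present; otherwise compute and cache it."""
    cached = analysis_cache.get_issue_analysis(issue_id)
    if cached is not None:
        return cached
    analysis = analyze_issue(issue_id)  # placeholder for the real analysis call
    analysis_cache.cache_issue_analysis(issue_id, analysis)
    return analysis
```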
#### 2.3 Performance Optimizations
- [ ] Convert file processing to async with `asyncio.gather()`
- [ ] Implement connection pooling for Qdrant
- [ ] Stream large results instead of loading into memory
- [ ] Pre-compile regex patterns outside loops
- [ ] Implement incremental BM25 updates
**Expected Impact**: 3-5x performance improvement for large operations
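One of the memory issues from the analysis (entire files loaded for hashing, section 3.3) could be addressed with chunked hashing; a minimal sketch, with an arbitrary chunk size:

```python
import hashlib
from pathlib import Path

def hash_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Compute a SHA-256 hash by reading the file in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```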
### Priority 3: Documentation Enhancement (1 week)
#### 3.1 MCP Tool Documentation Template
```python
@mcp.tool()
def tool_name(param1: type, param2: type) -> Dict[str, Any]:
    """
    Brief description of what the tool does.

    WHEN TO USE THIS TOOL:
    - Specific use case 1
    - Specific use case 2
    - User asks to "example phrase"
    - Specific scenario where this tool applies

    This tool automatically:
    - Key behavior 1
    - Key behavior 2
    - Important side effects

    Args:
        param1: Description of parameter 1
        param2: Description of parameter 2

    Returns:
        Description of return value structure

    Raises:
        ExceptionType: When this exception occurs
    """
```
#### 3.2 Documentation Tasks
- [ ] Add "WHEN TO USE THIS TOOL" to 16 missing MCP tools
- [ ] Document all utility functions with complete docstrings
- [ ] Add class-level docstrings to all classes
- [ ] Create automated documentation generator
- [ ] Add usage examples to complex functions
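To find the tools still missing the "WHEN TO USE THIS TOOL" section, a small audit script could scan the module. The import path and the simplification that every public function is a tool are assumptions about this codebase.

```python
import inspect

import qdrant_mcp_context_aware as tools_module  # assumed import path

def find_undocumented_tools() -> list:
    """List functions whose docstring lacks the 'WHEN TO USE THIS TOOL' section."""
    missing = []
    for name, obj in inspect.getmembers(tools_module, inspect.isfunction):
        doc = inspect.getdoc(obj) or ""
        if "WHEN TO USE THIS TOOL" not in doc:
            missing.append(name)
    return sorted(missing)

if __name__ == "__main__":
    print("\n".join(find_undocumented_tools()))
```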
### Priority 4: Code Cleanup (3-4 days)
#### 4.1 Dead Code Removal
- [ ] Remove schema extraction methods from `config_indexer.py`
- [ ] Archive debug test files in `tests/debug/`
- [ ] Remove commented-out code blocks
- [ ] Clean up unused imports
- [ ] Remove obsolete test functions
#### 4.2 Code Reorganization
```
src/
├── core/                    # Core utilities and abstractions
│   ├── __init__.py
│   ├── decorators.py        # Common decorators
│   ├── handlers.py          # Base handlers
│   ├── caching.py           # Caching utilities
│   ├── batch_processing.py  # Batch operations
│   └── logging_utils.py     # Logging utilities
├── integrations/            # External integrations
│   ├── __init__.py
│   ├── github/              # All GitHub-related code
│   │   ├── __init__.py
│   │   ├── client.py
│   │   ├── operations.py
│   │   └── models.py
│   └── qdrant/              # Qdrant-specific code
│       ├── __init__.py
│       ├── client.py
│       └── operations.py
├── indexers/                # Keep existing structure
├── utils/                   # Keep existing structure
└── ...
```
### Priority 5: Advanced Features (2-3 weeks)
#### 5.1 Intelligent Batch Indexing
```python
class SmartIndexer:
    """Intelligent indexing with parallel processing and progress tracking."""

    def __init__(self):
        self.batch_processor = BatchFileProcessor()
        self.performance_monitor = PerformanceMonitor()

    async def index_directory_parallel(self,
                                       directory: Path,
                                       max_workers: int = 4) -> Dict[str, Any]:
        """
        Index directory with parallel processing and smart batching.

        Automatically adjusts batch size based on:
        - Available memory
        - File sizes
        - System load
        """
        # Implementation details...
```
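An assumed call site once the class is implemented; the directory path and the `files_indexed` return key are illustrative.

```python
import asyncio
from pathlib import Path

async def reindex_project() -> None:
    indexer = SmartIndexer()
    summary = await indexer.index_directory_parallel(Path("./src"), max_workers=4)
    print(f"Indexed {summary.get('files_indexed', 0)} files")

asyncio.run(reindex_project())
```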
#### 5.2 Streaming Results
```python
class StreamingSearchResults:
    """Stream search results to reduce memory usage."""

    async def search_stream(self,
                            query: str,
                            chunk_size: int = 10):
        """
        Yield search results as they're found.

        Yields:
            Chunks of search results
        """
        # Implementation details...
```
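Consuming the stream would then amount to iterating an async generator; the consumer below is a sketch with assumed names.

```python
async def print_search_results(query: str) -> None:
    streamer = StreamingSearchResults()
    async for chunk in streamer.search_stream(query, chunk_size=10):
        for hit in chunk:
            print(hit)
```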
## Implementation Roadmap
### Week 1-2: Code Consolidation
- Day 1-2: Create core abstractions (decorators, handlers)
- Day 3-4: Refactor GitHub functions to use decorator
- Day 5-6: Refactor HTTP endpoints to use base handler
- Day 7-8: Create shared utilities (logging, collection names)
- Day 9-10: Testing and integration
### Week 3-4: Performance Optimization
- Day 1-2: Implement batch file processing
- Day 3-4: Add caching layer for expensive operations
- Day 5-6: Convert to async operations where beneficial
- Day 7-8: Optimize Qdrant operations (batching, streaming)
- Day 9-10: Performance testing and benchmarking
### Week 5: Documentation Sprint
- Day 1: Add missing MCP tool documentation
- Day 2: Document utility functions
- Day 3: Add class-level documentation
- Day 4: Create documentation generator
- Day 5: Review and polish documentation
### Week 6: Cleanup & Testing
- Day 1: Remove dead code
- Day 2: Reorganize code structure
- Day 3-4: Add integration tests for new features
- Day 5: Final cleanup and code review
### Week 7-8: Advanced Features
- Day 1-3: Implement parallel indexing
- Day 4-5: Add streaming results
- Day 6-7: Deploy performance monitoring
- Day 8-10: Integration and testing
## Success Metrics
### Code Quality Metrics
- **Code duplication**: Reduce by 30-40%
- **Test coverage**: Increase to >80%
- **Documentation coverage**: 100% for public APIs
- **Cyclomatic complexity**: Reduce by 25%
### Performance Metrics
- **Indexing speed**: 3-5x improvement for large directories
- **Search latency**: <200ms p95
- **Memory usage**: 50% reduction for large operations
- **Batch processing**: 10x improvement for bulk operations
### Maintainability Metrics
- **Time to fix bugs**: -50%
- **Time to add features**: -30%
- **Developer onboarding**: -40%
- **Code review time**: -25%
## Quick Wins (Immediate Implementation)
1. **Add missing MCP tool documentation** (1 day)
- High impact, low effort
- Improves tool discovery
2. **Create GitHub operation decorator** (1 day)
- Eliminates most duplicate code
- Immediate maintainability improvement
3. **Remove unused schema extraction** (2 hours)
- Clean up ~200 lines of dead code
4. **Implement basic file batching** (1 day)
- 2-3x performance improvement for indexing
## Risk Mitigation
### Technical Risks
- **Breaking changes**: Use feature flags for gradual rollout
- **Performance regressions**: Comprehensive benchmarking before/after
- **Integration issues**: Extensive testing with existing code
### Process Risks
- **Scope creep**: Stick to defined priorities
- **Timeline slippage**: Start with quick wins
- **Team adoption**: Clear documentation and examples
## Conclusion
This enhancement plan transforms the codebase from a rapidly grown prototype into a production-ready system. By focusing on consolidation, performance, documentation, and cleanup, we can achieve:
- **Better maintainability** through reduced duplication
- **Improved performance** through batching and caching
- **Enhanced reliability** through consistent error handling
- **Easier development** through complete documentation
The phased approach ensures continuous improvement while maintaining system stability.