# test_enhanced_append_entry.py (17.4 kB)
#!/usr/bin/env python3
"""Comprehensive test suite for enhanced append_entry functionality.
This test validates all the new features:
- Newline handling and sanitization
- Auto-split multiline detection
- Enhanced bulk mode with direct list support
- Individual timestamps for bulk entries
- Robust error handling with fallbacks
- Performance optimizations for large content
"""
import asyncio
import json
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
# Add the MCP_SPINE directory to Python path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from scribe_mcp.tools.append_entry import (
    _sanitize_message,
    _should_use_bulk_mode,
    _split_multiline_message,
    _prepare_bulk_items_with_timestamps,
    _apply_inherited_metadata,
    append_entry,
)
from scribe_mcp.config.settings import Settings
from scribe_mcp.state import StateManager
class MockStorage:
    """In-memory stand-in for the storage backend used by the tests.

    Projects are kept in a dict keyed by project name and every inserted
    entry is recorded, in call order, as the dict of keyword arguments it
    was inserted with.
    """

    def __init__(self):
        # name -> project record, and the raw kwargs of each insert_entry call
        self.entries = []
        self.projects = {}

    async def setup(self):
        """No-op: there is nothing to initialise for the in-memory store."""
        return None

    async def close(self):
        """No-op: there is nothing to release for the in-memory store."""
        return None

    async def fetch_project(self, name: str):
        """Return the stored record for ``name``, or ``None`` if unknown."""
        try:
            return self.projects[name]
        except KeyError:
            return None

    async def upsert_project(self, name: str, repo_root: str, progress_log_path: str):
        """Create or overwrite the record for ``name`` and return it."""
        record = {
            "name": name,
            "root": repo_root,
            "progress_log": progress_log_path,
        }
        self.projects[name] = record
        return record

    async def insert_entry(self, **kwargs):
        """Record the keyword arguments of this call as one entry."""
        self.entries.append(kwargs)
class EnhancedAppendEntryTest:
    """Script-style test suite for enhanced append_entry functionality.

    The suite is driven by :meth:`run_all_tests`, which creates a temporary
    project directory, runs every ``test_*`` method in a fixed order and
    removes the directory again.  Individual checks fail by raising
    ``AssertionError``.
    """

    def __init__(self):
        # All of these are populated by setup(); None means "not set up".
        self.temp_dir = None
        self.mock_storage = None
        self.project_dir = None
        self.log_path = None

    @staticmethod
    def _parse_utc_timestamp(value):
        """Parse a ``YYYY-MM-DD HH:MM:SS UTC`` string into an aware datetime.

        ``strptime`` matches the trailing ``UTC`` as literal text and returns
        a naive datetime, so the UTC zone is attached explicitly.  Without it
        the result cannot be subtracted from ``datetime.now(timezone.utc)``.

        Args:
            value: Timestamp text in the format asserted by
                :meth:`test_timestamp_preparation`.

        Returns:
            A timezone-aware ``datetime`` in UTC.

        Raises:
            ValueError: If ``value`` does not match the expected format.
        """
        parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S UTC")
        return parsed.replace(tzinfo=timezone.utc)

    def setup(self):
        """Set up test environment.

        Creates a fresh temporary directory containing
        ``test-project/docs/dev_plans`` and records the path the progress log
        is expected at; the log file itself is not created here.
        """
        self.temp_dir = Path(tempfile.mkdtemp())
        self.mock_storage = MockStorage()
        # Create test project directory structure
        self.project_dir = self.temp_dir / "test-project"
        self.project_dir.mkdir()
        (self.project_dir / "docs" / "dev_plans").mkdir(parents=True)
        self.log_path = self.project_dir / "docs" / "dev_plans" / "PROGRESS_LOG.md"

    def cleanup(self):
        """Clean up test environment.

        Removes the temporary directory created by :meth:`setup`.  Safe to
        call when setup never ran and safe to call more than once.
        """
        import shutil
        if self.temp_dir:
            shutil.rmtree(self.temp_dir)
            # Forget the path so a repeated cleanup() does not try to remove
            # a directory that no longer exists.
            self.temp_dir = None

    def test_message_sanitization(self):
        """Test message sanitization for MCP protocol.

        Checks that ``_sanitize_message`` turns ``\\n`` and ``\\r\\n`` into the
        literal two-character sequence backslash-n, and that empty string and
        ``None`` inputs come back unchanged.
        """
        print("🧪 Testing message sanitization...")
        # Test basic newline replacement
        message = "Line 1\nLine 2\r\nLine 3"
        sanitized = _sanitize_message(message)
        expected = "Line 1\\nLine 2\\nLine 3"
        assert sanitized == expected, f"Expected {expected}, got {sanitized}"
        print("   ✅ Basic newline sanitization")
        # Test empty message
        assert _sanitize_message("") == ""
        assert _sanitize_message(None) is None
        print("   ✅ Empty message handling")
        # Test complex multiline with special characters
        complex_msg = "Error: Something went wrong\nTraceback: line 42\n  -> function call"
        sanitized = _sanitize_message(complex_msg)
        assert "\\n" in sanitized and "\n" not in sanitized
        print("   ✅ Complex multiline sanitization")

    def test_bulk_mode_detection(self):
        """Test bulk mode detection logic.

        Checks the triggers asserted for ``_should_use_bulk_mode``: embedded
        newlines, pipe characters, a 600-character message, and explicitly
        passed ``items`` / ``items_list`` arguments.
        """
        print("🧪 Testing bulk mode detection...")
        # Test newline detection
        single_line = "This is a single line"
        multiline = "Line 1\nLine 2\nLine 3"
        assert not _should_use_bulk_mode(single_line)
        assert _should_use_bulk_mode(multiline)
        print("   ✅ Newline detection")
        # Test pipe character detection
        pipe_message = "Status: OK | Component: auth | Time: 1.2s"
        assert _should_use_bulk_mode(pipe_message)
        print("   ✅ Pipe character detection")
        # Test long message detection
        long_message = "x" * 600  # Over 500 character threshold
        assert _should_use_bulk_mode(long_message)
        print("   ✅ Long message detection")
        # Test explicit bulk parameters
        assert _should_use_bulk_mode("", items="[]")
        assert _should_use_bulk_mode("", items_list=[{"message": "test"}])
        print("   ✅ Explicit bulk parameter detection")

    def test_multiline_splitting(self):
        """Test multiline message splitting with smart detection.

        Checks that ``_split_multiline_message`` drops blank lines and that
        the returned entries carry the expected ``status`` and ``emoji``
        values for the sample inputs.
        """
        print("🧪 Testing multiline splitting...")
        # Test basic splitting
        message = "Task 1: Setup\nTask 2: Implement\nTask 3: Test\n\nTask 4: Deploy"
        entries = _split_multiline_message(message)
        assert len(entries) == 4  # Should skip empty line
        assert entries[0]["message"] == "Task 1: Setup"
        assert entries[3]["message"] == "Task 4: Deploy"
        print("   ✅ Basic splitting with empty line filtering")
        # Test status auto-detection
        error_message = "Error: Connection failed\n  timeout after 30s\nSUCCESS: Retry worked"
        entries = _split_multiline_message(error_message)
        assert entries[0].get("status") == "error"
        assert entries[2].get("status") == "success"
        print("   ✅ Status auto-detection")
        # Test emoji detection
        emoji_message = "🚀 Deploying application\n✅ Deployment complete\n🎉 All systems operational"
        entries = _split_multiline_message(emoji_message)
        assert entries[0].get("emoji") == "🚀"
        assert entries[1].get("emoji") == "✅"
        assert entries[2].get("emoji") == "🎉"
        print("   ✅ Emoji auto-detection")

    def test_timestamp_preparation(self):
        """Test timestamp preparation for bulk entries.

        Checks that ``_prepare_bulk_items_with_timestamps`` staggers
        ``timestamp_utc`` by ``stagger_seconds`` from an explicit base time,
        and that without a base time the generated timestamp is within five
        seconds of the current UTC time.
        """
        print("🧪 Testing timestamp preparation...")
        items = [
            {"message": "First task"},
            {"message": "Second task"},
            {"message": "Third task"},
        ]
        # Test with base timestamp
        base_time = "2025-10-25 12:00:00 UTC"
        prepared = _prepare_bulk_items_with_timestamps(items, base_time, stagger_seconds=5)
        assert len(prepared) == 3
        assert prepared[0]["timestamp_utc"] == "2025-10-25 12:00:00 UTC"
        assert prepared[1]["timestamp_utc"] == "2025-10-25 12:00:05 UTC"
        assert prepared[2]["timestamp_utc"] == "2025-10-25 12:00:10 UTC"
        print("   ✅ Timestamp staggering with base time")
        # Test without base timestamp (should use current time)
        items_no_timestamp = [{"message": "Test"}]
        prepared_now = _prepare_bulk_items_with_timestamps(items_no_timestamp)
        assert "timestamp_utc" in prepared_now[0]
        # Should be close to current time.  Both operands must be
        # timezone-aware: subtracting a naive datetime from an aware one
        # raises TypeError.
        ts = self._parse_utc_timestamp(prepared_now[0]["timestamp_utc"])
        now = datetime.now(timezone.utc)
        assert abs((now - ts).total_seconds()) < 5  # Within 5 seconds
        print("   ✅ Auto-timestamp generation")

    def test_metadata_inheritance(self):
        """Test metadata inheritance for bulk entries.

        Checks that ``_apply_inherited_metadata`` fills in status, emoji,
        agent and meta keys an item does not set itself, while values the
        item sets explicitly are kept and its ``meta`` is merged with the
        inherited one.
        """
        print("🧪 Testing metadata inheritance...")
        items = [
            {"message": "Task 1"},
            {"message": "Task 2", "status": "success"},
            {"message": "Task 3", "meta": {"component": "auth"}},
            {"message": "Task 4", "status": "error", "meta": {"component": "db", "retry": 3}},
        ]
        inherited_meta = {"project": "my-app", "version": "2.0"}
        inherited_status = "info"
        inherited_emoji = "🔧"
        inherited_agent = "BuildBot"
        result = _apply_inherited_metadata(
            items, inherited_meta, inherited_status, inherited_emoji, inherited_agent
        )
        # Check inherited values applied to items without explicit values
        assert result[0]["status"] == "info"  # Inherited
        assert result[0]["emoji"] == "🔧"     # Inherited
        assert result[0]["agent"] == "BuildBot"  # Inherited
        assert result[0]["meta"]["project"] == "my-app"  # Inherited
        assert result[0]["meta"]["version"] == "2.0"   # Inherited
        # Check explicit values are preserved
        assert result[1]["status"] == "success"  # Explicit, not inherited
        assert result[1]["emoji"] == "🔧"        # Inherited
        assert result[2]["meta"]["component"] == "auth"  # Explicit
        assert result[2]["meta"]["project"] == "my-app"   # Merged with inherited
        # Check proper merging for complex case
        assert result[3]["status"] == "error"  # Explicit preserved
        assert result[3]["meta"]["component"] == "db"  # Explicit preserved
        assert result[3]["meta"]["retry"] == 3          # Explicit preserved
        assert result[3]["meta"]["project"] == "my-app" # Inherited merged
        print("   ✅ Metadata inheritance and merging")

    async def test_enhanced_append_entry_functionality(self):
        """Test the enhanced append_entry function with various scenarios.

        Temporarily replaces ``storage_backend``, ``state_manager`` and
        ``agent_identity`` on ``scribe_mcp.server`` and
        ``get_agent_project_data`` on the append_entry module, then calls
        ``append_entry`` in six scenarios.  Every replaced attribute is put
        back in the ``finally`` block, whether or not an assertion fails.
        Requires :meth:`setup` to have run first.
        """
        print("🧪 Testing enhanced append_entry functionality...")
        # Mock the global dependencies
        import scribe_mcp.tools.append_entry as append_module
        import scribe_mcp.server as server_module

        # Snapshot everything that is about to be patched *before* patching,
        # so the finally block can restore it regardless of where a failure
        # happens.  agent_identity is read with a sentinel because this test
        # only ever assigns it and cannot assume it already exists.
        missing = object()
        original_storage = server_module.storage_backend
        original_state_manager = server_module.state_manager
        original_agent_identity = getattr(server_module, "agent_identity", missing)
        original_get_agent_project_data = append_module.get_agent_project_data

        # Create mock project data
        project_data = {
            "name": "test-project",
            "root": str(self.project_dir),
            "progress_log": str(self.log_path),
            "defaults": {
                "emoji": "📋",
                "agent": "TestAgent"
            }
        }

        # Mock agent project data
        async def mock_get_agent_project_data(agent_id):
            return project_data, ["test-project"]

        # Mock agent identity
        class MockAgentIdentity:
            async def get_or_create_agent_id(self):
                return "test-agent"

            async def update_agent_activity(self, *args, **kwargs):
                pass

        try:
            # Set up mock server state
            server_module.storage_backend = self.mock_storage
            server_module.state_manager = StateManager()
            append_module.get_agent_project_data = mock_get_agent_project_data
            server_module.agent_identity = MockAgentIdentity()
            # Test 1: Single line message (should work normally)
            result = await append_entry(
                message="Single line test message",
                status="info",
                meta={"test": "single_line"}
            )
            assert result["ok"], f"Single line failed: {result.get('error')}"
            assert result["written_count"] == 1
            print("   ✅ Single line message")
            # Test 2: Multiline message with auto_split (should auto-convert to bulk)
            multiline_message = "🚀 Starting deployment\n✅ Build completed\n🔧 Installing dependencies\n✅ Deployment successful"
            result = await append_entry(
                message=multiline_message,
                status="info",
                meta={"phase": "deployment"},
                auto_split=True,
                stagger_seconds=2
            )
            assert result["ok"], f"Multiline auto-split failed: {result.get('error')}"
            assert result["written_count"] == 4
            print("   ✅ Multiline auto-split functionality")
            # Test 3: Direct list bulk mode
            bulk_items = [
                {"message": "Database migration", "status": "info"},
                {"message": "Cache clearing", "status": "success"},
                {"message": "Service restart", "status": "info"},
            ]
            result = await append_entry(
                items_list=bulk_items,
                meta={"operation": "maintenance"},
                stagger_seconds=1
            )
            assert result["ok"], f"Direct list bulk failed: {result.get('error')}"
            assert result["written_count"] == 3
            print("   ✅ Direct list bulk mode")
            # Test 4: JSON string bulk mode (backwards compatibility)
            json_items = json.dumps([
                {"message": "API endpoint created", "status": "success"},
                {"message": "Documentation updated", "status": "info"},
            ])
            result = await append_entry(
                items=json_items,
                meta={"component": "api"}
            )
            assert result["ok"], f"JSON bulk failed: {result.get('error')}"
            assert result["written_count"] == 2
            print("   ✅ JSON string bulk mode (backwards compatibility)")
            # Test 5: Error handling for invalid input
            result = await append_entry(
                message="Invalid | pipe | characters",
                auto_split=False  # Disable auto-split to test validation
            )
            assert not result["ok"]
            assert "pipe" in result["error"]
            assert "suggestion" in result
            print("   ✅ Error handling with helpful suggestions")
            # Test 6: Large bulk content (performance test)
            large_bulk = [
                {
                    "message": f"Processing item {i+1}",
                    "status": "info" if i % 2 == 0 else "success",
                    "meta": {"batch_id": "test_batch", "item": i+1}
                }
                for i in range(25)  # Test with 25 items
            ]
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by a wall-clock adjustment during the call.
            start_time = time.perf_counter()
            result = await append_entry(
                items_list=large_bulk,
                meta={"test": "performance"}
            )
            duration = time.perf_counter() - start_time
            assert result["ok"], f"Large bulk failed: {result.get('error')}"
            assert result["written_count"] == 25
            assert duration < 5.0  # Should complete within 5 seconds
            print(f"   ✅ Large bulk performance ({duration:.2f}s for 25 items)")
            # Verify log file was created and has content
            assert self.log_path.exists()
            log_content = self.log_path.read_text()
            # At least one line per entry written by tests 1, 2, 3, 4 and 6.
            assert len(log_content.splitlines()) >= 1 + 4 + 3 + 2 + 25  # All previous entries
            print("   ✅ Log file integrity")
        finally:
            # Restore original dependencies
            append_module.get_agent_project_data = original_get_agent_project_data
            if original_agent_identity is missing:
                # The attribute did not exist before the test; remove it if
                # the test got far enough to create it.
                if hasattr(server_module, "agent_identity"):
                    delattr(server_module, "agent_identity")
            else:
                server_module.agent_identity = original_agent_identity
            server_module.storage_backend = original_storage
            server_module.state_manager = original_state_manager

    def run_all_tests(self):
        """Run all test cases.

        Returns:
            True when every test passed.  False when any test raised; the
            error and its traceback are printed.  The temporary directory is
            removed in either case.
        """
        print("🚀 Enhanced append_entry Test Suite")
        print("=" * 50)
        try:
            self.setup()
            # Run synchronous tests
            self.test_message_sanitization()
            self.test_bulk_mode_detection()
            self.test_multiline_splitting()
            self.test_timestamp_preparation()
            self.test_metadata_inheritance()
            # Run async tests
            asyncio.run(self.test_enhanced_append_entry_functionality())
            print("\n🎉 All tests passed!")
            print("✅ Enhanced append_entry is working perfectly!")
        except Exception as e:
            # Top-level boundary of the script: report and signal failure
            # through the return value instead of propagating.
            print(f"\n❌ Test failed: {e}")
            import traceback
            traceback.print_exc()
            return False
        finally:
            self.cleanup()
        return True
def main():
    """Run the enhanced append_entry test suite.

    On success a summary of the validated features and a few usage examples
    are printed; on failure a single failure notice is printed.

    Returns:
        True when the suite passed, False otherwise.
    """
    if not EnhancedAppendEntryTest().run_all_tests():
        print("\n❌ Some tests failed - check the implementation")
        return False

    # One print() call per element, in order; "" yields an empty line.
    report_lines = (
        "\n🎯 ENHANCED FEATURES VALIDATED:",
        "   ✅ Newline rejection fixed with message sanitization",
        "   ✅ Smart multiline detection and auto-split",
        "   ✅ Enhanced bulk mode with direct list support",
        "   ✅ Individual timestamps with staggering",
        "   ✅ Metadata inheritance and merging",
        "   ✅ Robust error handling with helpful suggestions",
        "   ✅ Performance optimizations for large content",
        "   ✅ Backwards compatibility maintained",
        "\n💡 USAGE EXAMPLES:",
        "   # Multiline content (auto-splits):",
        "   await append_entry(message='Task 1\\nTask 2\\nTask 3')",
        "",
        "   # Direct list bulk mode:",
        "   await append_entry(items_list=[",
        "       {'message': 'Item 1', 'status': 'info'},",
        "       {'message': 'Item 2', 'status': 'success'}",
        "   ])",
        "",
        "   # With metadata inheritance:",
        "   await append_entry(",
        "       message='Line 1\\nLine 2\\nLine 3',",
        "       meta={'project': 'my-app', 'version': '2.0'},",
        "       status='info',",
        "       stagger_seconds=5",
        "   )",
    )
    for line in report_lines:
        print(line)
    return True
if __name__ == "__main__":
    # Script entry point: turn the suite's boolean outcome into the process
    # exit status (0 on success, 1 on failure).
    success = main()
    exit_status = 0 if success else 1
    sys.exit(exit_status)