Skip to main content
Glama
test_complete_session_manager.py19.8 kB
""" Complete functional tests for Session Manager covering all functionality. """ import pytest import asyncio import json import time import os import tempfile from pathlib import Path from datetime import datetime, timedelta from shannon_mcp.managers.session import SessionManager from shannon_mcp.managers.binary import BinaryManager class TestCompleteSessionManager: """Exhaustive tests for every Session Manager function.""" @pytest.fixture async def session_setup(self): """Set up session manager with binary manager.""" binary_manager = BinaryManager() binaries = await binary_manager.discover_binaries() if not binaries: pytest.skip("No Claude Code binary found") session_manager = SessionManager( binary_manager=binary_manager, max_sessions=10, default_timeout=300, cache_size=100 ) yield session_manager # Cleanup all sessions await session_manager.cleanup_all_sessions() @pytest.mark.asyncio async def test_session_initialization(self, session_setup): """Test SessionManager initialization with all options.""" # Test custom initialization binary_manager = BinaryManager() # Default options sm1 = SessionManager(binary_manager=binary_manager) assert sm1.max_sessions == 50 assert sm1.default_timeout == 600 assert sm1.cache_size == 1000 # Custom options sm2 = SessionManager( binary_manager=binary_manager, max_sessions=5, default_timeout=30, cache_size=10, auto_cleanup=True, cleanup_interval=60 ) assert sm2.max_sessions == 5 assert sm2.default_timeout == 30 assert sm2.cache_size == 10 assert sm2.auto_cleanup == True @pytest.mark.asyncio async def test_session_creation_options(self, session_setup): """Test creating sessions with all possible options.""" manager = session_setup # Test basic session creation session1 = await manager.create_session("test-basic") assert session1.id == "test-basic" assert session1.status == "initialized" assert session1.created_at is not None # Test with all options session2 = await manager.create_session( session_id="test-full", options={ "model": "claude-3-opus-20240229", "temperature": 0.7, "max_tokens": 4096, "stream": True, "timeout": 120, "system_prompt": "You are a helpful assistant", "stop_sequences": ["\n\n", "END"], "top_p": 0.9, "top_k": 40 }, metadata={ "user": "test_user", "purpose": "testing", "tags": ["test", "functional"] } ) assert session2.options["model"] == "claude-3-opus-20240229" assert session2.options["temperature"] == 0.7 assert session2.metadata["user"] == "test_user" assert "test" in session2.metadata["tags"] # Test session ID generation session3 = await manager.create_session() # Auto-generated ID assert session3.id is not None assert len(session3.id) > 0 # Cleanup for session in [session1, session2, session3]: await manager.close_session(session.id) @pytest.mark.asyncio async def test_session_lifecycle(self, session_setup): """Test complete session lifecycle and state transitions.""" manager = session_setup # Create session session = await manager.create_session("lifecycle-test") assert session.status == "initialized" # Start session await manager.start_session(session.id) assert session.status == "starting" # Wait for running state max_wait = 10 start_time = time.time() while session.status != "running" and time.time() - start_time < max_wait: await asyncio.sleep(0.1) assert session.status == "running" assert session.process is not None assert session.pid is not None print(f"\nSession PID: {session.pid}") print(f"Session started in: {time.time() - start_time:.2f}s") # Pause session await manager.pause_session(session.id) assert session.status == "paused" # Resume session await manager.resume_session(session.id) assert session.status == "running" # Stop session await manager.stop_session(session.id) assert session.status == "stopped" # Close session await manager.close_session(session.id) assert session.status == "closed" # Verify process terminated if session.pid: import psutil assert not psutil.pid_exists(session.pid) @pytest.mark.asyncio async def test_prompt_execution(self, session_setup): """Test executing prompts with various options.""" manager = session_setup session = await manager.create_session("prompt-test") await manager.start_session(session.id) # Simple prompt result1 = await manager.execute_prompt( session.id, "What is 2 + 2? Reply with just the number." ) print(f"\nSimple prompt result: {result1}") assert result1 is not None assert "4" in str(result1) # Prompt with options result2 = await manager.execute_prompt( session.id, "Write a haiku about coding", options={ "temperature": 0.9, "max_tokens": 100 } ) print(f"\nHaiku result: {result2}") assert result2 is not None assert len(str(result2)) > 10 # Prompt with timeout try: result3 = await manager.execute_prompt( session.id, "Count to 1000000 very slowly", timeout=2.0 ) except asyncio.TimeoutError: print("\nPrompt timed out as expected") result3 = None # Multi-turn conversation await manager.execute_prompt(session.id, "Remember the number 42") result4 = await manager.execute_prompt( session.id, "What number did I ask you to remember?" ) print(f"\nMemory test result: {result4}") assert "42" in str(result4) await manager.close_session(session.id) @pytest.mark.asyncio async def test_streaming_execution(self, session_setup): """Test streaming prompt execution.""" manager = session_setup session = await manager.create_session( "stream-test", options={"stream": True} ) await manager.start_session(session.id) # Collect stream chunks chunks = [] chunk_times = [] last_time = time.time() async for chunk in manager.stream_prompt( session.id, "Count from 1 to 10, one number per line" ): current_time = time.time() chunk_times.append(current_time - last_time) last_time = current_time chunks.append(chunk) print(f"Chunk {len(chunks)}: {chunk}") print(f"\nTotal chunks: {len(chunks)}") print(f"Average chunk interval: {sum(chunk_times)/len(chunk_times):.3f}s") # Verify streaming worked assert len(chunks) > 1 # Combine chunks and verify content if isinstance(chunks[0], dict): full_content = "".join(c.get("content", "") for c in chunks) else: full_content = "".join(str(c) for c in chunks) # Should contain numbers 1-10 for i in range(1, 11): assert str(i) in full_content await manager.close_session(session.id) @pytest.mark.asyncio async def test_session_cancellation(self, session_setup): """Test cancelling sessions and prompts.""" manager = session_setup session = await manager.create_session("cancel-test") await manager.start_session(session.id) # Start long-running prompt prompt_task = asyncio.create_task( manager.execute_prompt( session.id, "Write a very detailed essay about the history of computing, at least 5000 words" ) ) # Cancel after short delay await asyncio.sleep(1.0) # Cancel the session cancelled = await manager.cancel_session(session.id) assert cancelled == True assert session.status == "cancelled" # Verify prompt task was cancelled with pytest.raises(asyncio.CancelledError): await prompt_task # Test cancelling specific prompt session2 = await manager.create_session("cancel-prompt-test") await manager.start_session(session2.id) # Start prompt with ID prompt_id = "test-prompt-123" prompt_task2 = asyncio.create_task( manager.execute_prompt( session2.id, "Another long task", prompt_id=prompt_id ) ) await asyncio.sleep(0.5) # Cancel specific prompt cancelled = await manager.cancel_prompt(session2.id, prompt_id) assert cancelled == True await manager.close_session(session2.id) @pytest.mark.asyncio async def test_session_state_management(self, session_setup): """Test session state saving and restoration.""" manager = session_setup # Create session with some state session = await manager.create_session("state-test") await manager.start_session(session.id) # Build up state await manager.execute_prompt(session.id, "My name is Alice") await manager.execute_prompt(session.id, "I live in New York") await manager.execute_prompt(session.id, "My favorite color is blue") # Get current state state = await manager.get_session_state(session.id) print(f"\nSession state size: {len(json.dumps(state))} bytes") assert "messages" in state assert len(state["messages"]) >= 6 # 3 prompts + 3 responses # Save state to file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(state, f) state_file = f.name # Close original session await manager.close_session(session.id) # Create new session and restore state new_session = await manager.create_session("restored-test") # Load state from file with open(state_file, 'r') as f: loaded_state = json.load(f) await manager.restore_session_state(new_session.id, loaded_state) await manager.start_session(new_session.id) # Verify state was restored result = await manager.execute_prompt( new_session.id, "What is my name, where do I live, and what's my favorite color?" ) print(f"\nRestored session response: {result}") assert "Alice" in str(result) assert "New York" in str(result) assert "blue" in str(result) await manager.close_session(new_session.id) os.unlink(state_file) @pytest.mark.asyncio async def test_session_caching(self, session_setup): """Test session caching mechanism.""" manager = session_setup # Create multiple sessions session_ids = [] for i in range(5): session = await manager.create_session(f"cache-test-{i}") await manager.start_session(session.id) session_ids.append(session.id) # Add to cache await manager.execute_prompt(session.id, f"Session {i}") # Check cache stats cache_stats = manager.get_cache_stats() print(f"\nCache stats: {cache_stats}") assert cache_stats["size"] >= 5 assert cache_stats["hits"] >= 0 # Access sessions to test cache hits for session_id in session_ids: session = await manager.get_session(session_id) assert session is not None # Check updated stats new_stats = manager.get_cache_stats() assert new_stats["hits"] > cache_stats["hits"] # Test cache eviction manager.cache_size = 3 # Reduce cache size manager.evict_old_sessions() final_stats = manager.get_cache_stats() assert final_stats["size"] <= 3 # Cleanup for session_id in session_ids: await manager.close_session(session_id) @pytest.mark.asyncio async def test_concurrent_sessions(self, session_setup): """Test managing multiple concurrent sessions.""" manager = session_setup # Create multiple sessions concurrently session_count = 5 async def create_and_use_session(index): session = await manager.create_session(f"concurrent-{index}") await manager.start_session(session.id) # Execute some prompts result = await manager.execute_prompt( session.id, f"Calculate {index} * {index}" ) return session.id, result # Run concurrently tasks = [create_and_use_session(i) for i in range(session_count)] results = await asyncio.gather(*tasks) print(f"\nCreated {len(results)} concurrent sessions") for session_id, result in results: print(f" {session_id}: {result}") # Verify all sessions are active active_sessions = await manager.list_active_sessions() assert len(active_sessions) >= session_count # Test session limits manager.max_sessions = 3 # Try to create more sessions with pytest.raises(Exception) as exc_info: for i in range(5): await manager.create_session(f"limit-test-{i}") await manager.start_session(f"limit-test-{i}") assert "limit" in str(exc_info.value).lower() # Cleanup all sessions await manager.cleanup_all_sessions() @pytest.mark.asyncio async def test_session_metrics(self, session_setup): """Test session metrics and statistics.""" manager = session_setup session = await manager.create_session("metrics-test") await manager.start_session(session.id) # Initial metrics initial_stats = await manager.get_session_stats(session.id) print(f"\nInitial stats: {initial_stats}") assert initial_stats["prompt_count"] == 0 assert initial_stats["total_tokens"] == 0 assert initial_stats["status"] == "running" # Execute prompts and track metrics prompts = [ "What is AI?", "Explain machine learning in one sentence", "List 3 programming languages" ] for prompt in prompts: await manager.execute_prompt(session.id, prompt) # Get updated metrics final_stats = await manager.get_session_stats(session.id) print(f"\nFinal stats: {final_stats}") assert final_stats["prompt_count"] == len(prompts) assert final_stats["total_tokens"] > 0 assert final_stats["avg_response_time"] > 0 assert final_stats["runtime_seconds"] > 0 # Test global metrics global_stats = await manager.get_global_stats() print(f"\nGlobal stats: {global_stats}") assert global_stats["total_sessions"] >= 1 assert global_stats["active_sessions"] >= 1 assert global_stats["total_prompts"] >= len(prompts) await manager.close_session(session.id) @pytest.mark.asyncio async def test_session_persistence(self, session_setup): """Test session persistence to database.""" manager = session_setup # Enable persistence await manager.enable_persistence(Path("/tmp/session_test.db")) # Create and use session session = await manager.create_session("persist-test") await manager.start_session(session.id) await manager.execute_prompt(session.id, "Test persistence") # Save to database await manager.persist_session(session.id) # Simulate restart - create new manager new_manager = SessionManager(binary_manager=manager.binary_manager) await new_manager.enable_persistence(Path("/tmp/session_test.db")) # Load persisted sessions loaded_sessions = await new_manager.load_persisted_sessions() print(f"\nLoaded {len(loaded_sessions)} persisted sessions") assert len(loaded_sessions) >= 1 assert any(s.id == "persist-test" for s in loaded_sessions) # Verify session data preserved loaded_session = next(s for s in loaded_sessions if s.id == "persist-test") assert loaded_session.message_count > 0 # Cleanup await manager.close_session(session.id) Path("/tmp/session_test.db").unlink(missing_ok=True) @pytest.mark.asyncio async def test_session_resource_monitoring(self, session_setup): """Test monitoring session resource usage.""" manager = session_setup session = await manager.create_session("resource-test") await manager.start_session(session.id) # Enable resource monitoring await manager.enable_resource_monitoring(session.id, interval=1) # Generate some load for i in range(3): await manager.execute_prompt( session.id, f"Generate a list of {100 * (i + 1)} random numbers" ) await asyncio.sleep(1) # Get resource history resources = await manager.get_resource_history(session.id) print(f"\nResource samples: {len(resources)}") for i, sample in enumerate(resources[:3]): print(f" Sample {i+1}:") print(f" CPU: {sample.get('cpu_percent', 0):.1f}%") print(f" Memory: {sample.get('memory_mb', 0):.1f} MB") print(f" Time: {sample.get('timestamp', 'N/A')}") assert len(resources) >= 2 # Check resource alerts alerts = await manager.check_resource_alerts( session.id, cpu_threshold=50, memory_threshold=100 ) print(f"\nResource alerts: {len(alerts)}") for alert in alerts: print(f" {alert['type']}: {alert['message']}") await manager.close_session(session.id)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server