Skip to main content
Glama
test_shared.py10.2 kB
import pytest from datetime import datetime, timezone, timedelta from unittest.mock import patch, MagicMock from outlook_mcp_server.backend.shared import ( email_cache, email_cache_order, _email_time_cache, add_email_to_cache, get_email_from_cache, clear_cache, get_cache_size, get_cache_stats, cleanup_cache, get_emails_by_date_range, get_emails_by_sender, get_emails_by_subject, get_emails_by_date_range_cached, get_emails_by_sender_cached, get_emails_by_subject_cached ) class TestCacheManagement: """Test suite for cache management functions.""" def setup_method(self): """Setup method to clear cache before each test.""" clear_cache() def teardown_method(self): """Teardown method to clear cache after each test.""" clear_cache() def test_add_email_to_cache(self): """Test adding an email to cache.""" email_data = { "subject": "Test Subject", "sender": "test@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": "Test body" } email_id = "test_id_1" add_email_to_cache(email_id, email_data) assert email_id in email_cache assert email_cache[email_id] == email_data assert email_id in email_cache_order def test_add_email_to_cache_eviction(self): """Test cache eviction when max size is exceeded.""" from outlook_mcp_server.backend.config import CacheConfig for i in range(CacheConfig.MAX_EMAILS + 10): email_data = { "subject": f"Test Subject {i}", "sender": f"test{i}@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": f"Test body {i}" } add_email_to_cache(f"test_id_{i}", email_data) assert len(email_cache) <= CacheConfig.MAX_EMAILS assert len(email_cache_order) <= CacheConfig.MAX_EMAILS def test_add_email_to_cache_time_cleanup(self): """Test that time cache is cleaned up when email is evicted.""" from outlook_mcp_server.backend.config import CacheConfig received_time = datetime.now(timezone.utc).isoformat() for i in range(CacheConfig.MAX_EMAILS + 10): email_data = { "subject": f"Test Subject {i}", "sender": f"test{i}@example.com", "received_time": received_time, "body": f"Test body {i}" } add_email_to_cache(f"test_id_{i}", email_data) assert len(_email_time_cache) <= CacheConfig.MAX_EMAILS def test_get_email_from_cache_exists(self): """Test retrieving an existing email from cache.""" email_data = { "subject": "Test Subject", "sender": "test@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": "Test body" } email_id = "test_id_1" add_email_to_cache(email_id, email_data) result = get_email_from_cache(email_id) assert result == email_data def test_get_email_from_cache_not_exists(self): """Test retrieving a non-existent email from cache.""" result = get_email_from_cache("non_existent_id") assert result is None def test_clear_cache(self): """Test clearing the cache.""" email_data = { "subject": "Test Subject", "sender": "test@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": "Test body" } add_email_to_cache("test_id_1", email_data) clear_cache() assert len(email_cache) == 0 assert len(email_cache_order) == 0 assert len(_email_time_cache) == 0 def test_get_cache_size(self): """Test getting cache size.""" assert get_cache_size() == 0 email_data = { "subject": "Test Subject", "sender": "test@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": "Test body" } add_email_to_cache("test_id_1", email_data) assert get_cache_size() == 1 def test_get_cache_stats(self): """Test getting cache statistics.""" email_data = { "subject": "Test Subject", "sender": "test@example.com", "received_time": datetime.now(timezone.utc).isoformat(), "body": "Test body" } add_email_to_cache("test_id_1", email_data) stats = get_cache_stats() assert stats["total_emails"] == 1 assert "oldest_email" in stats assert "newest_email" in stats def test_cleanup_cache(self): """Test cache cleanup functionality.""" from outlook_mcp_server.backend.config import CacheConfig old_time = (datetime.now(timezone.utc) - timedelta(days=CacheConfig.CACHE_EXPIRY_HOURS + 1)).isoformat() new_time = datetime.now(timezone.utc).isoformat() add_email_to_cache("old_id", {"subject": "Old", "received_time": old_time}) add_email_to_cache("new_id", {"subject": "New", "received_time": new_time}) cleanup_cache() assert "old_id" not in email_cache assert "new_id" in email_cache class TestCacheQueries: """Test suite for cache query functions.""" def setup_method(self): """Setup method to populate cache with test data.""" clear_cache() now = datetime.now(timezone.utc) test_emails = [ { "id": "email_1", "subject": "Meeting Tomorrow", "sender": "alice@example.com", "received_time": (now - timedelta(hours=1)).isoformat(), "body": "Meeting at 10am" }, { "id": "email_2", "subject": "Project Update", "sender": "bob@example.com", "received_time": (now - timedelta(hours=2)).isoformat(), "body": "Project status update" }, { "id": "email_3", "subject": "Meeting Tomorrow", "sender": "charlie@example.com", "received_time": (now - timedelta(hours=3)).isoformat(), "body": "Another meeting" } ] for email in test_emails: add_email_to_cache(email["id"], email) def teardown_method(self): """Teardown method to clear cache after each test.""" clear_cache() def test_get_emails_by_date_range(self): """Test getting emails by date range.""" now = datetime.now(timezone.utc) start_time = (now - timedelta(hours=2)).isoformat() end_time = now.isoformat() results = get_emails_by_date_range(start_time, end_time) assert len(results) == 2 def test_get_emails_by_sender(self): """Test getting emails by sender.""" results = get_emails_by_sender("alice@example.com") assert len(results) == 1 assert results[0]["sender"] == "alice@example.com" def test_get_emails_by_subject(self): """Test getting emails by subject.""" results = get_emails_by_subject("Meeting") assert len(results) == 2 def test_get_emails_by_date_range_cached(self): """Test getting emails by date range with caching.""" now = datetime.now(timezone.utc) start_time = (now - timedelta(hours=2)).isoformat() end_time = now.isoformat() results1 = get_emails_by_date_range_cached(start_time, end_time) results2 = get_emails_by_date_range_cached(start_time, end_time) assert len(results1) == 2 assert len(results2) == 2 def test_get_emails_by_sender_cached(self): """Test getting emails by sender with caching.""" results1 = get_emails_by_sender_cached("alice@example.com") results2 = get_emails_by_sender_cached("alice@example.com") assert len(results1) == 1 assert len(results2) == 1 def test_get_emails_by_subject_cached(self): """Test getting emails by subject with caching.""" results1 = get_emails_by_subject_cached("Meeting") results2 = get_emails_by_subject_cached("Meeting") assert len(results1) == 2 assert len(results2) == 2 class TestCacheEdgeCases: """Test suite for cache edge cases.""" def setup_method(self): """Setup method to clear cache before each test.""" clear_cache() def teardown_method(self): """Teardown method to clear cache after each test.""" clear_cache() def test_add_email_with_missing_fields(self): """Test adding email with missing fields.""" email_data = { "subject": "Test Subject" } add_email_to_cache("test_id", email_data) assert "test_id" in email_cache def test_get_emails_by_empty_sender(self): """Test getting emails with empty sender filter.""" results = get_emails_by_sender("") assert results == [] def test_get_emails_by_empty_subject(self): """Test getting emails with empty subject filter.""" results = get_emails_by_subject("") assert results == [] def test_get_emails_by_invalid_date_range(self): """Test getting emails with invalid date range.""" now = datetime.now(timezone.utc) start_time = now.isoformat() end_time = (now - timedelta(hours=1)).isoformat() results = get_emails_by_date_range(start_time, end_time) assert results == [] def test_cache_stats_empty(self): """Test getting cache stats when cache is empty.""" stats = get_cache_stats() assert stats["total_emails"] == 0

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marlonluo2018/outlook-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server