
MCP Server for Odoo

by ivnvxd
Mozilla Public License 2.0
test_caching.py (14.5 kB)
"""Tests for caching functionality with Odoo integration.""" import os import time from unittest.mock import Mock, patch import pytest from mcp_server_odoo.config import OdooConfig, load_config from mcp_server_odoo.odoo_connection import OdooConnection from mcp_server_odoo.performance import PerformanceManager # Import skip_on_rate_limit decorator from .test_xmlrpc_operations import skip_on_rate_limit class TestOdooConnectionCaching: """Test caching functionality in OdooConnection.""" @pytest.fixture def mock_config(self): """Create mock config.""" config = Mock(spec=OdooConfig) config.url = os.getenv("ODOO_URL", "http://localhost:8069") config.db = "test" config.username = "test" config.password = "test" config.api_key = None config.uses_api_key = False config.uses_credentials = True config.is_yolo_enabled = False config.yolo_mode = "off" config.get_endpoint_paths.return_value = { "db": "/mcp/xmlrpc/db", "common": "/mcp/xmlrpc/common", "object": "/mcp/xmlrpc/object", } return config @pytest.fixture def mock_performance_manager(self, mock_config): """Create mock performance manager.""" return PerformanceManager(mock_config) def test_fields_get_caching(self, mock_config, mock_performance_manager): """Test fields_get method uses cache.""" # Create connection with performance manager conn = OdooConnection(mock_config, performance_manager=mock_performance_manager) # Mock the connection methods conn._connected = True conn._authenticated = True conn._uid = 2 conn._database = "test" # Mock execute_kw mock_fields = { "name": {"type": "char", "string": "Name"}, "email": {"type": "char", "string": "Email"}, } with patch.object(conn, "execute_kw", return_value=mock_fields) as mock_execute: # First call should hit server fields1 = conn.fields_get("res.partner") assert fields1 == mock_fields mock_execute.assert_called_once() # Second call should use cache fields2 = conn.fields_get("res.partner") assert fields2 == mock_fields mock_execute.assert_called_once() # Still only called once # Check cache stats stats = mock_performance_manager.field_cache.get_stats() assert stats["hits"] == 1 assert stats["misses"] == 1 def test_fields_get_with_attributes_no_cache(self, mock_config, mock_performance_manager): """Test fields_get with specific attributes doesn't use cache.""" conn = OdooConnection(mock_config, performance_manager=mock_performance_manager) # Mock the connection conn._connected = True conn._authenticated = True conn._uid = 2 conn._database = "test" mock_fields = {"name": {"type": "char"}} with patch.object(conn, "execute_kw", return_value=mock_fields) as mock_execute: # Call with attributes should not cache conn.fields_get("res.partner", attributes=["type"]) conn.fields_get("res.partner", attributes=["type"]) # Should call server twice assert mock_execute.call_count == 2 def test_read_caching(self, mock_config, mock_performance_manager): """Test read method uses cache.""" conn = OdooConnection(mock_config, performance_manager=mock_performance_manager) # Mock the connection conn._connected = True conn._authenticated = True conn._uid = 2 conn._database = "test" # Mock records mock_records = [ {"id": 1, "name": "Partner 1"}, {"id": 2, "name": "Partner 2"}, ] with patch.object(conn, "execute_kw", return_value=mock_records) as mock_execute: # First read should hit server records1 = conn.read("res.partner", [1, 2]) assert len(records1) == 2 mock_execute.assert_called_once() # Second read of same records should use cache records2 = conn.read("res.partner", [1, 2]) assert records2 == records1 
mock_execute.assert_called_once() # Still only called once # Read with one cached and one new record mock_execute.return_value = [{"id": 3, "name": "Partner 3"}] records3 = conn.read("res.partner", [1, 3]) assert len(records3) == 2 assert records3[0]["id"] == 1 # Cached assert records3[1]["id"] == 3 # New assert mock_execute.call_count == 2 # Called again for record 3 def test_read_cache_respects_fields(self, mock_config, mock_performance_manager): """Test read cache respects requested fields.""" conn = OdooConnection(mock_config, performance_manager=mock_performance_manager) # Mock the connection conn._connected = True conn._authenticated = True conn._uid = 2 conn._database = "test" with patch.object(conn, "execute_kw") as mock_execute: # Read with specific fields mock_execute.return_value = [{"id": 1, "name": "Partner 1"}] conn.read("res.partner", [1], fields=["name"]) # Read same record with different fields should not use cache mock_execute.return_value = [{"id": 1, "email": "test@example.com"}] conn.read("res.partner", [1], fields=["email"]) # Should have called server twice assert mock_execute.call_count == 2 def test_cache_invalidation_on_write(self, mock_config, mock_performance_manager): """Test cache invalidation when records are modified.""" conn = OdooConnection(mock_config, performance_manager=mock_performance_manager) # Mock the connection conn._connected = True conn._authenticated = True conn._uid = 2 conn._database = "test" # Cache a record with patch.object(conn, "execute_kw", return_value=[{"id": 1, "name": "Original"}]): conn.read("res.partner", [1]) # Invalidate the cache for this record mock_performance_manager.invalidate_record_cache("res.partner", 1) # Next read should hit server again with patch.object( conn, "execute_kw", return_value=[{"id": 1, "name": "Updated"}] ) as mock_execute: records = conn.read("res.partner", [1]) assert records[0]["name"] == "Updated" mock_execute.assert_called_once() class TestCachingIntegration: """Integration tests for caching with real Odoo connection.""" @pytest.fixture def real_config(self): """Load real configuration.""" return load_config() @pytest.fixture def performance_manager(self, real_config): """Create performance manager with real config.""" return PerformanceManager(real_config) @pytest.mark.integration def test_real_fields_caching(self, real_config, performance_manager): """Test field caching with real Odoo connection.""" conn = OdooConnection(real_config, performance_manager=performance_manager) try: conn.connect() conn.authenticate() # Measure time for first fields_get call start1 = time.time() fields1 = conn.fields_get("res.partner") duration1 = time.time() - start1 # Measure time for second fields_get call (should be cached) start2 = time.time() fields2 = conn.fields_get("res.partner") duration2 = time.time() - start2 # Cached call should be much faster assert duration2 < duration1 / 10 # At least 10x faster assert fields1 == fields2 # Check cache stats stats = performance_manager.field_cache.get_stats() assert stats["hits"] == 1 assert stats["misses"] == 1 assert stats["hit_rate"] == 0.5 finally: conn.disconnect() @pytest.mark.integration def test_real_record_caching(self, real_config, performance_manager): """Test record caching with real Odoo connection.""" conn = OdooConnection(real_config, performance_manager=performance_manager) try: conn.connect() conn.authenticate() # Get some partner IDs partner_ids = conn.search("res.partner", [], limit=5) if partner_ids: # First read - hits server start1 = time.time() 
records1 = conn.read("res.partner", partner_ids[:2], ["name", "email", "phone"]) duration1 = time.time() - start1 # Second read - should use cache start2 = time.time() records2 = conn.read("res.partner", partner_ids[:2], ["name", "email", "phone"]) duration2 = time.time() - start2 # Cached read should be faster assert duration2 < duration1 / 5 assert records1 == records2 # Read mix of cached and uncached mixed_ids = [partner_ids[0], partner_ids[3]] # One cached, one new records3 = conn.read("res.partner", mixed_ids, ["name", "email", "phone"]) assert len(records3) == 2 # Check cache stats stats = performance_manager.record_cache.get_stats() assert stats["hits"] >= 2 # At least 2 cache hits finally: conn.disconnect() @pytest.mark.integration @skip_on_rate_limit def test_cache_performance_improvement(self, real_config, performance_manager): """Test overall performance improvement with caching.""" conn = OdooConnection(real_config, performance_manager=performance_manager) try: conn.connect() conn.authenticate() # Get test data partner_ids = conn.search("res.partner", [], limit=20) if len(partner_ids) >= 20: # Simulate repeated access pattern total_time_without_cache = 0 total_time_with_cache = 0 # First pass - cache warming for i in range(10): record_id = partner_ids[i] start = time.time() conn.read("res.partner", [record_id], ["name", "email", "phone", "city"]) total_time_without_cache += time.time() - start # Second pass - using cache for i in range(10): record_id = partner_ids[i] start = time.time() conn.read("res.partner", [record_id], ["name", "email", "phone", "city"]) total_time_with_cache += time.time() - start # Cache should provide significant speedup speedup = total_time_without_cache / total_time_with_cache assert speedup > 5 # At least 5x faster with cache # Get performance stats perf_stats = performance_manager.get_stats() # Check cache effectiveness cache_stats = perf_stats["caches"]["record_cache"] assert cache_stats["hit_rate"] >= 0.5 # Good hit rate # Check connection pooling conn_stats = perf_stats["connection_pool"] # Connection reuse happens after multiple operations on same endpoint assert conn_stats["connections_created"] >= 3 # At least 3 endpoints created finally: conn.disconnect() @pytest.mark.integration @skip_on_rate_limit def test_cache_memory_limits(self, real_config): """Test cache respects memory limits.""" # Create manager with small memory limit small_manager = PerformanceManager(real_config) small_manager.record_cache = small_manager.record_cache.__class__( max_size=10, max_memory_mb=0.1, # Very small limit ) conn = OdooConnection(real_config, performance_manager=small_manager) try: conn.connect() conn.authenticate() # Try to cache many large records partner_ids = conn.search("res.partner", [], limit=50) if len(partner_ids) >= 20: for pid in partner_ids[:20]: try: conn.read("res.partner", [pid]) except Exception: # Some records might not be accessible, continue pass # Check cache didn't grow too large stats = small_manager.record_cache.get_stats() assert stats["total_entries"] <= 10 assert stats["total_size_mb"] <= 0.5 # Allow more overhead for JSON serialization finally: conn.disconnect() @pytest.mark.integration @skip_on_rate_limit def test_connection_pool_reuse(self, real_config, performance_manager): """Test connection pooling improves performance.""" # Create two connections sharing same performance manager conn1 = OdooConnection(real_config, performance_manager=performance_manager) conn2 = OdooConnection(real_config, 
performance_manager=performance_manager) try: # Connect both conn1.connect() conn1.authenticate() conn2.connect() conn2.authenticate() # They should be reusing connections from pool pool_stats = performance_manager.connection_pool.get_stats() # Should have reused some connections assert pool_stats["connections_reused"] > 0 # Do some operations conn1.fields_get("res.partner") conn2.fields_get("res.partner") # Use same model to avoid permission issue # Check pool is being used effectively pool_stats = performance_manager.connection_pool.get_stats() assert pool_stats["active_connections"] <= 6 # 3 endpoints * 2 connections max finally: conn1.disconnect() conn2.disconnect()
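The suite mixes fast, mock-based unit tests (TestOdooConnectionCaching) with tests marked @pytest.mark.integration that require a reachable Odoo instance. A minimal sketch of selecting between the two groups with pytest's marker expressions, assuming the project registers the "integration" marker and that load_config() picks up its connection settings (such as ODOO_URL) from the environment:

# run_caching_tests.py - hypothetical helper, not part of the repository
import os
import sys

import pytest

# Unit tests only: the mock-based cases need no running server.
unit_exit = pytest.main(["-m", "not integration"])

# Integration tests: assumes an Odoo instance is reachable at ODOO_URL
# (the same default address used by the mock config above).
os.environ.setdefault("ODOO_URL", "http://localhost:8069")
integration_exit = pytest.main(["-m", "integration"])

sys.exit(unit_exit or integration_exit)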

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ivnvxd/mcp-server-odoo'
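The same endpoint works from any HTTP client. A minimal Python sketch using only the standard library; the response schema is not documented on this page, so the example simply decodes the JSON and prints its top-level keys:

import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/ivnvxd/mcp-server-odoo"

with urllib.request.urlopen(URL) as response:
    server_info = json.load(response)

# Inspect the payload before relying on specific fields.
print(sorted(server_info))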

If you have feedback or need assistance with the MCP directory API, please join our Discord server.