Skip to main content
Glama

MCP Server for Odoo

by ivnvxd
Mozilla Public License 2.0
88
  • Apple
  • Linux
odoo_connection.py40.2 kB
"""Odoo XML-RPC connection management. This module provides the OdooConnection class for managing connections to Odoo via XML-RPC using MCP-specific endpoints. """ import json import logging import socket import urllib.error import urllib.request import xmlrpc.client from contextlib import contextmanager from typing import Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse from .config import OdooConfig from .error_sanitizer import ErrorSanitizer from .performance import PerformanceManager logger = logging.getLogger(__name__) class OdooConnectionError(Exception): """Base exception for Odoo connection errors.""" pass class OdooConnection: """Manages XML-RPC connections to Odoo with dynamic endpoint selection. This class provides connection management, health checking, and proper resource cleanup for Odoo XML-RPC connections. Supports both standard MCP endpoints and YOLO mode with standard Odoo endpoints. """ # Connection timeout in seconds DEFAULT_TIMEOUT = 30 def __init__( self, config: OdooConfig, timeout: int = DEFAULT_TIMEOUT, performance_manager: Optional[PerformanceManager] = None, ): """Initialize connection with configuration. Args: config: OdooConfig object with connection parameters timeout: Connection timeout in seconds performance_manager: Optional performance manager for optimizations """ self.config = config self.timeout = timeout self._url_components = self._parse_url(config.url) # Get appropriate endpoints based on mode endpoints = config.get_endpoint_paths() self.DB_ENDPOINT = endpoints["db"] self.COMMON_ENDPOINT = endpoints["common"] self.OBJECT_ENDPOINT = endpoints["object"] # Log YOLO mode if enabled if config.is_yolo_enabled: if config.yolo_mode == "read": logger.warning( "📖 YOLO MODE: READ-ONLY - Connecting to standard Odoo endpoints. " "Write operations will be blocked. Safe for demos and testing." ) elif config.yolo_mode == "true": logger.warning( "🚨 YOLO MODE: FULL ACCESS - Connecting to standard Odoo endpoints. " "ALL operations enabled! No MCP security controls. " "FOR DEVELOPMENT/TESTING ONLY - NEVER USE IN PRODUCTION!" ) # Performance manager for optimizations self._performance_manager = performance_manager or PerformanceManager(config) # XML-RPC proxies (created on connect) self._db_proxy: Optional[xmlrpc.client.ServerProxy] = None self._common_proxy: Optional[xmlrpc.client.ServerProxy] = None self._object_proxy: Optional[xmlrpc.client.ServerProxy] = None # Connection state self._connected = False self._uid: Optional[int] = None self._database: Optional[str] = None self._authenticated = False self._auth_method: Optional[str] = None # 'api_key' or 'password' mode_info = f" (YOLO mode: {config.yolo_mode})" if config.is_yolo_enabled else "" logger.info(f"Initialized OdooConnection for {self._url_components['host']}{mode_info}") def _parse_url(self, url: str) -> Dict[str, Any]: """Parse and validate Odoo URL. Args: url: The Odoo server URL Returns: Dictionary with URL components Raises: OdooConnectionError: If URL is invalid """ try: parsed = urlparse(url) if parsed.scheme not in ("http", "https"): raise OdooConnectionError( f"Invalid URL scheme: {parsed.scheme}. Must be http or https" ) if not parsed.hostname: raise OdooConnectionError("Invalid URL: missing hostname") port = parsed.port if not port: port = 443 if parsed.scheme == "https" else 80 return { "scheme": parsed.scheme, "host": parsed.hostname, "port": port, "path": parsed.path.rstrip("/") or "", "base_url": url.rstrip("/"), } except Exception as e: raise OdooConnectionError(f"Failed to parse URL: {e}") from e def _create_transport(self) -> xmlrpc.client.Transport: """Create XML-RPC transport with timeout support. Returns: Configured Transport object """ class TimeoutTransport(xmlrpc.client.Transport): def __init__(self, timeout, *args, **kwargs): self.timeout = timeout super().__init__(*args, **kwargs) def make_connection(self, host): connection = super().make_connection(host) if hasattr(connection, "sock") and connection.sock: connection.sock.settimeout(self.timeout) return connection return TimeoutTransport(self.timeout) def _build_endpoint_url(self, endpoint: str) -> str: """Build full URL for an MCP endpoint. Args: endpoint: The MCP endpoint path Returns: Full URL for the endpoint """ return f"{self._url_components['base_url']}{endpoint}" def connect(self) -> None: """Establish connection to Odoo server. Creates XML-RPC proxies for MCP endpoints but doesn't authenticate yet. Uses connection pooling for better performance. Raises: OdooConnectionError: If connection fails """ if self._connected: logger.warning("Already connected to Odoo") return try: # Use connection pool for proxies with dynamic endpoints self._db_proxy = self._performance_manager.get_optimized_connection(self.DB_ENDPOINT) self._common_proxy = self._performance_manager.get_optimized_connection( self.COMMON_ENDPOINT ) self._object_proxy = self._performance_manager.get_optimized_connection( self.OBJECT_ENDPOINT ) # Test connection by calling server_version self._test_connection() self._connected = True logger.info("Successfully connected to Odoo server") except socket.timeout: raise OdooConnectionError(f"Connection timeout after {self.timeout} seconds") from None except socket.error as e: raise OdooConnectionError( f"Failed to connect to {self._url_components['host']}:" f"{self._url_components['port']}: {e}" ) from e except Exception as e: raise OdooConnectionError(f"Connection failed: {e}") from e def _test_connection(self) -> None: """Test connection by calling server_version. Raises: OdooConnectionError: If test fails """ try: # Try to get server version via common endpoint version = self._common_proxy.version() logger.debug(f"Server version: {version}") except Exception as e: raise OdooConnectionError(f"Connection test failed: {e}") from e def disconnect(self, suppress_logging: bool = False) -> None: """Close connection and cleanup resources.""" if not self._connected: if not suppress_logging: try: logger.warning("Not connected to Odoo") except (ValueError, RuntimeError): # Ignore logging errors during cleanup pass return # Clear proxies (but don't close pooled connections) self._db_proxy = None self._common_proxy = None self._object_proxy = None # Clear connection state self._connected = False self._uid = None self._database = None self._authenticated = False self._auth_method = None if not suppress_logging: try: logger.info("Disconnected from Odoo server") except (ValueError, RuntimeError): # Ignore logging errors during cleanup pass def check_health(self) -> Tuple[bool, str]: """Check connection health. Returns: Tuple of (is_healthy, status_message) """ if not self._connected: return False, "Not connected" try: # Try to get server version as health check version = self._common_proxy.version() return True, f"Connected to Odoo {version.get('server_version', 'unknown')}" except socket.timeout: return False, f"Health check timeout after {self.timeout} seconds" except Exception as e: return False, f"Health check failed: {e}" def test_connection(self) -> bool: """Test if connection to Odoo is working. Returns: True if connection is working, False otherwise """ # If not connected, try to connect first if not self._connected: try: self.connect() except Exception as e: logger.error(f"Failed to connect: {e}") return False # Check health is_healthy, _ = self.check_health() return is_healthy def close(self) -> None: """Close the connection (alias for disconnect).""" self.disconnect() @property def is_connected(self) -> bool: """Check if currently connected.""" return self._connected @property def db_proxy(self) -> xmlrpc.client.ServerProxy: """Get database operations proxy. Returns: XML-RPC proxy for database operations Raises: OdooConnectionError: If not connected """ if not self._connected or not self._db_proxy: raise OdooConnectionError("Not connected to Odoo") return self._db_proxy @property def common_proxy(self) -> xmlrpc.client.ServerProxy: """Get common operations proxy. Returns: XML-RPC proxy for common operations Raises: OdooConnectionError: If not connected """ if not self._connected or not self._common_proxy: raise OdooConnectionError("Not connected to Odoo") return self._common_proxy @property def object_proxy(self) -> xmlrpc.client.ServerProxy: """Get object operations proxy. Returns: XML-RPC proxy for object operations Raises: OdooConnectionError: If not connected """ if not self._connected or not self._object_proxy: raise OdooConnectionError("Not connected to Odoo") return self._object_proxy def __enter__(self): """Context manager entry.""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.disconnect() return False def __del__(self): """Cleanup on deletion.""" try: # Only disconnect if we're actually connected if hasattr(self, "_connected") and self._connected: # Suppress logging during cleanup to avoid I/O errors self.disconnect(suppress_logging=True) except (ValueError, AttributeError, RuntimeError): # ValueError: I/O operation on closed file # AttributeError: object might be partially initialized # RuntimeError: various cleanup-related errors pass def list_databases(self) -> List[str]: """List all available databases on the Odoo server. Returns: List of database names Raises: OdooConnectionError: If listing fails or not connected """ if not self._connected: raise OdooConnectionError("Not connected to Odoo") # Warn about potential restrictions in YOLO mode if self.config.is_yolo_enabled: logger.debug( "YOLO mode: Database listing may be restricted on standard Odoo. " "Consider specifying ODOO_DB explicitly if listing fails." ) try: # Call list_db method on database proxy databases = self.db_proxy.list() logger.info(f"Found {len(databases)} databases: {databases}") return databases except xmlrpc.client.Fault as e: if self.config.is_yolo_enabled and "Access Denied" in str(e): # Common error when database listing is restricted logger.warning( "Database listing is restricted on this server. " "Please specify ODOO_DB in your configuration." ) if self.config.database: # Return configured database as fallback return [self.config.database] logger.error(f"Failed to list databases: {e}") raise OdooConnectionError(f"Failed to list databases: {e}") from e except Exception as e: logger.error(f"Failed to list databases: {e}") raise OdooConnectionError(f"Failed to list databases: {e}") from e def database_exists(self, db_name: str) -> bool: """Check if a specific database exists. Args: db_name: Name of the database to check Returns: True if database exists, False otherwise Raises: OdooConnectionError: If check fails """ try: databases = self.list_databases() return db_name in databases except Exception as e: logger.error(f"Failed to check database existence: {e}") raise OdooConnectionError(f"Failed to check database existence: {e}") from e def auto_select_database(self) -> str: """Automatically select an appropriate database. Selection logic: 1. If config.database is set, validate and use it 2. If only one database exists, use it 3. If multiple databases exist and one is named 'odoo', use it 4. Otherwise raise an error Returns: Selected database name Raises: OdooConnectionError: If no suitable database can be selected """ # If database is explicitly configured, use it without validation # Database listing may be restricted for security reasons if self.config.database: db_name = self.config.database logger.info(f"Using configured database: {db_name}") # Skip existence check as database listing might be restricted return db_name # List available databases try: databases = self.list_databases() except Exception as e: # If database listing is restricted, we cannot auto-select logger.warning(f"Cannot list databases (may be restricted): {e}") raise OdooConnectionError( "Database auto-selection failed. Database listing may be restricted. " "Please specify ODOO_DB in your configuration." ) from e # Handle different scenarios if not databases: raise OdooConnectionError("No databases found on Odoo server") if len(databases) == 1: db_name = databases[0] logger.info(f"Auto-selected only available database: {db_name}") return db_name # Multiple databases - check for 'odoo' if "odoo" in databases: logger.info("Auto-selected 'odoo' database from multiple options") return "odoo" # Cannot auto-select raise OdooConnectionError( f"Cannot auto-select database. Found {len(databases)} databases: " f"{', '.join(databases)}. Please specify ODOO_DB in configuration." ) def validate_database_access(self, db_name: str) -> bool: """Validate that we can access the specified database. This method attempts to authenticate with the database to verify access. Args: db_name: Name of the database to validate Returns: True if database is accessible, False otherwise Raises: OdooConnectionError: If validation fails """ if not self._connected: raise OdooConnectionError("Not connected to Odoo") try: # For API key auth, we'll need to implement a different check # For now, we just verify the database exists if self.config.uses_api_key: # API key validation would be done during actual authentication return self.database_exists(db_name) # For username/password auth, try to authenticate if self.config.uses_credentials: # Try to authenticate with the database # This will fail if we don't have access uid = self.common_proxy.authenticate( db_name, self.config.username, self.config.password, {} ) if uid: logger.info(f"Successfully validated access to database '{db_name}'") return True else: logger.warning(f"Authentication failed for database '{db_name}'") return False # Should not reach here due to config validation raise OdooConnectionError("No authentication method configured") except xmlrpc.client.Fault as e: logger.error(f"XML-RPC fault validating database access: {e}") if "Access Denied" in str(e): return False raise OdooConnectionError(f"Failed to validate database access: {e}") from e except Exception as e: logger.error(f"Error validating database access: {e}") raise OdooConnectionError(f"Failed to validate database access: {e}") from e def _authenticate_api_key_standard(self, database: str) -> bool: """Authenticate using API key with standard Odoo XML-RPC (YOLO mode). Args: database: Database name to authenticate against Returns: True if authentication successful, False otherwise """ if not self.config.username: logger.warning("YOLO mode requires username with API key for standard authentication") return False try: # Use standard XML-RPC auth with API key as password uid = self.common_proxy.authenticate( database, self.config.username, self.config.api_key, {} ) if uid: self._uid = uid self._database = database self._auth_method = "api_key" self._authenticated = True logger.info( f"YOLO mode: Authenticated using API key as password for user '{self.config.username}' (UID: {uid})" ) return True else: logger.warning( f"YOLO mode: Authentication failed for user '{self.config.username}'" ) return False except xmlrpc.client.Fault as e: # Handle specific Odoo authentication errors fault_string = str(e.faultString).lower() if "access denied" in fault_string or "wrong login" in fault_string: logger.warning(f"YOLO mode: Invalid credentials for user '{self.config.username}'") else: logger.warning(f"YOLO mode: Authentication error: {e.faultString}") return False except Exception as e: logger.error(f"YOLO mode: Unexpected authentication error: {e}") return False def _authenticate_api_key_mcp(self, database: str) -> bool: """Authenticate using API key with MCP REST endpoint (standard mode). Args: database: Database name to authenticate against Returns: True if authentication successful, False otherwise Raises: OdooConnectionError: If API request fails critically """ try: # Standard MCP API key validation url = f"{self._url_components['base_url']}/mcp/auth/validate" # Create request with API key header req = urllib.request.Request(url) req.add_header("X-API-Key", self.config.api_key) # Make the request with urllib.request.urlopen(req, timeout=self.timeout) as response: data = json.loads(response.read().decode("utf-8")) if data.get("success") and data.get("data", {}).get("valid"): self._uid = data["data"].get("user_id") self._database = database self._auth_method = "api_key" self._authenticated = True logger.info(f"Successfully authenticated with MCP API key (UID: {self._uid})") return True else: logger.warning("MCP API key validation failed") return False except urllib.error.HTTPError as e: if e.code == 401: logger.warning("Invalid MCP API key") return False elif e.code == 404: logger.warning("MCP auth endpoint not found (MCP module may not be installed)") return False elif e.code == 429: logger.warning("Rate limit exceeded during MCP API key validation") return False else: logger.error(f"HTTP error during MCP API key validation: {e}") raise OdooConnectionError(f"Failed to validate API key: HTTP {e.code}") from e except urllib.error.URLError as e: logger.error(f"Network error during MCP API key validation: {e}") raise OdooConnectionError(f"Network error during authentication: {e}") from e except Exception as e: logger.error(f"Unexpected error during MCP API key validation: {e}") raise OdooConnectionError(f"Failed to validate API key: {e}") from e def _authenticate_api_key(self, database: str) -> bool: """Authenticate using API key. Routes to appropriate authentication method based on mode. Args: database: Database name to authenticate against Returns: True if authentication successful, False otherwise Raises: OdooConnectionError: If API request fails critically """ if not self.config.api_key: return False # In YOLO mode, use standard XML-RPC authentication if self.config.is_yolo_enabled: return self._authenticate_api_key_standard(database) else: # In standard mode, use MCP REST endpoint return self._authenticate_api_key_mcp(database) def _authenticate_password(self, database: str) -> bool: """Authenticate using username and password. Args: database: Database name to authenticate against Returns: True if authentication successful, False otherwise Raises: OdooConnectionError: If authentication fails """ if not self.config.username or not self.config.password: return False try: # Use common proxy to authenticate uid = self.common_proxy.authenticate( database, self.config.username, self.config.password, {} ) if uid: self._uid = uid self._database = database self._auth_method = "password" self._authenticated = True logger.info(f"Successfully authenticated with username/password for user ID {uid}") return True else: logger.warning("Username/password authentication failed") return False except xmlrpc.client.Fault as e: logger.warning(f"Authentication fault: {e}") return False except Exception as e: logger.error(f"Error during password authentication: {e}") raise OdooConnectionError(f"Failed to authenticate: {e}") from e def authenticate(self, database: Optional[str] = None) -> None: """Authenticate with Odoo using available credentials. Authentication strategy depends on mode: - Standard mode: Try MCP API key, then fall back to username/password - YOLO mode: Try API key as password, then username/password Args: database: Database name. If not provided, uses auto-selection. Raises: OdooConnectionError: If authentication fails """ if not self._connected: raise OdooConnectionError("Not connected to Odoo") # Get database name if database: db_name = database else: db_name = self.auto_select_database() # Log authentication strategy if self.config.is_yolo_enabled: mode_desc = "read-only" if self.config.yolo_mode == "read" else "full access" logger.info(f"Authenticating in YOLO {mode_desc} mode for database '{db_name}'") else: logger.info(f"Authenticating in standard MCP mode for database '{db_name}'") auth_errors = [] # Try API key authentication first (if available) if self.config.uses_api_key: auth_method = "API key (YOLO mode)" if self.config.is_yolo_enabled else "MCP API key" logger.info(f"Attempting {auth_method} authentication") try: if self._authenticate_api_key(db_name): logger.info(f"Successfully authenticated using {auth_method}") return else: error_msg = f"{auth_method} authentication failed" auth_errors.append(error_msg) # Only try fallback if we have credentials if self.config.uses_credentials: logger.info(f"{error_msg}, trying username/password fallback") except OdooConnectionError as e: # Critical error (network, etc.) - don't try fallback logger.error(f"Critical error during {auth_method} authentication: {e}") raise # Try username/password authentication (if available) if self.config.uses_credentials: logger.info("Attempting username/password authentication") try: if self._authenticate_password(db_name): logger.info("Successfully authenticated using username/password") return else: auth_errors.append("Username/password authentication failed") except OdooConnectionError as e: # Critical error - propagate it logger.error(f"Critical error during password authentication: {e}") raise # Authentication failed - provide detailed error message if auth_errors: error_details = "; ".join(auth_errors) mode_hint = "" if self.config.is_yolo_enabled: mode_hint = " (YOLO mode - ensure Odoo credentials are correct)" else: mode_hint = " (Standard mode - ensure MCP module is installed and API key is valid)" raise OdooConnectionError(f"Authentication failed: {error_details}{mode_hint}") else: raise OdooConnectionError( "No authentication method configured. " "Provide either API key or username/password credentials." ) def is_authenticated(self) -> bool: """Check if currently authenticated.""" return self._authenticated @property def uid(self) -> Optional[int]: """Get authenticated user ID.""" return self._uid @property def database(self) -> Optional[str]: """Get authenticated database name.""" return self._database @property def auth_method(self) -> Optional[str]: """Get authentication method used ('api_key' or 'password').""" return self._auth_method @property def performance_manager(self) -> PerformanceManager: """Get the performance manager instance.""" return self._performance_manager def execute(self, model: str, method: str, *args) -> Any: """Execute an operation on an Odoo model. This is a simplified interface that calls execute_kw with empty kwargs. Args: model: The Odoo model name (e.g., 'res.partner') method: The method to call (e.g., 'search', 'read') *args: Arguments to pass to the method Returns: The result from Odoo Raises: OdooConnectionError: If not authenticated or execution fails """ return self.execute_kw(model, method, list(args), {}) def execute_kw(self, model: str, method: str, args: List[Any], kwargs: Dict[str, Any]) -> Any: """Execute an operation on an Odoo model with keyword arguments. This is the main method for interacting with Odoo models via XML-RPC. Args: model: The Odoo model name (e.g., 'res.partner') method: The method to call (e.g., 'search_read') args: List of positional arguments for the method kwargs: Dictionary of keyword arguments for the method Returns: The result from Odoo Raises: OdooConnectionError: If not authenticated or execution fails """ if not self._authenticated: raise OdooConnectionError("Not authenticated. Call authenticate() first.") if not self._connected: raise OdooConnectionError("Not connected to Odoo") # Get the appropriate password/token based on auth method password_or_token = ( self.config.api_key if self._auth_method == "api_key" else self.config.password ) try: # Log the operation logger.debug(f"Executing {method} on {model} with args={args}, kwargs={kwargs}") # Execute via object proxy result = self.object_proxy.execute_kw( self._database, self._uid, password_or_token, model, method, args, kwargs ) logger.debug("Operation completed successfully") return result except xmlrpc.client.Fault as e: logger.error(f"XML-RPC fault during {method} on {model}: {e}") # Sanitize the fault string before exposing to user sanitized_message = ErrorSanitizer.sanitize_xmlrpc_fault(e.faultString) raise OdooConnectionError(f"Operation failed: {sanitized_message}") from e except socket.timeout: logger.error(f"Timeout during {method} on {model}") raise OdooConnectionError(f"Operation timeout after {self.timeout} seconds") from None except Exception as e: logger.error(f"Error during {method} on {model}: {e}") # Sanitize generic errors as well sanitized_message = ErrorSanitizer.sanitize_message(str(e)) raise OdooConnectionError(f"Operation failed: {sanitized_message}") from e def search(self, model: str, domain: List[Union[str, List[Any]]], **kwargs) -> List[int]: """Search for records matching a domain. Args: model: The Odoo model name domain: Odoo domain filter (e.g., [['is_company', '=', True]]) **kwargs: Additional parameters (limit, offset, order) Returns: List of record IDs matching the domain """ return self.execute_kw(model, "search", [domain], kwargs) def read( self, model: str, ids: List[int], fields: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """Read records by IDs. Args: model: The Odoo model name ids: List of record IDs to read fields: List of field names to read (None for all fields) Returns: List of dictionaries containing record data """ # Try to get cached records cached_records = [] uncached_ids = [] for record_id in ids: cached = self._performance_manager.get_cached_record(model, record_id, fields) if cached: cached_records.append(cached) else: uncached_ids.append(record_id) # If all records are cached, return them if not uncached_ids: logger.debug(f"All {len(ids)} records retrieved from cache") return cached_records # Read uncached records kwargs = {} if fields: kwargs["fields"] = fields with self._performance_manager.monitor.track_operation(f"read_{model}"): new_records = self.execute_kw(model, "read", [uncached_ids], kwargs) # Cache the new records for record in new_records: self._performance_manager.cache_record(model, record, fields) # Combine cached and new records in original order all_records = cached_records + new_records # Sort by the original ID order id_order = {id_val: idx for idx, id_val in enumerate(ids)} all_records.sort(key=lambda r: id_order.get(r.get("id", 0), len(ids))) return all_records def search_read( self, model: str, domain: List[Union[str, List[Any]]], fields: Optional[List[str]] = None, **kwargs, ) -> List[Dict[str, Any]]: """Search for records and read their data in one operation. Args: model: The Odoo model name domain: Odoo domain filter fields: List of field names to read (None for all fields) **kwargs: Additional parameters (limit, offset, order) Returns: List of dictionaries containing record data """ if fields: kwargs["fields"] = fields return self.execute_kw(model, "search_read", [domain], kwargs) def fields_get( self, model: str, attributes: Optional[List[str]] = None ) -> Dict[str, Dict[str, Any]]: """Get field definitions for a model. Args: model: The Odoo model name attributes: List of field attributes to return Returns: Dictionary mapping field names to their definitions """ # Check cache first cached_fields = self._performance_manager.get_cached_fields(model) if cached_fields and not attributes: # Only use cache if no specific attributes requested logger.debug(f"Field definitions for {model} retrieved from cache") return cached_fields # Get fields from server kwargs = {} if attributes: kwargs["attributes"] = attributes with self._performance_manager.monitor.track_operation(f"fields_get_{model}"): fields = self.execute_kw(model, "fields_get", [], kwargs) # Cache if we got all attributes if not attributes: self._performance_manager.cache_fields(model, fields) return fields def search_count(self, model: str, domain: List[Union[str, List[Any]]]) -> int: """Count records matching a domain. Args: model: The Odoo model name domain: Odoo domain filter Returns: Number of records matching the domain """ return self.execute_kw(model, "search_count", [domain], {}) def create(self, model: str, values: Dict[str, Any]) -> int: """Create a new record. Args: model: The Odoo model name values: Dictionary of field values for the new record Returns: ID of the created record Raises: OdooConnectionError: If creation fails """ try: with self._performance_manager.monitor.track_operation(f"create_{model}"): record_id = self.execute_kw(model, "create", [values], {}) # Invalidate cache for this model self._performance_manager.invalidate_record_cache(model) logger.info(f"Created {model} record with ID {record_id}") return record_id except Exception as e: logger.error(f"Failed to create {model} record: {e}") raise def write(self, model: str, ids: List[int], values: Dict[str, Any]) -> bool: """Update existing records. Args: model: The Odoo model name ids: List of record IDs to update values: Dictionary of field values to update Returns: True if update was successful Raises: OdooConnectionError: If update fails """ try: with self._performance_manager.monitor.track_operation(f"write_{model}"): result = self.execute_kw(model, "write", [ids, values], {}) # Invalidate cache for updated records for record_id in ids: self._performance_manager.invalidate_record_cache(model, record_id) logger.info(f"Updated {len(ids)} {model} record(s)") return result except Exception as e: logger.error(f"Failed to update {model} records: {e}") raise def unlink(self, model: str, ids: List[int]) -> bool: """Delete records. Args: model: The Odoo model name ids: List of record IDs to delete Returns: True if deletion was successful Raises: OdooConnectionError: If deletion fails """ try: with self._performance_manager.monitor.track_operation(f"unlink_{model}"): result = self.execute_kw(model, "unlink", [ids], {}) # Invalidate cache for deleted records for record_id in ids: self._performance_manager.invalidate_record_cache(model, record_id) logger.info(f"Deleted {len(ids)} {model} record(s)") return result except Exception as e: logger.error(f"Failed to delete {model} records: {e}") raise def get_server_version(self) -> Optional[Dict[str, Any]]: """Get Odoo server version information. Returns: Dictionary with version information or None if not connected """ if not self._connected: return None try: return self.common_proxy.version() except Exception as e: logger.error(f"Failed to get server version: {e}") return None @contextmanager def create_connection(config: OdooConfig, timeout: int = OdooConnection.DEFAULT_TIMEOUT): """Create a connection context manager. Args: config: OdooConfig object timeout: Connection timeout in seconds Yields: Connected OdooConnection instance Example: with create_connection(config) as conn: # Use connection version = conn.common_proxy.version() """ conn = OdooConnection(config, timeout) try: conn.connect() yield conn finally: conn.disconnect()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ivnvxd/mcp-server-odoo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server