Skip to main content
Glama
vzeman
by vzeman
odoo_service.py10.1 kB
"""Odoo service for API communication.""" import xmlrpc.client import ssl from typing import Any, Dict, List, Optional, Union from ..config import get_config from ..logger import get_logger from .cache_service import CacheService, get_cache_service logger = get_logger(__name__) class OdooService: """Service for interacting with Odoo via XML-RPC.""" def __init__(self): """Initialize Odoo service with configuration.""" self.config = get_config().odoo self.cache = get_cache_service() self.url = self.config.url self.database = self.config.database self.username = self.config.username self.password = self.config.api_key or self.config.password self.uid: Optional[int] = None # Create SSL context that doesn't verify certificates (for development) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # Initialize XML-RPC clients with SSL context self.common = xmlrpc.client.ServerProxy( f"{self.url}/xmlrpc/2/common", context=ssl_context ) self.models = xmlrpc.client.ServerProxy( f"{self.url}/xmlrpc/2/object", context=ssl_context, allow_none=True, use_builtin_types=True ) logger.info(f"Odoo service initialized for {self.url}/{self.database}") def authenticate(self) -> int: """Authenticate with Odoo and return user ID.""" if self.uid is None: cache_key = f"auth:{self.username}:{self.database}" cached_uid = self.cache.get(cache_key) if cached_uid: self.uid = cached_uid logger.debug("Using cached authentication") else: logger.info("Authenticating with Odoo...") self.uid = self.common.authenticate( self.database, self.username, self.password, {} ) if not self.uid: raise ValueError("Authentication failed. Check your credentials.") # Cache authentication for 1 hour self.cache.set(cache_key, self.uid, ttl=3600) logger.info(f"Authentication successful, user ID: {self.uid}") return self.uid def execute( self, model: str, method: str, *args: Any, **kwargs: Any ) -> Any: """Execute a method on an Odoo model.""" uid = self.authenticate() logger.debug(f"Executing {model}.{method} with args={args}, kwargs={kwargs}") try: result = self.models.execute_kw( self.database, uid, self.password, model, method, args, kwargs ) logger.debug(f"Execution successful, result type: {type(result)}") return result except Exception as e: logger.error(f"Execution failed: {e}") raise def search( self, model: str, domain: Optional[List[List[Any]]] = None, offset: int = 0, limit: Optional[int] = None, order: Optional[str] = None, ) -> List[int]: """Search for record IDs matching the domain.""" domain = domain or [] kwargs: Dict[str, Any] = {"offset": offset} if limit is not None: kwargs["limit"] = limit if order is not None: kwargs["order"] = order # Generate cache key cache_key = self.cache.generate_key( "search", model, str(domain), offset, limit, order ) # Try cache first cached_result = self.cache.get(cache_key) if cached_result is not None: return cached_result result = self.execute(model, "search", domain, **kwargs) # Cache the result self.cache.set(cache_key, result) return result def search_read( self, model: str, domain: Optional[List[List[Any]]] = None, fields: Optional[List[str]] = None, offset: int = 0, limit: Optional[int] = None, order: Optional[str] = None, ) -> List[Dict[str, Any]]: """Search and read records in a single call.""" domain = domain or [] kwargs: Dict[str, Any] = {"offset": offset} if fields is not None: kwargs["fields"] = fields if limit is not None: kwargs["limit"] = limit if order is not None: kwargs["order"] = order # Generate cache key cache_key = self.cache.generate_key( "search_read", model, str(domain), str(fields), offset, limit, order ) # Try cache first cached_result = self.cache.get(cache_key) if cached_result is not None: return cached_result result = self.execute(model, "search_read", domain, **kwargs) # Cache the result self.cache.set(cache_key, result) return result def read( self, model: str, ids: Union[int, List[int]], fields: Optional[List[str]] = None, ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: """Read records by IDs.""" if isinstance(ids, int): ids = [ids] single_record = True else: single_record = False kwargs: Dict[str, Any] = {} if fields is not None: kwargs["fields"] = fields # Generate cache key cache_key = self.cache.generate_key( "read", model, str(sorted(ids)), str(fields) ) # Try cache first cached_result = self.cache.get(cache_key) if cached_result is not None: return cached_result[0] if single_record and cached_result else cached_result result = self.execute(model, "read", ids, **kwargs) # Cache the result self.cache.set(cache_key, result) return result[0] if single_record and result else result def create( self, model: str, values: Union[Dict[str, Any], List[Dict[str, Any]]], ) -> Union[int, List[int]]: """Create one or more records.""" single_record = isinstance(values, dict) if single_record: values = [values] logger.info(f"Creating {len(values)} record(s) in {model}") result = self.execute(model, "create", values) # Clear related cache entries self._invalidate_cache(model) return result[0] if single_record else result def write( self, model: str, ids: Union[int, List[int]], values: Dict[str, Any], ) -> bool: """Update records.""" if isinstance(ids, int): ids = [ids] logger.info(f"Updating {len(ids)} record(s) in {model}") result = self.execute(model, "write", ids, values) # Clear related cache entries self._invalidate_cache(model) return result def unlink( self, model: str, ids: Union[int, List[int]], ) -> bool: """Delete records.""" if isinstance(ids, int): ids = [ids] logger.info(f"Deleting {len(ids)} record(s) from {model}") result = self.execute(model, "unlink", ids) # Clear related cache entries self._invalidate_cache(model) return result def fields_get( self, model: str, fields: Optional[List[str]] = None, attributes: Optional[List[str]] = None, ) -> Dict[str, Dict[str, Any]]: """Get field definitions for a model.""" kwargs: Dict[str, Any] = {} if fields is not None: kwargs["allfields"] = fields if attributes is not None: kwargs["attributes"] = attributes # Generate cache key cache_key = self.cache.generate_key( "fields_get", model, str(fields), str(attributes) ) # Try cache first (fields don't change often) cached_result = self.cache.get(cache_key) if cached_result is not None: return cached_result result = self.execute(model, "fields_get", **kwargs) # Cache for longer (1 hour) since fields don't change often self.cache.set(cache_key, result, ttl=3600) return result def get_model_list(self) -> List[Dict[str, Any]]: """Get list of all available models.""" cache_key = "model_list" # Try cache first cached_result = self.cache.get(cache_key) if cached_result is not None: return cached_result result = self.search_read("ir.model", [], ["model", "name", "transient"]) # Cache for longer (1 hour) since models don't change often self.cache.set(cache_key, result, ttl=3600) return result def _invalidate_cache(self, model: str) -> None: """Invalidate cache entries related to a model.""" # This is a simple implementation - in production you might want # more sophisticated cache invalidation logger.debug(f"Invalidating cache for model: {model}") # For now, we don't implement selective cache invalidation # The cache will naturally expire based on TTL # Global Odoo service instance _odoo_service: Optional[OdooService] = None def get_odoo_service() -> OdooService: """Get or create global Odoo service instance.""" global _odoo_service if _odoo_service is None: _odoo_service = OdooService() return _odoo_service

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vzeman/odoo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server