"""
Odoo XML-RPC Client with thread-safe factory.
Provides a lazy-loaded, thread-safe client for Odoo operations.
"""
from __future__ import annotations
import logging
import xmlrpc.client
from threading import Lock
from typing import Any, Sequence
from .config import settings
from .constants import DEFAULT_LIMIT, DEFAULT_OFFSET, MAX_LIMIT
from .exceptions import (
OdooAuthenticationError,
OdooConnectionError,
OdooError,
OdooNotFoundError,
OdooPermissionError,
)
logger = logging.getLogger(__name__)
class OdooClient:
    """
    Odoo XML-RPC client with lazy loading.

    Uses ``__slots__`` for memory efficiency. The two XML-RPC proxies and the
    authenticated user ID are created on first use and cached on the instance.

    Example:
        >>> client = OdooClient()
        >>> projects = client.search_read("project.project", [], ["name"])
    """

    __slots__ = ("_url", "_db", "_username", "_api_key", "_uid", "_models", "_common")

    def __init__(
        self,
        url: str | None = None,
        db: str | None = None,
        username: str | None = None,
        api_key: str | None = None,
    ):
        """
        Initialize the client.

        Only stores configuration; proxies and the user ID are resolved lazily.
        Any falsy argument (``None`` or an empty string) falls back to the
        corresponding value in ``settings.odoo``.

        Args:
            url: Odoo server URL (defaults to settings)
            db: Database name (defaults to settings)
            username: Username (defaults to settings)
            api_key: API key (defaults to settings)
        """
        self._url = url or settings.odoo.url
        self._db = db or settings.odoo.db
        self._username = username or settings.odoo.username
        self._api_key = api_key or settings.odoo.api_key
        self._uid: int | None = None
        self._models: xmlrpc.client.ServerProxy | None = None
        self._common: xmlrpc.client.ServerProxy | None = None

    def _build_proxy(self, service: str, label: str) -> xmlrpc.client.ServerProxy:
        """
        Create the XML-RPC proxy for one service path under ``/xmlrpc/2/``.

        Args:
            service: Last path segment of the endpoint ("common" or "object")
            label: Endpoint name used in the error message

        Returns:
            A new ServerProxy for the requested endpoint

        Raises:
            OdooConnectionError: If the proxy cannot be constructed
        """
        try:
            # Strip trailing slashes so a base URL like "https://host/" does
            # not yield "https://host//xmlrpc/2/...".
            base_url = str(self._url).rstrip("/")
            return xmlrpc.client.ServerProxy(
                f"{base_url}/xmlrpc/2/{service}",
                allow_none=True,
            )
        except Exception as e:
            raise OdooConnectionError(
                f"Failed to connect to Odoo {label} endpoint: {e}",
                details={"url": self._url},
            ) from e

    @staticmethod
    def _paging_kwargs(limit: int, offset: int, order: str | None) -> dict[str, Any]:
        """
        Build the limit/offset/order keyword arguments shared by search calls.

        The limit is capped at MAX_LIMIT; "order" is only included when truthy.
        """
        paging: dict[str, Any] = {
            "limit": min(limit, MAX_LIMIT),
            "offset": offset,
        }
        if order:
            paging["order"] = order
        return paging

    @property
    def common(self) -> xmlrpc.client.ServerProxy:
        """
        Get the common endpoint (lazy-loaded).

        Raises:
            OdooConnectionError: If the proxy cannot be constructed
        """
        if self._common is None:
            self._common = self._build_proxy("common", "common")
        return self._common

    @property
    def models(self) -> xmlrpc.client.ServerProxy:
        """
        Get the models endpoint (lazy-loaded).

        Raises:
            OdooConnectionError: If the proxy cannot be constructed
        """
        if self._models is None:
            self._models = self._build_proxy("object", "models")
        return self._models

    @property
    def uid(self) -> int:
        """
        Get the authenticated user ID (lazy-loaded).

        Authenticates on first access and caches the ID only when the server
        returns a truthy value, so a failed attempt is retried (and raises
        again) on the next access instead of returning a cached falsy value.

        Raises:
            OdooAuthenticationError: If the credentials are rejected or the
                authenticate call itself fails
            OdooConnectionError: If the common endpoint cannot be created
        """
        if self._uid is None:
            try:
                uid = self.common.authenticate(
                    self._db,
                    self._username,
                    self._api_key,
                    {},
                )
            except (OdooAuthenticationError, OdooConnectionError):
                # Already classified by this package; do not re-label it.
                raise
            except Exception as e:
                raise OdooAuthenticationError(
                    f"Authentication error: {e}",
                    details={"username": self._username},
                ) from e
            if not uid:
                raise OdooAuthenticationError(
                    "Authentication failed. Check credentials.",
                    details={"username": self._username, "db": self._db},
                )
            # Cache only after validation so a rejected login never sticks.
            self._uid = uid
            logger.info(
                "Authenticated to Odoo",
                extra={"uid": self._uid, "db": self._db},
            )
        return self._uid

    def execute(
        self,
        model: str,
        method: str,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Execute an Odoo model method through ``execute_kw``.

        Args:
            model: Model technical name (e.g., "res.partner")
            method: Method name (e.g., "search_read")
            *args: Positional arguments for the method
            **kwargs: Keyword arguments for the method

        Returns:
            Method result

        Raises:
            OdooAuthenticationError: If lazy authentication fails
            OdooConnectionError: On connection/transport errors
            OdooPermissionError: On access denied errors
            OdooError: On any other Odoo fault or unexpected error
        """
        logger.debug(
            "Executing Odoo method",
            extra={"model": model, "method": method},
        )
        try:
            return self.models.execute_kw(
                self._db,
                self.uid,
                self._api_key,
                model,
                method,
                args,
                kwargs,
            )
        except (OdooAuthenticationError, OdooConnectionError):
            # Raised by the lazy ``uid`` / ``models`` properties; keep their
            # specific type instead of wrapping them as "Unexpected error".
            raise
        except xmlrpc.client.Fault as e:
            error_msg = str(e.faultString)
            # Deliberately broad, case-insensitive match: any fault text that
            # mentions "access" (including "AccessError") is a permission error.
            if "access" in error_msg.lower():
                raise OdooPermissionError(
                    f"Access denied for {method} on {model}",
                    details={"error": error_msg},
                ) from e
            raise OdooError(
                f"Odoo error: {error_msg}",
                details={"model": model, "method": method},
            ) from e
        except (OSError, xmlrpc.client.ProtocolError) as e:
            # OSError covers ConnectionError as well as DNS, timeout and TLS
            # failures; ProtocolError is an HTTP-level transport failure.
            raise OdooConnectionError(
                f"Connection lost: {e}",
                details={"url": self._url},
            ) from e
        except Exception as e:
            raise OdooError(
                f"Unexpected error: {e}",
                details={"model": model, "method": method},
            ) from e

    # =========================================================================
    # High-Level Methods
    # =========================================================================

    def search(
        self,
        model: str,
        domain: list[Any],
        limit: int = DEFAULT_LIMIT,
        offset: int = DEFAULT_OFFSET,
        order: str | None = None,
    ) -> list[int]:
        """
        Search for record IDs.

        Args:
            model: Model technical name
            domain: Search domain
            limit: Maximum records to return (capped at MAX_LIMIT)
            offset: Number of records to skip
            order: Sort order (e.g., "name asc")

        Returns:
            List of record IDs
        """
        return self.execute(
            model,
            "search",
            domain,
            **self._paging_kwargs(limit, offset, order),
        )

    def read(
        self,
        model: str,
        ids: Sequence[int],
        fields: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        Read records by ID.

        Args:
            model: Model technical name
            ids: Record IDs to read
            fields: Fields to read (None for all; sent as an empty list)

        Returns:
            List of record dictionaries
        """
        return self.execute(model, "read", list(ids), fields or [])

    def search_read(
        self,
        model: str,
        domain: list[Any],
        fields: list[str] | None = None,
        limit: int = DEFAULT_LIMIT,
        offset: int = DEFAULT_OFFSET,
        order: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Search and read records in one call.

        Args:
            model: Model technical name
            domain: Search domain
            fields: Fields to read (None is sent as an empty list)
            limit: Maximum records to return (capped at MAX_LIMIT)
            offset: Number of records to skip
            order: Sort order

        Returns:
            List of record dictionaries
        """
        kwargs: dict[str, Any] = {
            "fields": fields or [],
            **self._paging_kwargs(limit, offset, order),
        }
        return self.execute(model, "search_read", domain, **kwargs)

    def create(
        self,
        model: str,
        values: dict[str, Any],
    ) -> int:
        """
        Create a new record.

        Args:
            model: Model technical name
            values: Field values for the new record

        Returns:
            ID of the created record
        """
        record_id = self.execute(model, "create", values)
        logger.info(
            "Created record",
            extra={"model": model, "id": record_id},
        )
        return record_id

    def write(
        self,
        model: str,
        ids: Sequence[int],
        values: dict[str, Any],
    ) -> bool:
        """
        Update existing records.

        Args:
            model: Model technical name
            ids: Record IDs to update
            values: Field values to update

        Returns:
            True if successful
        """
        # Materialise once so the call and the log see the same IDs.
        id_list = list(ids)
        result = self.execute(model, "write", id_list, values)
        logger.info(
            "Updated records",
            extra={"model": model, "ids": id_list, "fields": list(values.keys())},
        )
        return result

    def unlink(
        self,
        model: str,
        ids: Sequence[int],
    ) -> bool:
        """
        Delete records.

        Args:
            model: Model technical name
            ids: Record IDs to delete

        Returns:
            True if successful
        """
        # Materialise once so the call and the log see the same IDs.
        id_list = list(ids)
        result = self.execute(model, "unlink", id_list)
        logger.info(
            "Deleted records",
            extra={"model": model, "ids": id_list},
        )
        return result

    def search_count(
        self,
        model: str,
        domain: list[Any],
    ) -> int:
        """
        Count records matching domain.

        Args:
            model: Model technical name
            domain: Search domain

        Returns:
            Number of matching records
        """
        return self.execute(model, "search_count", domain)

    def get_record(
        self,
        model: str,
        record_id: int,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Get a single record by ID.

        Args:
            model: Model technical name
            record_id: Record ID
            fields: Fields to read

        Returns:
            Record dictionary

        Raises:
            OdooNotFoundError: If the read returns no records
        """
        records = self.read(model, [record_id], fields)
        if not records:
            raise OdooNotFoundError(
                f"Record {record_id} not found in {model}",
                model=model,
                record_id=record_id,
            )
        return records[0]

    def exists(
        self,
        model: str,
        record_id: int,
    ) -> bool:
        """
        Check if a record exists.

        Runs a search on ``id = record_id`` limited to one result.

        Args:
            model: Model technical name
            record_id: Record ID

        Returns:
            True if the search returns at least one ID
        """
        # NOTE(review): Odoo searches may exclude archived records by default;
        # confirm whether archived records should count as existing here.
        ids = self.search(model, [("id", "=", record_id)], limit=1)
        return len(ids) > 0

    def get_user_info(self) -> dict[str, Any]:
        """
        Get current user information.

        Returns:
            ``res.users`` record of the authenticated user with the
            name, login, email and company_id fields
        """
        return self.get_record(
            "res.users",
            self.uid,
            ["name", "login", "email", "company_id"],
        )

    def test_connection(self) -> dict[str, Any]:
        """
        Test the connection and return server info.

        Returns:
            Dictionary with the keys status, server_version, database,
            user, user_id and company
        """
        version = self.common.version()
        user = self.get_user_info()
        # company_id is indexed as an [id, name] pair. Fall back when it is
        # missing or falsy (e.g. False) rather than indexing into it.
        company = user.get("company_id") or [None, "N/A"]
        return {
            "status": "connected",
            "server_version": version.get("server_version"),
            "database": self._db,
            "user": user.get("name"),
            "user_id": self.uid,
            "company": company[1],
        }
class OdooClientFactory:
    """
    Thread-safe factory for a shared OdooClient instance.

    Holds one OdooClient at class level and creates it on first request.
    Creation and reset are serialised with a class-level lock.

    Example:
        >>> client = OdooClientFactory.get_client()
        >>> # Same instance returned on subsequent calls
        >>> client2 = OdooClientFactory.get_client()
        >>> assert client is client2
    """

    # Shared client; None until first requested or after reset().
    _instance: OdooClient | None = None
    # Guards creation and reset of _instance.
    _lock: Lock = Lock()

    @classmethod
    def get_client(cls) -> OdooClient:
        """
        Get the shared OdooClient instance, creating it on first use.

        Returns:
            OdooClient instance
        """
        # Return a local reference: re-reading cls._instance after the lock is
        # released could yield None if reset() runs concurrently.
        client = cls._instance
        if client is None:
            with cls._lock:
                # Double-check locking pattern: another thread may have
                # created the instance while this one waited for the lock.
                client = cls._instance
                if client is None:
                    client = OdooClient()
                    cls._instance = client
                    logger.info("OdooClient instance created")
        return client

    @classmethod
    def reset(cls) -> None:
        """
        Reset the client instance.

        Drops the shared instance so the next get_client() call builds a new
        one. Useful for testing or connection refresh.
        """
        with cls._lock:
            cls._instance = None
            logger.info("OdooClient instance reset")

    @classmethod
    def is_initialized(cls) -> bool:
        """Check if client is initialized."""
        return cls._instance is not None
# Convenience function for quick access
def get_client() -> OdooClient:
    """
    Return the shared OdooClient instance.

    Module-level shortcut that delegates to OdooClientFactory.get_client().

    Returns:
        OdooClient instance held by the factory
    """
    shared_client = OdooClientFactory.get_client()
    return shared_client