"""Oracle Fusion API Client."""
import base64
from datetime import datetime, timedelta
from typing import Optional
from urllib.parse import urlencode
import httpx
from .config import settings
from .errors import handle_oracle_api_error, sanitize_error_for_logging
from .models import OracleApiResponse, OracleInvoice, SearchFilters
class OracleClient:
"""Oracle Fusion REST API client."""
def __init__(self):
self.base_url = settings.oracle_api_url
self.timeout = settings.request_timeout
def _create_auth_header(self, username: str, password: str) -> str:
"""Create Basic Auth header."""
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded}"
def _build_oracle_query(self, filters: dict) -> str:
"""Build Oracle REST API query filter (q parameter)."""
conditions = []
if filters.get("customer_name"):
conditions.append(f"CustomerName LIKE '%{filters['customer_name']}%'")
if filters.get("invoice_number"):
conditions.append(f"InvoiceNumber='{filters['invoice_number']}'")
if filters.get("date_from"):
conditions.append(f"InvoiceDate>='{filters['date_from']}'")
if filters.get("date_to"):
conditions.append(f"InvoiceDate<='{filters['date_to']}'")
if filters.get("status"):
conditions.append(f"InvoiceStatus='{filters['status']}'")
if filters.get("amountGreaterThan") is not None:
conditions.append(f"InvoiceAmount>{filters['amountGreaterThan']}")
if filters.get("amountLessThan") is not None:
conditions.append(f"InvoiceAmount<{filters['amountLessThan']}")
return ";".join(conditions)
async def list_invoices(
self,
username: str,
password: str,
limit: int = 20,
offset: int = 0,
customer_name: Optional[str] = None,
invoice_number: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
status: Optional[str] = None,
) -> OracleApiResponse:
"""List invoices with optional filtering."""
try:
query_params = {
"limit": limit,
"offset": offset,
}
# Build Oracle query filter
filters = {
"customer_name": customer_name,
"invoice_number": invoice_number,
"date_from": date_from,
"date_to": date_to,
"status": status,
}
oracle_query = self._build_oracle_query(filters)
if oracle_query:
query_params["q"] = oracle_query
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
self.base_url,
params=query_params,
headers={
"Authorization": self._create_auth_header(username, password),
"Accept": "application/json",
},
)
response.raise_for_status()
data = response.json()
return OracleApiResponse(**data)
except Exception as e:
print(f"Oracle API Error: {sanitize_error_for_logging(e)}")
handle_oracle_api_error(e)
raise # This won't be reached due to handle_oracle_api_error raising
async def get_invoice_details(
self,
username: str,
password: str,
invoice_id: str,
) -> OracleInvoice:
"""Get detailed information about a specific invoice."""
try:
url = f"{self.base_url}/{invoice_id}"
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
url,
headers={
"Authorization": self._create_auth_header(username, password),
"Accept": "application/json",
},
)
response.raise_for_status()
data = response.json()
return OracleInvoice(**data)
except Exception as e:
print(f"Oracle API Error: {sanitize_error_for_logging(e)}")
handle_oracle_api_error(e)
raise
async def search_invoices(
self,
username: str,
password: str,
q: Optional[str] = None,
filters: Optional[SearchFilters] = None,
limit: int = 20,
offset: int = 0,
) -> OracleApiResponse:
"""Search invoices with advanced filtering."""
try:
query_params = {
"limit": limit,
"offset": offset,
}
conditions = []
# Add text search
if q:
conditions.append(q)
# Add filter conditions
if filters:
filter_dict = {}
if filters.amountGreaterThan is not None:
filter_dict["amountGreaterThan"] = filters.amountGreaterThan
if filters.amountLessThan is not None:
filter_dict["amountLessThan"] = filters.amountLessThan
if filters.customerId:
conditions.append(f"CustomerId='{filters.customerId}'")
# Handle overdue days filter
if filters.overdueDays is not None:
overdue_date = datetime.now() - timedelta(days=filters.overdueDays)
overdue_date_str = overdue_date.strftime("%Y-%m-%d")
conditions.append(f"DueDate<'{overdue_date_str}'")
conditions.append("InvoiceStatus='Open'")
filter_query = self._build_oracle_query(filter_dict)
if filter_query:
conditions.append(filter_query)
if conditions:
query_params["q"] = ";".join(conditions)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
self.base_url,
params=query_params,
headers={
"Authorization": self._create_auth_header(username, password),
"Accept": "application/json",
},
)
response.raise_for_status()
data = response.json()
return OracleApiResponse(**data)
except Exception as e:
print(f"Oracle API Error: {sanitize_error_for_logging(e)}")
handle_oracle_api_error(e)
raise
async def health_check(self) -> bool:
"""Health check - verify API connectivity."""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(settings.oracle_base_url)
return response.status_code < 500
except Exception:
return False
# Global client instance
oracle_client = OracleClient()